/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
 *
 * Copyright © 2019–2020 microBean™.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.microbean.jersey.netty;

import java.io.InputStream;
import java.io.IOException;

import java.util.Objects;

import java.util.concurrent.Phaser;

import java.util.function.Function;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

/**
 * An {@link InputStream} implementation that {@linkplain
 * #read(byte[], int, int) reads from} a {@link CompositeByteBuf}
 * whose contents are updated via the {@link #addByteBuf(ByteBuf)}
 * method.
 *
 * <p>This class is designed to bridge the gap between a Netty
 * I/O-focused event loop and a thread where Jersey will be reading
 * content.  Reads must block on the Jersey thread, but must not
 * impact the Netty event loop.  Instances of this class are used by
 * {@link AbstractContainerRequestDecoder} implementations, when they
 * need to supply an {@link InputStream} to Jersey so that Jersey can
 * read any incoming entity payloads.</p>
 *
 * <p>The {@link AbstractContainerRequestDecoder} implementation will
 * typically call {@link #addByteBuf(ByteBuf)} on the Netty event
 * loop, and then will call {@link #terminate()} when it is done.
 * Meanwhile, Jersey may call the {@link #read(byte[], int, int)}
 * method on its own thread, and such a call may block at any point
 * until Netty supplies more content.</p>
 *
 * @author <a href="https://about.me/lairdnelson"
 * target="_parent">Laird Nelson</a>
 *
 * @see #addByteBuf(ByteBuf)
 *
 * @see CompositeByteBuf
 *
 * @see
 * AbstractContainerRequestDecoder#createTerminableByteBufInputStream(ByteBufAllocator)
 */
public final class TerminableByteBufInputStream extends InputStream {

  /** State: content may still be added and read. */
  private static final int OPEN = 0;

  /** State: no more content may be added; remaining content may still be read. */
  private static final int TERMINATED = 1;

  /** State: the underlying buffer has been released; no reads or adds are permitted. */
  private static final int CLOSED = 2;

  // One of OPEN, TERMINATED or CLOSED.  Volatile because it is read
  // and written without holding any lock.
  private volatile int state;

  // Accumulates everything handed to addByteBuf(ByteBuf).  While the
  // state is OPEN, access is guarded by synchronizing on this object.
  private final CompositeByteBuf byteBuf;

  // Used to park a reader that finds nothing to read until
  // addByteBuf(ByteBuf), terminate() or close() signals it.
  private final Phaser phaser;

  /**
   * Creates a new {@link TerminableByteBufInputStream}.
   *
   * @param byteBufAllocator a {@link ByteBufAllocator} that will
   * {@linkplain ByteBufAllocator#compositeBuffer() allocate a new
   * <code>CompositeByteBuf</code>} from which {@linkplain
   * #read(byte[], int, int) content may be read by another thread};
   * must not be {@code null}
   *
   * @exception NullPointerException if {@code byteBufAllocator} is
   * {@code null}
   *
   * @see #addByteBuf(ByteBuf)
   *
   * @see #read(byte[], int, int)
   */
  public TerminableByteBufInputStream(final ByteBufAllocator byteBufAllocator) {
    super();
    this.byteBuf = Objects.requireNonNull(byteBufAllocator).compositeBuffer();
    // Two parties: a blocked reader's arrival plus one arrival from
    // addByteBuf(ByteBuf) advance the phase and so release the reader.
    this.phaser = new Phaser(2);
  }

  /**
   * Closes this {@link TerminableByteBufInputStream} and {@linkplain
   * ByteBuf#release() releases its underlying
   * <code>CompositeByteBuf</code>}.
   *
   * <p>If this method has been called previously, no further action
   * is taken.</p>
   *
   * @exception IOException if an error occurs
   */
  @Override
  public final void close() throws IOException {
    final int state = this.state;
    switch (state) {
    case CLOSED:
      break;
    case TERMINATED:
      // fall through
    case OPEN:
      try {
        assert allComponentsHaveRefCount(1);
        // No need to synchronize; release() works on a volatile int
        final boolean released = this.byteBuf.release();
        assert released;
      } finally {
        // Publish the new state before waking any blocked reader so
        // that the reader observes CLOSED once it resumes.
        this.state = CLOSED;
        this.phaser.forceTermination();
      }
      break;
    default:
      throw new IOException("Unexpected state: " + state);
    }
  }

  /**
   * Irrevocably disables the {@link #addByteBuf(ByteBuf)} method such
   * that calling it will result in an {@link IllegalStateException}
   * being thrown.
   *
   * <p>If this method has been called before, or if {@link #close()}
   * has been called before, no action is taken.</p>
   *
   * @see #addByteBuf(ByteBuf)
   *
   * @see #close()
   */
  public final void terminate() {
    final int state = this.state;
    switch (state) {
    case CLOSED:
      break;
    case TERMINATED:
      break;
    case OPEN:
      // Publish the new state before waking any blocked reader so
      // that the reader observes TERMINATED once it resumes.
      this.state = TERMINATED;
      this.phaser.forceTermination();
      break;
    default:
      throw new IllegalStateException("Unexpected state: " + state);
    }
  }

  /**
   * Returns an estimate of the number of bytes that may be read
   * without blocking.
   *
   * @return an estimate of the number of bytes that may be read
   * without blocking; always {@code 0} or a positive {@code int}
   *
   * @exception IOException if this {@link
   * TerminableByteBufInputStream} has been {@linkplain #close()
   * closed}
   *
   * @see #close()
   */
  @Override
  public final int available() throws IOException {
    final int state = this.state;
    switch (state) {
    case CLOSED:
      throw new IOException("closed");
    case TERMINATED:
      // No further writes will happen so no synchronization needed.
      return this.byteBuf.readableBytes();
    case OPEN:
      synchronized (this.byteBuf) {
        return this.byteBuf.readableBytes();
      }
    default:
      throw new IOException("Unexpected state: " + state);
    }
  }

  /**
   * Reads a single byte from this {@link TerminableByteBufInputStream}.
   *
   * @return the byte read, as an {@code int} in the range {@code 0}
   * to {@code 255}, or {@code -1} if the end of the stream has been
   * reached (for this to happen {@link #terminate()} must have
   * already been called at some point in the past)
   *
   * @exception IOException if this {@link
   * TerminableByteBufInputStream} has been {@linkplain #close()
   * closed}
   *
   * @see #terminate()
   *
   * @see #close()
   */
  @Override
  public final int read() throws IOException {
    final int state = this.state;
    switch (state) {
    case CLOSED:
      throw new IOException("closed");
    case TERMINATED:
      // fall through
    case OPEN:
      // The byte must be returned unsigned.  A signed read would turn
      // 0xFF into -1, which callers interpret as end-of-stream, and
      // would turn every other byte >= 0x80 into a negative value.
      return this.read(sourceByteBuf -> Integer.valueOf(sourceByteBuf.readUnsignedByte()));
    default:
      throw new IOException("Unexpected state: " + state);
    }
  }

  /**
   * Calls the {@link #read(byte[], int, int)} method with the
   * supplied {@code targetBytes} array, {@code 0} and {@code
   * targetBytes.length} as arguments and returns its result.
   *
   * @param targetBytes the {@code byte} array into which read {@code
   * byte}s will be written, beginning at index {@code 0}; must not be
   * {@code null}
   *
   * @return the number of {@code byte}s actually read, or {@code -1}
   * if the end of the stream has been reached, in which case {@link
   * #terminate()} must have been called in the past
   *
   * @exception NullPointerException if {@code targetBytes} is {@code
   * null}
   *
   * @exception IOException if this {@link
   * TerminableByteBufInputStream} has been {@link #close() closed}
   *
   * @see #read(byte[], int, int)
   *
   * @see #terminate()
   *
   * @see #close()
   *
   * @see #addByteBuf(ByteBuf)
   */
  @Override
  public final int read(final byte[] targetBytes) throws IOException {
    return this.read(targetBytes, 0, targetBytes.length);
  }

  /**
   * Attempts to read the desired number of {@code byte}s as indicated
   * by the supplied {@code length} parameter into the supplied {@code
   * targetByteArray} array, beginning the write at the element in the
   * supplied {@code targetByteArray} designated by the {@code offset}
   * parameter, and returns the actual number of {@code byte}s read,
   * or {@code -1} if no {@code byte}s were read and the end of the
   * stream was reached, in which case {@link #terminate()} must have
   * been called in the past.
   *
   * <p>Content to read is supplied by means of the {@link
   * #addByteBuf(ByteBuf)} method.</p>
   *
   * @param targetByteArray an array of {@code byte}s to which read
   * {@code byte}s will be written, beginning at the element
   * identified by the supplied {@code offset}; must not be {@code null}
   *
   * @param offset the zero-based index of the element within the
   * supplied {@code targetByteArray} that will hold the first {@code
   * byte} read; must be {@code 0} or greater and less than the length
   * property of the supplied {@code targetByteArray}
   *
   * @param length the number of {@code byte}s to read; must be {@code
   * zero} or greater and must be less than or equal to the length
   * property of the supplied {@code targetByteArray} minus the
   * supplied (valid) {@code offset}
   *
   * @return the number of {@code byte}s actually read, or {@code -1}
   * if the end of the stream has been reached, in which case {@link
   * #terminate()} must have been called in the past
   *
   * @exception NullPointerException if {@code targetByteArray} is
   * {@code null}
   *
   * @exception IndexOutOfBoundsException if {@code offset} is less
   * than {@code 0}, or if {@code length} is less than {@code 0}, or
   * if {@code length} is greater than the length property of the
   * supplied {@code targetByteArray} parameter minus the supplied
   * {@code offset}
   *
   * @exception IOException if this {@link
   * TerminableByteBufInputStream} has been {@link #close() closed}
   *
   * @see #terminate()
   *
   * @see #close()
   *
   * @see #addByteBuf(ByteBuf)
   */
  @Override
  public final int read(final byte[] targetByteArray, final int offset, final int length) throws IOException {
    // Check for null first so that a null array is always reported as
    // a NullPointerException, as documented, regardless of the other
    // arguments.
    Objects.requireNonNull(targetByteArray, "targetByteArray");
    if (offset < 0 || length < 0 || length > targetByteArray.length - offset) {
      throw new IndexOutOfBoundsException();
    }
    final int state = this.state;
    switch (state) {
    case CLOSED:
      throw new IOException("closed");
    case TERMINATED:
      // fall through
    case OPEN:
      // Synchronization will be handled by #read(Function)
      return length == 0 ? 0 : this.read(sourceByteBuf -> {
          // Never ask for more than is currently readable.
          final int readThisManyBytes = Math.min(length, sourceByteBuf.readableBytes());
          sourceByteBuf.readBytes(targetByteArray, offset, readThisManyBytes);
          return Integer.valueOf(readThisManyBytes);
        });
    default:
      throw new IOException("Unexpected state: " + state);
    }
  }

  /**
   * Adds content for this {@link TerminableByteBufInputStream} to
   * {@linkplain #read(byte[], int, int) read}.
   *
   * @param byteBuf a {@link ByteBuf}; must not be {@code null} and
   * must be (initially) {@link ByteBuf#isReadable() readable}
   *
   * @exception NullPointerException if {@code byteBuf} is {@code
   * null}
   *
   * @exception IllegalArgumentException if {@code byteBuf} is
   * {@linkplain ByteBuf#isReadable() not readable}
   *
   * @exception IllegalStateException if this {@link
   * TerminableByteBufInputStream} is {@link #close() closed} or
   * {@linkplain #terminate() terminated}
   *
   * @see #terminate()
   *
   * @see #close()
   *
   * @see #read(byte[], int, int)
   */
  public final void addByteBuf(final ByteBuf byteBuf) {
    Objects.requireNonNull(byteBuf);
    if (!byteBuf.isReadable()) {
      // Prevent adds of empty ByteBufs as much as we can.
      throw new IllegalArgumentException("!byteBuf.isReadable()");
    }
    final int state = this.state;
    switch (state) {
    case CLOSED:
      throw new IllegalStateException("closed");
    case TERMINATED:
      throw new IllegalStateException("terminated");
    case OPEN:
      synchronized (this.byteBuf) {
        this.byteBuf.addComponent(true /* advance the writerIndex */, byteBuf);
      }
      // Signal a reader that may be waiting for content.
      this.phaser.arrive(); // (Nonblocking)
      break;
    default:
      throw new IllegalStateException("Unexpected state: " + state);
    }
  }

  /**
   * Applies the supplied {@link Function} to the underlying {@link
   * CompositeByteBuf} once it has readable content, blocking while
   * this stream is open and nothing is readable.
   *
   * @param function the read operation to perform; it is handed the
   * underlying buffer and must return the value the calling {@code
   * read} method should return; must not be {@code null}
   *
   * @return the result of {@code function}, or {@code -1} if this
   * stream has been {@linkplain #terminate() terminated} and nothing
   * remains to be read
   *
   * @exception NullPointerException if {@code function} is {@code
   * null}
   *
   * @exception IOException if this stream is {@linkplain #close()
   * closed}, either on entry or while waiting for content
   */
  private final int read(final Function<? super ByteBuf, ? extends Integer> function) throws IOException {
    Objects.requireNonNull(function);
    int state = this.state;
    switch (state) {
    case CLOSED:
      throw new IOException("closed");
    case TERMINATED:
      // No further writes will happen so no synchronization needed.
      return this.byteBuf.isReadable() ? function.apply(this.byteBuf) : -1;
    case OPEN:
      do {
        synchronized (this.byteBuf) {
          if (this.byteBuf.isReadable()) {
            // Leaves the do/while loop, not just the synchronized block.
            break;
          }
        }
        // Nothing to read yet: register our arrival and wait for the
        // phase to advance.  The lock is not held while waiting, so
        // addByteBuf(ByteBuf) is never blocked by a parked reader.
        this.phaser.awaitAdvance(this.phaser.arrive()); // BLOCKING
      } while ((state = this.state) == OPEN);
      // We unblocked (or maybe never blocked in the first place).
      // This is either because our state changed to something that is
      // not OPEN or our CompositeByteBuf became readable.  Check state
      // first.
      switch (state) {
      case CLOSED:
        throw new IOException("closed");
      case TERMINATED:
        // No further writes will happen so no synchronization needed.
        return this.byteBuf.isReadable() ? function.apply(this.byteBuf) : -1;
      case OPEN:
        synchronized (this.byteBuf) {
          return function.apply(this.byteBuf);
        }
      default:
        throw new IOException("Unexpected state: " + state);
      }
    default:
      throw new IOException("Unexpected state: " + state);
    }
  }

  /**
   * Asserts that the underlying {@link CompositeByteBuf} and, when
   * {@code refCnt} is positive, each of its components have the
   * supplied reference count.
   *
   * <p>All checks are made with {@code assert} statements, so this
   * method is intended to be called from within an {@code assert}
   * itself.</p>
   *
   * @param refCnt the expected reference count
   *
   * @return {@code true}, always
   */
  private final boolean allComponentsHaveRefCount(final int refCnt) {
    assert this.byteBuf.refCnt() == refCnt;
    if (refCnt > 0) {
      // This check only works when refCnt is greater than 0 because
      // if it is 0 then CompositeByteBuf#internalComponent(int) will
      // throw an IllegalReferenceCountException.  This means
      // effectively there's no way to assert that the innards of
      // CompositeByteBuf are freed; we just have to take it on faith
      // that if the CompositeByteBuf is freed then so are its
      // components.
      final int numComponents = this.byteBuf.numComponents();
      for (int i = 0; i < numComponents; i++) {
        assert this.byteBuf.internalComponent(i).refCnt() == refCnt;
      }
    }
    return true;
  }

}