001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2019–2020 microBean™.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.jersey.netty;
018
019import java.io.InputStream;
020import java.io.IOException;
021
022import java.util.Objects;
023
024import java.util.concurrent.Phaser;
025
026import java.util.function.Function;
027
028import io.netty.buffer.ByteBuf;
029import io.netty.buffer.ByteBufAllocator;
030import io.netty.buffer.CompositeByteBuf;
031
032/**
033 * An {@link InputStream} implementation that {@linkplain
034 * #read(byte[], int, int) reads from} a {@link CompositeByteBuf}
035 * whose contents are updated via the {@link #addByteBuf(ByteBuf)}
036 * method.
037 *
038 * <p>This class is designed to bridge the gap between a Netty
039 * I/O-focused event loop and a thread where Jersey will be reading
040 * content.  Reads must block on the Jersey thread, but must not
041 * impact the Netty event loop.  Instances of this class are used by
042 * {@link AbstractContainerRequestDecoder} implementations, when they
043 * need to supply an {@link InputStream} to Jersey so that Jersey can
044 * read any incoming entity payloads.</p>
045 *
046 * <p>The {@link AbstractContainerRequestDecoder} implementation will
047 * typically call {@link #addByteBuf(ByteBuf)} on the Netty event
048 * loop, and then will call {@link #terminate()} when it is done.
049 * Meanwhile, Jersey may call the {@link #read(byte[], int, int)}
050 * method on its own thread, and such a call may block at any point
051 * until Netty supplies more content.</p>
052 *
053 * @author <a href="https://about.me/lairdnelson"
054 * target="_parent">Laird Nelson</a>
055 *
056 * @see #addByteBuf(ByteBuf)
057 * 
058 * @see CompositeByteBuf
059 *
060 * @see
061 * AbstractContainerRequestDecoder#createTerminableByteBufInputStream(ByteBufAllocator)
062 */
063public final class TerminableByteBufInputStream extends InputStream {
064
065  private static final int OPEN = 0;
066
067  private static final int TERMINATED = 1;
068
069  private static final int CLOSED = 2;
070
071  private volatile int state;
072  
073  private final CompositeByteBuf byteBuf;
074
075  private final Phaser phaser;
076
077  /**
078   * Creates a new {@link TerminableByteBufInputStream}.
079   *
080   * @param byteBufAllocator a {@link ByteBufAllocator} that will
081   * {@linkplain ByteBufAllocator#compositeBuffer() allocate a new
082   * <code>CompositeByteBuf</code>} from which {@linkplain
083   * #read(byte[], int, int) content may be read by another thread};
084   * must not be {@code null}
085   *
086   * @exception NullPointerException if {@code byteBufAllocator} is
087   * {@code null}
088   *
089   * @see #addByteBuf(ByteBuf)
090   *
091   * @see #read(byte[], int, int)
092   */
093  public TerminableByteBufInputStream(final ByteBufAllocator byteBufAllocator) {
094    super();
095    this.byteBuf = Objects.requireNonNull(byteBufAllocator).compositeBuffer();
096    this.phaser = new Phaser(2);
097  }
098
099  /**
100   * Closes this {@link TerminableByteBufInputStream} and {@linkplain
101   * ByteBuf#release() releases its underlying
102   * <code>CompositeByteBuf</code>}.
103   *
104   * <p>If this method has been called previously, no further action
105   * is taken.</p>
106   *
107   * @exception IOException if an error occurs
108   */
109  @Override
110  public final void close() throws IOException {
111    final int state = this.state;
112    switch (state) {
113    case CLOSED:
114      break;
115    case TERMINATED:
116      // fall through
117    case OPEN:
118      try {
119        assert allComponentsHaveRefCount(1);
120        // No need to synchronize; release() works on a volatile int
121        final boolean released = this.byteBuf.release();
122        assert released;
123      } finally {
124        this.state = CLOSED;
125        this.phaser.forceTermination();
126      }
127      break;
128    default:
129      throw new IOException("Unexpected state: " + state);
130    }
131  }
132
133  /**
134   * Irrevocably disables the {@link #addByteBuf(ByteBuf)} method such
135   * that calling it will result in an {@link IllegalStateException}
136   * being thrown.
137   *
138   * <p>If this method has been called before, or if {@link #close()}
139   * has been called before, no action is taken.</p>
140   *
141   * @see #addByteBuf(ByteBuf)
142   *
143   * @see #close()
144   */
145  public final void terminate() {
146    final int state = this.state;
147    switch (state) {
148    case CLOSED:
149      break;
150    case TERMINATED:
151      break;
152    case OPEN:
153      this.state = TERMINATED;
154      this.phaser.forceTermination();
155      break;
156    default:
157      throw new IllegalStateException("Unexpected state: " + state);
158    }
159  }
160
161  /**
162   * Returns an estimate of the number of bytes that may be read
163   * without blocking.
164   *
165   * @return an estimate of the number of bytes that may be read
166   * without blocking; always {@code 0} or a positive {@code int}
167   *
168   * @exception IOException if this {@link
169   * TerminableByteBufInputStream} has been {@linkplain #close()
170   * closed}
171   *
172   * @see #close()
173   */
174  @Override
175  public final int available() throws IOException {
176    final int state = this.state;
177    switch (state) {
178    case CLOSED:
179      throw new IOException("closed");
180    case TERMINATED:
181      // No further writes will happen so no synchronization needed.
182      return this.byteBuf.readableBytes();
183    case OPEN:
184      synchronized (this.byteBuf) {
185        return this.byteBuf.readableBytes();
186      }
187    default:
188      throw new IOException("Unexpected state: " + state);
189    }
190  }
191
192  /**
193   * Reads a single byte from this {@link TerminableByteBufInputStream}.
194   *
195   * @return the byte read, or {@code -1} if the end of the stream has
196   * been reached (for this to happen {@link #terminate()} must have
197   * already been called at some point in the past)
198   *
199   * @exception IOException if this {@link
200   * TerminableByteBufInputStream} has been {@linkplain #close()
201   * closed}
202   *
203   * @see #terminate()
204   *
205   * @see #close()
206   */
207  @Override
208  public final int read() throws IOException {
209    final int state = this.state;
210    switch (state) {
211    case CLOSED:
212      throw new IOException("closed");
213    case TERMINATED:
214      // fall through
215    case OPEN:
216      return this.read(sourceByteBuf -> Integer.valueOf(sourceByteBuf.readByte()));
217    default:
218      throw new IOException("Unexpected state: " + state);
219    }
220  }
221
222  /**
223   * Calls the {@link #read(byte[], int, int)} method with the
224   * supplied {@code targetBytes} array, {@code 0} and {@code
225   * targetBytes.length} as arguments and returns its result.
226   *
227   * @param targetBytes the {@code byte} array into which read {@code
228   * byte}s will be written, beginning at index {@code 0}; must not be
229   * {@code null}
230   *
231   * @return the number of {@code byte}s actually read, or {@code -1}
232   * if the end of the stream has been reached, in which case {@link
233   * #terminate()} must have been called in the past
234   *
235   * @exception NullPointerException if {@code targetBytes} is {@code
236   * null}
237   *
238   * @exception IOException if this {@link
239   * TerminableByteBufInputStream} has been {@link #close() closed}
240   *
241   * @see #read(byte[], int, int)
242   *
243   * @see #terminate()
244   *
245   * @see #close()
246   *
247   * @see #addByteBuf(ByteBuf)
248   */
249  @Override
250  public final int read(final byte[] targetBytes) throws IOException {
251    return this.read(targetBytes, 0, targetBytes.length);
252  }
253
254  /**
255   * Attempts to read the desired number of {@code byte}s as indicated
256   * by the supplied {@code length} parameter into the supplied {@code
257   * targetByte} array, beginning the write at the element in the
258   * supplied {@code targetByteArray} designated by the {@code offset}
259   * parameter, and returns the actual number of {@code byte}s read,
260   * or {@code -1} if no {@code byte}s were read and the end of the
261   * stream was reached, in which case {@link #terminate()} must have
262   * been called in the past.
263   *
264   * <p>Content to read is supplied by means of the {@link
265   * #addByteBuf(ByteBuf)} method.</p>
266   *
267   * @param targetByteArray an array of {@code byte}s to which read
268   * {@code byte}s will be written, beginning at the element
269   * identified by the supplied {@code offset}; must not be {@code null}
270   *
271   * @param offset the zero-based index of the element within the
272   * supplied {@code targetByteArray} that will hold the first {@code
273   * byte} read; must be {@code 0} or greater and less than the length
274   * property of the supplied {@code targetByteArray}
275   *
276   * @param length the number of {@code byte}s to read; must be {@code
277   * zero} or greater and must be less than or equal to the length
278   * property of the supplied {@code targetByteArray} minus the
279   * supplied (valid) {@code offset}
280   *
281   * @return the number of {@code byte}s actually read, or {@code -1}
282   * if the end of the stream has been reached, in which case {@link
283   * #terminate()} must have been called in the past
284   *
285   * @exception NullPointerException if {@code targetByteArray} is
286   * {@code null}
287   *
288   * @exception IndexOutOfBoundsException if {@code offset} is less
289   * than {@code 0}, or if {@code length} is less than {@code 0}, or
290   * if {@code length} is greater than the length property of the
291   * supplied {@code targetByteArray} parameter minus the supplied
292   * {@code offset}
293   *
294   * @exception IOException if this {@link
295   * TerminableByteBufInputStream} has been {@link #close() closed}
296   *
297   * @see #terminate()
298   *
299   * @see #close()
300   *
301   * @see #addByteBuf(ByteBuf)
302   */
303  @Override
304  public final int read(final byte[] targetByteArray, final int offset, final int length) throws IOException {
305    if (offset < 0 || length < 0 || length > targetByteArray.length - offset) {
306      throw new IndexOutOfBoundsException();
307    }
308    final int state = this.state;
309    switch (state) {
310    case CLOSED:
311      throw new IOException("closed");
312    case TERMINATED:
313      // fall through
314    case OPEN:
315      // Synchronization will be handled by #read(Function)
316      return length == 0 ? 0 : this.read(sourceByteBuf -> {
317          final int readThisManyBytes = Math.min(length, sourceByteBuf.readableBytes());
318          sourceByteBuf.readBytes(targetByteArray, offset, readThisManyBytes);
319          return Integer.valueOf(readThisManyBytes);
320        });
321    default:
322      throw new IOException("Unexpected state: " + state);
323    }
324  }
325
326  /**
327   * Adds content for this {@link TerminableByteBufInputStream} to
328   * {@linkplain #read(byte[], int, int) read}.
329   *
330   * @param byteBuf a {@link ByteBuf}; must not be {@code null} and
331   * must be (initially) {@link ByteBuf#isReadable() readable}
332   *
333   * @exception NullPointerException if {@code byteBuf} is {@code
334   * null}
335   *
336   * @exception IllegalArgumentException if {@code byteBuf} is
337   * {@linkplain ByteBuf#isReadable() not readable}
338   *
339   * @exception IllegalStateException if this {@link
340   * TerminableByteBufInputStream} is {@link #close() closed} or
341   * {@linkplain #terminate() terminated}
342   *
343   * @see #terminate()
344   *
345   * @see #close()
346   *
347   * @see #read(byte[], int, int)
348   */
349  public final void addByteBuf(final ByteBuf byteBuf) {
350    Objects.requireNonNull(byteBuf);
351    if (!byteBuf.isReadable()) {
352      // Prevent adds of empty ByteBufs as much as we can.
353      throw new IllegalArgumentException("!byteBuf.isReadable()");
354    }
355    final int state = this.state;
356    switch (state) {
357    case CLOSED:
358      throw new IllegalStateException("closed");
359    case TERMINATED:
360      throw new IllegalStateException("terminated");
361    case OPEN:
362      synchronized (this.byteBuf) {
363        this.byteBuf.addComponent(true /* advance the writerIndex */, byteBuf);
364      }
365      this.phaser.arrive(); // (Nonblocking)
366      break;
367    default:
368      throw new IllegalStateException("Unexected state: " + state);
369    }
370  }
371
372  private final int read(final Function<? super ByteBuf, ? extends Integer> function) throws IOException {
373    Objects.requireNonNull(function);
374    int state = this.state;
375    switch (state) {
376    case CLOSED:
377      throw new IOException("closed");
378    case TERMINATED:
379      // No further writes will happen so no synchronization needed.
380      return this.byteBuf.isReadable() ? function.apply(this.byteBuf) : -1;
381    case OPEN:
382      do {
383        synchronized (this.byteBuf) {
384          if (this.byteBuf.isReadable()) {
385            break;
386          }
387        }
388        this.phaser.awaitAdvance(this.phaser.arrive()); // BLOCKING
389      } while ((state = this.state) == OPEN);
390      // We unblocked (or maybe never blocked in the first place).
391      // This is either because our state changed to something that is
392      // not OPEN or our CompositeByteBuf became readable. Check state
393      // first.
394      switch (state) {
395      case CLOSED:
396        throw new IOException("closed");
397      case TERMINATED:
398        // No further writes will happen so no synchronization needed.
399        return this.byteBuf.isReadable() ? function.apply(this.byteBuf) : -1;
400      case OPEN:
401        synchronized (this.byteBuf) {
402          return function.apply(this.byteBuf);
403        }
404      default:
405        throw new IOException("Unexpected state: " + state);
406      }
407    default:
408      throw new IOException("Unexpected state: " + state);
409    }
410  }
411
412  private final boolean allComponentsHaveRefCount(final int refCnt) {
413    assert this.byteBuf.refCnt() == refCnt;
414    if (refCnt > 0) {
415      // This check only works when refCnt is greater than 0 because
416      // if it is 0 then CompositeByteBuf#internalComponent(int) will
417      // throw an IllegalReferenceCountException.  This means
418      // effectively there's no way to assert that the innards of
419      // CompositeByteBuf are freed; we just have to take it on faith
420      // that if the CompositeByteBuf is freed then so are its
421      // components.
422      final int numComponents = this.byteBuf.numComponents();
423      for (int i = 0; i < numComponents; i++) {
424        assert this.byteBuf.internalComponent(i).refCnt() == refCnt;
425      }
426    }
427    return true;
428  }
429  
430}