001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2019–2020 microBean™.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.jersey.netty;
018
019import java.io.IOException;
020import java.io.OutputStream;
021
022import java.util.Objects;
023
024import io.netty.channel.Channel;
025import io.netty.channel.ChannelOutboundInvoker;
026import io.netty.channel.ChannelPromise;
027
028/**
029 * An {@link OutputStream} that delegates writing and flushing
030 * operations to a {@link ChannelOutboundInvoker}.
031 *
032 * <h2>Thread Safety</h2>
033 *
034 * <p>Instances of this class are safe for concurrent use by multiple
035 * threads.</p>
036 *
037 * @param <T> the type of message that will be written; see {@link
038 * #createMessage(byte[], int, int)}
039 *
040 * @author <a href="https://about.me/lairdnelson"
041 * target="_parent">Laird Nelson</a>
042 *
043 * @see ChannelOutboundInvoker
044 */
045public abstract class AbstractChannelOutboundInvokingOutputStream<T> extends OutputStream {
046
047
048  /*
049   * Instance fields.
050   */
051
052
053  /**
054   * The {@link ChannelOutboundInvoker} underlying this {@link
055   * AbstractChannelOutboundInvokingOutputStream} implementation to
056   * which most operations are adapted.
057   *
058   * @see ChannelOutboundInvoker
059   *
060   * @see
061   * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
062   * int, boolean)
063   */
064  protected final ChannelOutboundInvoker channelOutboundInvoker;
065
066  /**
067   * Indicates whether the {@link #channelOutboundInvoker
068   * ChannelOutboundInvoker} should also be {@linkplain
069   * ChannelOutboundInvoker#close(ChannelPromise) closed} when {@link
070   * #close() close()} is called.
071   *
072   * @see
073   * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
074   * int, boolean)
075   */
076  protected final boolean closeChannelOutboundInvoker;
077
078  private final int flushThreshold;
079
080  private volatile int bytesWritten;
081
082
083  /*
084   * Constructors;
085   */
086
087
088  /**
089   * Creates a new {@link AbstractChannelOutboundInvokingOutputStream}
090   * that does not automatically flush and that does not ever
091   * {@linkplain ChannelOutboundInvoker#close() close} the supplied
092   * {@link ChannelOutboundInvoker}.
093   *
094   * @param channelOutboundInvoker the {@link ChannelOutboundInvoker}
095   * to which operations are adapted; must not be {@code null}
096   *
097   * @exception NullPointerException if {@code channelOutboundInvoker}
098   * is {@code null}
099   *
100   * @see
101   * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
102   * int, boolean)
103   *
104   * @see ChannelOutboundInvoker
105   */
106  protected AbstractChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker) {
107    this(channelOutboundInvoker, Integer.MAX_VALUE, false);
108  }
109
110  /**
111   * Creates a new {@link AbstractChannelOutboundInvokingOutputStream}
112   * that does not automatically flush.
113   *
114   * @param channelOutboundInvoker the {@link ChannelOutboundInvoker}
115   * to which operations are adapted; must not be {@code null}
116   *
117   * @param closeChannelOutboundInvoker whether {@link
118   * ChannelOutboundInvoker#close(ChannelPromise)} will be called on
119   * the supplied {@link ChannelOutboundInvoker} when {@link #close()
120   * close()} is called
121   *
122   * @exception NullPointerException if {@code channelOutboundInvoker}
123   * is {@code null}
124   *
125   * @see
126   * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
127   * int, boolean)
128   *
129   * @see ChannelOutboundInvoker
130   *
131   * @see #close()
132   */
133  protected AbstractChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker,
134                                                        final boolean closeChannelOutboundInvoker) {
135    this(channelOutboundInvoker, Integer.MAX_VALUE, closeChannelOutboundInvoker);
136  }
137
138  /**
139   * Creates a new {@link
140   * AbstractChannelOutboundInvokingOutputStream}.
141   *
142   * @param channelOutboundInvoker the {@link ChannelOutboundInvoker}
143   * to which operations are adapted; must not be {@code null}
144   *
145   * @param flushThreshold the minimum number of bytes that this
146   * instance has to {@linkplain #write(byte[], int, int) write}
147   * before an automatic {@linkplain #flush() flush} will take place;
148   * if less than {@code 0} {@code 0} will be used instead; if {@code
149   * Integer#MAX_VALUE} then no automatic flushing will occur
150   *
151   * @param closeChannelOutboundInvoker whether {@link
152   * ChannelOutboundInvoker#close(ChannelPromise)} will be called on
153   * the supplied {@link ChannelOutboundInvoker} when {@link #close()
154   * close()} is called
155   *
156   * @exception NullPointerException if {@code channelOutboundInvoker}
157   * is {@code null}
158   *
159   * @see ChannelOutboundInvoker
160   *
161   * @see #getFlushThreshold()
162   *
163   * @see #close()
164   */
165  protected AbstractChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker,
166                                                        final int flushThreshold,
167                                                        final boolean closeChannelOutboundInvoker) {
168    super();
169    this.flushThreshold = Math.max(0, flushThreshold);
170    this.channelOutboundInvoker = Objects.requireNonNull(channelOutboundInvoker);
171    this.closeChannelOutboundInvoker = closeChannelOutboundInvoker;
172  }
173
174
175  /*
176   * Instance methods.
177   */
178
179
180  /**
181   * Returns the minimum number of bytes that this {@link
182   * AbstractChannelOutboundInvokingOutputStream} implementation has
183   * to {@linkplain #write(byte[], int, int) write} before an
184   * automatic {@linkplain #flush() flush} will take place.
185   *
186   * <p>This method will always return {@code 0} or a positive {@code
187   * int}.</p>
188   *
189   * <p>If this method returns {@code 0}, then a call to {@link
190   * #flush()} will be made at some point after every {@link
191   * #write(byte[], int, int)} invocation.</p>
192   *
193   * <p>If this method returns {@link Integer#MAX_VALUE}, then no
194   * automatic flushing will occur.</p>
195   *
196   * @return the minimum number of bytes that this {@link
197   * AbstractChannelOutboundInvokingOutputStream} implementation has
198   * to {@linkplain #write(byte[], int, int) write} before an
199   * automatic {@linkplain #flush() flush} will take place; always
200   * {@code 0} or a positive {@code int}
201   *
202   * @see
203   * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
204   * int, boolean)
205   *
206   * @see ChannelOutboundInvoker#flush()
207   */
208  public final int getFlushThreshold() {
209    return this.flushThreshold;
210  }
211
212  @Override
213  public final void write(final int singleByte) throws IOException {
214    this.write(this.createMessage(singleByte), 1);
215  }
216
217  @Override
218  public final void write(final byte[] bytes) throws IOException {
219    this.write(bytes, 0, bytes.length);
220  }
221
222  @Override
223  public final void write(final byte[] bytes, final int offset, final int length) throws IOException {
224    if (offset < 0 || length < 0 || offset + length > bytes.length) {
225      throw new IndexOutOfBoundsException();
226    }
227    this.write(this.createMessage(bytes, offset, length), length);
228  }
229
230  private final void write(final T message, final int length) throws IOException {
231    final ChannelPromise channelPromise = this.newPromise();
232    final int flushThreshold = this.getFlushThreshold();
233    switch (flushThreshold) {
234    case 0:
235      // Flush previous writes, if any
236      this.channelOutboundInvoker.flush();
237      break;
238    case Integer.MAX_VALUE:
239      break;
240    default:
241      final int bytesWritten = this.bytesWritten; // volatile read
242      if (bytesWritten > flushThreshold) {
243        // Flush previous writes, if any, and set our "days since
244        // flush" back to 0 (see #flush())
245        this.flush();
246      } else if (channelPromise.isVoid()) {
247        // Optimistically assume the write will succeed; if we get
248        // this wrong, all that happens is maybe we don't flush as
249        // often as expected.
250        this.bytesWritten = bytesWritten + length;
251      } else {
252        channelPromise.addListener(f -> this.bytesWritten = bytesWritten + length); // volatile write
253      }
254    }
255    this.channelOutboundInvoker.write(message, channelPromise);
256    maybeThrow(channelPromise.cause());
257  }
258
259  /**
260   * Returns a new message representing the single supplied {@code
261   * byte} to be {@linkplain ChannelOutboundInvoker#write(Object,
262   * ChannelPromise) written} by this {@link
263   * AbstractChannelOutboundInvokingOutputStream}'s various {@link
264   * #write(byte[], int, int) write} methods.
265   *
266   * <p>This method never returns {@code null}.</p>
267   *
268   * <p>Overrides of this method must not return {@code null}.</p>
269   *
270   * <p><strong>Note:</strong> The default implementation of this
271   * method is inefficient: it creates a new {@code byte[]} holding
272   * the sole {@code byte} represented by the {@code singleByte}
273   * parameter value and calls {@link #createMessage(byte[], int,
274   * int)} and returns its result.  Subclasses are encouraged, but not
275   * required, to override this method to be more efficient.</p>
276   *
277   * <p>Overrides of this method should be stateless.</p>
278   *
279   * @param singleByte an {@code int} whose low-order bits hold a
280   * {@code byte} to be written
281   *
282   * @return a new, non-{@code null} message to {@linkplain
283   * ChannelOutboundInvoker#write(Object, ChannelPromise) write}
284   *
285   * @exception IOException if an error occurs
286   *
287   * @see #createMessage(byte[], int, int)
288   *
289   * @see ChannelOutboundInvoker#write(Object, ChannelPromise)
290   */
291  protected T createMessage(final int singleByte) throws IOException {
292    return this.createMessage(new byte[] { (byte)singleByte }, 0, 1);
293  }
294
295  /**
296   * Returns a new message representing a portion (or all) of the
297   * supplied {@code byte} array that will be {@linkplain
298   * ChannelOutboundInvoker#write(Object, ChannelPromise) written} by
299   * this {@link AbstractChannelOutboundInvokingOutputStream}'s
300   * various {@link #write(byte[], int, int) write} methods.
301   *
302   * <p>Implementations of this method must not return {@code
303   * null}.</p>
304   *
305   * <p>Implementations of this method should be stateless.</p>
306   *
307   * @param bytes a {@code byte} array originating from,
308   * <em>e.g.</em>, a {@link #write(byte[], int, int)} method
309   * invocation; will never be {@code null}
310   *
311   * @param offset the (validated) offset within the supplied {@code
312   * byte} array from which to start reading; will always be {@code 0}
313   * or a positive {@code int} less than {@code length}
314   *
315   * @param length the (validated) length of the portion to read; will
316   * always be {@code 0} or a positive {@code int} less than or equal
317   * to the {@code length} of the supplied {@code byte} array
318   *
319   * @return a new, non-{@code null} message to {@linkplain
320   * ChannelOutboundInvoker#write(Object, ChannelPromise) write}
321   *
322   * @exception NullPointerException if {@code bytes} is {@code null}
323   *
324   * @exception IndexOutOfBoundsException if {@code offset} is
325   * negative, or {@code length} is negative, or {@code offset +
326   * length} is greater than the length of {@code bytes}
327   *
328   * @exception IOException if an error occurs during the actual
329   * creation of the message
330   *
331   * @see #write(byte[], int, int)
332   *
333   * @see OutputStream#write(byte[], int, int)
334   *
335   * @see ChannelOutboundInvoker#write(Object, ChannelPromise)
336   */
337  protected abstract T createMessage(final byte[] bytes, final int offset, final int length) throws IOException;
338
339  /**
340   * Returns a new, possibly {@code null}, message that should be
341   * written when {@link #close()} is invoked.
342   *
343   * <p>This method and its overrides may return {@code null} to
344   * indicate that no such write is required.</p>
345   *
346   * <p>The default implementation of this method returns {@code
347   * null}.</p>
348   *
349   * <p>Overrides of this method should be stateless.</p>
350   *
351   * @return a final message to write when {@link #close() close()} is
352   * called, or {@code null} if no final message needs to be written
353   *
354   * @see #close()
355   *
356   * @exception IOException if an error occurs
357   */
358  protected T createLastMessage() throws IOException {
359    return null;
360  }
361
362  /**
363   * Creates and returns new {@link ChannelPromise}s that will be used
364   * in many {@link ChannelOutboundInvoker} operations.
365   *
366   * <p>This method never returns {@code null}.</p>
367   *
368   * <p>Overrides of this method must not return {@code null}.</p>
369   *
370   * <p>The default implementation of this method returns the return
371   * value of invoking {@link ChannelOutboundInvoker#newPromise()}.
372   *
373   * @return a new, non-{@code null} {@link ChannelPromise} that will
374   * be supplied to many {@link ChannelOutboundInvoker} operations
375   *
376   * @see ChannelPromise
377   *
378   * @see ChannelOutboundInvoker#newPromise()
379   *
380   * @see ChannelOutboundInvoker#voidPromise()
381   */
382  protected ChannelPromise newPromise() {
383    return this.channelOutboundInvoker.newPromise();
384  }
385
386  /**
387   * Calls the {@link ChannelOutboundInvoker#flush()} method on the
388   * {@link ChannelOutboundInvoker} {@linkplain
389   * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
390   * int, boolean) supplied at construction time}.
391   *
392   * @see ChannelOutboundInvoker#flush()
393   *
394   * @see #getFlushThreshold()
395   *
396   * @see
397   * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
398   * int, boolean)
399   */
400  @Override
401  public final void flush() {
402    this.channelOutboundInvoker.flush();
403    this.bytesWritten = 0; // volatile write
404  }
405
406  /**
407   * {@linkplain OutputStream#close() Closes} this {@link
408   * AbstractChannelOutboundInvokingOutputStream}, optionally
409   * {@linkplain ChannelOutboundInvoker#writeAndFlush(Object,
410   * ChannelPromise) writing and flushing} a {@linkplain
411   * #createLastMessage() final message}, or simply just {@linkplain
412   * #flush() flushing} first, before possibly {@linkplain
413   * ChannelOutboundInvoker#close(ChannelPromise) closing the
414   * underlying <code>ChannelOutboundInvoker</code>}.
415   *
416   * @exception IOException if the {@link #createLastMessage()} method
417   * throws an {@link IOException}
418   *
419   * @see #createLastMessage()
420   *
421   * @see ChannelOutboundInvoker#close(ChannelPromise)
422   *
423   * @see
424   * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
425   * int, boolean)
426   */
427  @Override
428  public final void close() throws IOException {
429    super.close();
430    final Object lastMessage = this.createLastMessage();
431    if (lastMessage == null) {
432      this.flush();
433    } else {
434      final ChannelPromise channelPromise = this.newPromise();
435      if (channelPromise.isVoid()) {
436        this.channelOutboundInvoker.writeAndFlush(lastMessage, channelPromise);
437        this.bytesWritten = 0; // volatile write
438      } else {
439        channelPromise.addListener(f -> this.bytesWritten = 0); // volatile write
440        this.channelOutboundInvoker.writeAndFlush(lastMessage, channelPromise);
441      }
442      maybeThrow(channelPromise.cause());
443    }
444    if (this.closeChannelOutboundInvoker) {
445      final ChannelPromise channelPromise = this.newPromise();
446      this.channelOutboundInvoker.close(channelPromise);
447      maybeThrow(channelPromise.cause());
448    }
449  }
450
451
452  /*
453   * Static methods.
454   */
455
456
457  private static final void maybeThrow(final Throwable cause) throws IOException {
458    if (cause == null) {
459      return;
460    } else if (cause instanceof RuntimeException) {
461      throw (RuntimeException)cause;
462    } else if (cause instanceof IOException) {
463      throw (IOException)cause;
464    } else if (cause instanceof Exception) {
465      throw new IOException(cause.getMessage(), cause);
466    } else if (cause instanceof Error) {
467      throw (Error)cause;
468    } else {
469      throw new InternalError();
470    }
471  }
472
473}