001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2019–2020 microBean™.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.jersey.netty;
018
019import java.io.OutputStream;
020
021import java.util.Collection;
022import java.util.List;
023import java.util.Map;
024import java.util.Map.Entry;
025import java.util.Objects;
026
027import java.util.concurrent.ScheduledFuture;
028import java.util.concurrent.TimeUnit;
029
030import java.util.function.BiConsumer;
031import java.util.function.UnaryOperator;
032import java.util.function.Supplier;
033
034import java.util.logging.Logger;
035
036import io.netty.channel.Channel; // for javadoc only
037import io.netty.channel.ChannelConfig; // for javadoc only
038import io.netty.channel.ChannelHandlerContext;
039import io.netty.channel.ChannelInboundHandlerAdapter;
040import io.netty.channel.ChannelOutboundInvoker; // for javadoc only
041
042import org.glassfish.jersey.CommonProperties; // for javadoc only
043
044import org.glassfish.jersey.message.internal.CommittingOutputStream; // for javadoc only
045
046import org.glassfish.jersey.server.ApplicationHandler;
047import org.glassfish.jersey.server.ContainerException;
048import org.glassfish.jersey.server.ContainerRequest;
049import org.glassfish.jersey.server.ContainerResponse;
050
051import org.glassfish.jersey.server.spi.Container;
052import org.glassfish.jersey.server.spi.ContainerResponseWriter;
053import org.glassfish.jersey.server.spi.ContainerResponseWriter.TimeoutHandler;
054
055import org.microbean.jersey.netty.AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator;
056
057/**
058 * An abstract {@link ChannelInboundHandlerAdapter} that is also a
059 * {@link ContainerResponseWriter} that processes incoming {@link
060 * ContainerRequest} events, such as those dispatched by an {@link
061 * AbstractContainerRequestDecoder}, by supplying them to the {@link
062 * ApplicationHandler#handle(ContainerRequest)} method.
063 *
064 * <p>Instances of this class are in charge of properly invoking
065 * {@link ApplicationHandler#handle(ContainerRequest)}, thus adapting
066 * <a href="https://eclipse-ee4j.github.io/jersey/"
067 * target="_parent">Jersey</a> to <a href="https://netty.io/"
068 * target="_parent">Netty</a>'s constraints and vice versa.</p>
069 *
070 * @param <T> the type of message that will be written by instances of
071 * this class; see {@link #createOutputStream(long,
072 * ContainerResponse)}
073 *
074 * @author <a href="https://about.me/lairdnelson"
075 * target="_parent">Laird Nelson</a>
076 *
077 * @see #channelRead(ChannelHandlerContext, Object)
078 *
079 * @see #createOutputStream(long, ContainerResponse)
080 *
081 * @see ChannelInboundHandlerAdapter
082 *
083 * @see ApplicationHandler#handle(ContainerRequest)
084 *
085 * @see ContainerResponseWriter
086 */
087public abstract class AbstractContainerRequestHandlingResponseWriter<T> extends ChannelInboundHandlerAdapter implements ContainerResponseWriter {
088
089
090  /*
091   * Static fields.
092   */
093
094
095  private static final String cn = AbstractContainerRequestHandlingResponseWriter.class.getName();
096
097  private static final Logger logger = Logger.getLogger(cn);
098
099
100  /*
101   * Instance fields.
102   */
103
104
105  private final Supplier<? extends ApplicationHandler> applicationHandlerSupplier;
106
107  private ScheduledFuture<?> suspendTimeoutFuture;
108
109  private Runnable suspendTimeoutHandler;
110
111  private ChannelHandlerContext channelHandlerContext;
112
113  private final int flushThreshold;
114
115  private final ByteBufCreator byteBufCreator;
116
117
118  /*
119   * Constructors.
120   */
121
122
123  /**
124   * Creates a new {@link
125   * AbstractContainerRequestHandlingResponseWriter}.
126   *
127   * @param applicationHandler an {@link ApplicationHandler}
128   * representing a <a
129   * href="https://jakarta.ee/specifications/restful-ws/"
130   * target="_parent">Jakarta RESTful Web Services application</a>
131   * whose {@link ApplicationHandler#handle(ContainerRequest)} method
132   * will serve as the bridge between Netty and Jersey; may be {@code
133   * null} somewhat pathologically but normally is not
134   *
135   * @see #AbstractContainerRequestHandlingResponseWriter(Supplier,
136   * int,
137   * AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
138   *
139   * @see ApplicationHandler
140   *
141   * @see ApplicationHandler#handle(ContainerRequest)
142   *
143   * @see #channelRead(ChannelHandlerContext, Object)
144   */
145  protected AbstractContainerRequestHandlingResponseWriter(final ApplicationHandler applicationHandler) {
146    this(new ImmutableSupplier<>(applicationHandler), 8192, null);
147  }
148
149  /**
150   * Creates a new {@link
151   * AbstractContainerRequestHandlingResponseWriter}.
152   *
153   * @param applicationHandler an {@link ApplicationHandler}
154   * representing a <a
155   * href="https://jakarta.ee/specifications/restful-ws/"
156   * target="_parent">Jakarta RESTful Web Services application</a>
157   * whose {@link ApplicationHandler#handle(ContainerRequest)} method
158   * will serve as the bridge between Netty and Jersey; may be {@code
159   * null} somewhat pathologically but normally is not
160   *
161   * @param flushThreshold the minimum number of bytes that an {@link
162   * AbstractByteBufBackedChannelOutboundInvokingOutputStream}
163   * returned by the {@link #createOutputStream(long,
164   * ContainerResponse)} method must write before an automatic
165   * {@linkplain
166   * AbstractByteBufBackedChannelOutboundInvokingOutputStream#flush()
   * flush} may take place; if less than {@code 0}, then {@code 0}
   * will be used instead; if {@link Integer#MAX_VALUE}, then it is
   * suggested that no automatic flushing will occur
170   *
171   * @param byteBufCreator a {@link ByteBufCreator} that may be used
172   * (but does not have to be used) by the implementation of the
173   * {@link #createOutputStream(long, ContainerResponse)} method; may
174   * be {@code null}
175   *
176   * @see #AbstractContainerRequestHandlingResponseWriter(Supplier,
177   * int,
178   * AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
179   *
180   * @see ApplicationHandler
181   *
182   * @see ApplicationHandler#handle(ContainerRequest)
183   *
184   * @see #channelRead(ChannelHandlerContext, Object)
185   *
186   * @see #getFlushThreshold()
187   *
188   * @see #getByteBufCreator()
189   *
190   * @see #createOutputStream(long, ContainerResponse)
191   */
192  protected AbstractContainerRequestHandlingResponseWriter(final ApplicationHandler applicationHandler,
193                                                           final int flushThreshold,
194                                                           final ByteBufCreator byteBufCreator) {
195    this(new ImmutableSupplier<>(applicationHandler), flushThreshold, byteBufCreator);
196  }
197
198  /**
199   * Creates a new {@link
200   * AbstractContainerRequestHandlingResponseWriter}.
201   *
202   * @param applicationHandlerSupplier a {@link Supplier} of an {@link
203   * ApplicationHandler} representing a <a
204   * href="https://jakarta.ee/specifications/restful-ws/"
205   * target="_parent">Jakarta RESTful Web Services application</a>
206   * whose {@link ApplicationHandler#handle(ContainerRequest)} method
207   * will serve as the bridge between Netty and Jersey; may be {@code
208   * null} somewhat pathologically but normally is not
209   *
210   * @see #AbstractContainerRequestHandlingResponseWriter(Supplier,
211   * int,
212   * AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
213   *
214   * @see ApplicationHandler
215   *
216   * @see ApplicationHandler#handle(ContainerRequest)
217   *
218   * @see #channelRead(ChannelHandlerContext, Object)
219   */
220  protected AbstractContainerRequestHandlingResponseWriter(final Supplier<? extends ApplicationHandler> applicationHandlerSupplier) {
221    this(applicationHandlerSupplier, 8192, null);
222  }
223
224  /**
225   * Creates a new {@link
226   * AbstractContainerRequestHandlingResponseWriter}.
227   *
228   * @param applicationHandlerSupplier a {@link Supplier} of an {@link
229   * ApplicationHandler} representing a <a
230   * href="https://jakarta.ee/specifications/restful-ws/"
231   * target="_parent">Jakarta RESTful Web Services application</a>
232   * whose {@link ApplicationHandler#handle(ContainerRequest)} method
233   * will serve as the bridge between Netty and Jersey; may be {@code
234   * null} somewhat pathologically but normally is not
235   *
236   * @param flushThreshold the minimum number of bytes that an {@link
237   * AbstractByteBufBackedChannelOutboundInvokingOutputStream}
238   * returned by the {@link #createOutputStream(long,
239   * ContainerResponse)} method must write before an automatic
240   * {@linkplain
241   * AbstractByteBufBackedChannelOutboundInvokingOutputStream#flush()
   * flush} may take place; if less than {@code 0}, then {@code 0}
   * will be used instead; if {@link Integer#MAX_VALUE}, then it is
   * suggested that no automatic flushing will occur
245   *
246   * @param byteBufCreator a {@link ByteBufCreator} that may be used
247   * (but does not have to be used) by the implementation of the
248   * {@link #createOutputStream(long, ContainerResponse)} method; may
249   * be {@code null}
250   *
251   * @see ApplicationHandler
252   *
253   * @see ApplicationHandler#handle(ContainerRequest)
254   *
255   * @see #channelRead(ChannelHandlerContext, Object)
256   *
257   * @see #getFlushThreshold()
258   *
259   * @see #getByteBufCreator()
260   *
261   * @see #createOutputStream(long, ContainerResponse)
262   */
263  protected AbstractContainerRequestHandlingResponseWriter(final Supplier<? extends ApplicationHandler> applicationHandlerSupplier,
264                                                           final int flushThreshold,
265                                                           final ByteBufCreator byteBufCreator) {
266    super();
267    if (applicationHandlerSupplier == null) {
268      this.applicationHandlerSupplier = new ImmutableSupplier<>(new ApplicationHandler());
269    } else {
270      this.applicationHandlerSupplier = applicationHandlerSupplier;
271    }
272    this.flushThreshold = Math.max(0, flushThreshold);
273    this.byteBufCreator = byteBufCreator;
274  }
275
276
277  /*
278   * Instance methods.
279   */
280
281
282  /**
283   * Overrides {@link
284   * ChannelInboundHandlerAdapter#channelActive(ChannelHandlerContext)}
285   * to ensure that a channel read is performed one way or another.
286   *
   * <p>The superclass' version of this method is always executed
   * first.  If {@linkplain ChannelConfig#isAutoRead() autoread is
   * active}, then nothing further is done.  If {@linkplain
   * ChannelConfig#isAutoRead() autoread is inactive}, then a call is
   * additionally made to {@link ChannelHandlerContext#read()}.</p>
292   *
293   * @param channelHandlerContext the {@link ChannelHandlerContext} in
294   * effect; must not be {@code null}
295   *
296   * @exception NullPointerException if {@code channelHandlerContext}
297   * is {@code null}
298   *
299   * @exception Exception if an error occurs
300   *
301   * @see ChannelConfig#isAutoRead()
302   *
303   * @see Channel#read()
304   *
305   * @see ChannelHandlerContext#read()
306   *
307   * @see #channelRead(ChannelHandlerContext, Object)
308   */
309  @Override
310  public final void channelActive(final ChannelHandlerContext channelHandlerContext) throws Exception {
311    super.channelActive(channelHandlerContext);
312    // See
313    // https://github.com/netty/netty/blob/d446765b8469ca40db40f46e5c637d980b734a8a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java#L1408-L1413.
314    if (!channelHandlerContext.channel().config().isAutoRead()) {
315      // If autoRead was "on", then the superclass version of this
316      // method will have already performed a read from the channel
317      // (e.g. channel.read()).  If autoRead is "off", then we get
318      // here, and we do a read from the ChannelHandlerContext (and so
319      // we don't have to traverse the whole pipeline).
320      channelHandlerContext.read();
321    }
322  }
323
324  /**
325   * If the supplied {@code message} is a {@link ContainerRequest}
326   * then this method will {@linkplain
327   * ContainerRequest#setWriter(ContainerResponseWriter) install
328   * itself as that request's <code>ContainerResponseWriter</code>}
329   * and will invoke {@link
330   * ApplicationHandler#handle(ContainerRequest)}.
331   *
332   * <p>In all other cases this method will simply call {@link
333   * ChannelInboundHandlerAdapter#channelRead(ChannelHandlerContext,
334   * Object)} with the supplied {@code message}.</p>
335   *
336   * @param channelHandlerContext the {@link ChannelHandlerContext} in
337   * effect; must not be {@code null}
338   *
339   * @param message the incoming message, or event; may be {@code null}
340   *
341   * @exception NullPointerException if {@code channelHandlerContext}
342   * is {@code null}, or if the {@link Supplier} of {@link
343   * ApplicationHandler} instances supplied at construction time
344   * returns {@code null}
345   *
346   * @exception Exception if {@code message} is not an instance of
347   * {@link ContainerRequest} and {@link
348   * ChannelInboundHandlerAdapter#channelRead(ChannelHandlerContext,
349   * Object)} throws an {@link Exception}
350   *
351   * @see ApplicationHandler#handle(ContainerRequest)
352   *
353   * @see ContainerResponseWriter
354   *
355   * @see ContainerRequest#setWriter(ContainerResponseWriter)
356   */
357  @Override
358  public final void channelRead(final ChannelHandlerContext channelHandlerContext,
359                                final Object message)
360    throws Exception {
361    if (this.getChannelHandlerContext() != null) {
362      throw new IllegalStateException("this.getChannelHandlerContext() != null: " + this.getChannelHandlerContext());
363    }
364    this.channelHandlerContext = Objects.requireNonNull(channelHandlerContext);
365    try {
366      if (message instanceof ContainerRequest) {
367        final ContainerRequest containerRequest = (ContainerRequest)message;
368        containerRequest.setWriter(this);
369        this.applicationHandlerSupplier.get().handle(containerRequest);
370      } else {
371        super.channelRead(channelHandlerContext, message);
372      }
373    } finally {
374      this.channelHandlerContext = null;
375    }
376  }
377
378  /**
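   * Overrides the {@link
   * ChannelInboundHandlerAdapter#channelReadComplete(ChannelHandlerContext)}
   * method to call {@link ChannelHandlerContext#flush()
   * channelHandlerContext.flush()} before calling the superclass
   * implementation, and then, if {@linkplain
   * ChannelConfig#isAutoRead() autoread is inactive}, to call {@link
   * ChannelHandlerContext#read() channelHandlerContext.read()}.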
384   *
385   * @param channelHandlerContext the {@link ChannelHandlerContext} in
386   * effect; must not be {@code null}
387   *
388   * @exception NullPointerException if {@code channelHandlerContext} is {@code null}
389   *
390   * @exception Exception if {@link
391   * ChannelInboundHandlerAdapter#channelReadComplete(ChannelHandlerContext)}
392   * throws an {@link Exception}
393   */
394  @Override
395  public final void channelReadComplete(final ChannelHandlerContext channelHandlerContext) throws Exception {
396    channelHandlerContext.flush();
397    // See
398    // https://github.com/netty/netty/blob/d446765b8469ca40db40f46e5c637d980b734a8a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java#L1408-L1413.
399    super.channelReadComplete(channelHandlerContext);
400    if (!channelHandlerContext.channel().config().isAutoRead()) {
401      // Ultimately a read is just an idempotent call so even if other
402      // handlers do this seemingly nothing bad will happen.  See
403      // https://github.com/netty/netty/blob/9976ab7fe86e052d29ca7accf528c885e93dcb4c/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java#L402-L416.
404      // Even in the ancient case of blocking IO it's still
405      // idempotent:
406      // https://github.com/netty/netty/blob/9976ab7fe86e052d29ca7accf528c885e93dcb4c/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java#L101-L109
407      // Finally, transports like Epoll are less clear:
408      // https://github.com/netty/netty/blob/9976ab7fe86e052d29ca7accf528c885e93dcb4c/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java#L226-L242
409      // ...but Epoll too ultimately is idempotent:
410      // https://github.com/netty/netty/blob/9976ab7fe86e052d29ca7accf528c885e93dcb4c/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java#L226-L242
411      channelHandlerContext.read();
412    }
413  }
414
415  /**
416   * Returns the {@link ChannelHandlerContext} in effect, or {@code
417   * null} if there is no such {@link ChannelHandlerContext}.
418   *
419   * <p>This method may return {@code null}.</p>
420   *
421   * @return the {@link ChannelHandlerContext} in effect, or {@code
422   * null} if there is no such {@link ChannelHandlerContext}
423   */
424  protected final ChannelHandlerContext getChannelHandlerContext() {
425    return this.channelHandlerContext;
426  }
427
428  /*
429   * ContainerResponseWriter overrides.
430   */
431
432  /**
433   * Returns {@code true} when invoked to indicate that buffering of
434   * entity content is supported and can be configured to be on or
435   * off.
436   *
437   * <p>Note that the return value of this method is a default value
438   * indicating that the <em>concept</em> of response buffering is
439   * enabled.  The actual <em>configuration</em> of response buffering
440   * is a property of the application.  Specifically, an application's
441   * response buffering policy <a
442   * href="https://github.com/eclipse-ee4j/jersey/blob/a40169547a602a582f5fed1fd8ebe595ff2b83f7/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java#L761-L778"
443   * target="_parent">can be configured</a>: if the application's
444   * configuration sets the {@link
445   * CommonProperties#OUTBOUND_CONTENT_LENGTH_BUFFER
446   * jersey.config.contentLength.buffer} property to a positive {@code
447   * int}, then buffering will occur, and if the application's
448   * configuration sets the {@link
449   * CommonProperties#OUTBOUND_CONTENT_LENGTH_BUFFER
450   * jersey.config.contentLength.buffer} property to a negative {@code
451   * int}, then buffering will not occur.  If the application's
452   * configuration does nothing in this regard, response buffering
453   * will be enabled with a default buffer size of {@link
454   * CommittingOutputStream#DEFAULT_BUFFER_SIZE 8192}.</p>
455   *
456   * <p>(If, instead, this method had been written to return {@code
457   * false}, then no matter what configuration settings an application
458   * might specify in the realm of response buffering settings,
459   * response buffering of any kind would never be possible.)</p>
460   *
461   * @return {@code true} when invoked; never {@code false}
462   *
463   * @see ContainerResponseWriter#enableResponseBuffering()
464   *
465   * @see <a
466   * href="https://github.com/eclipse-ee4j/jersey/blob/8dcbaf4b5cb0fb345eb949acfa66b1fbe09d1ffb/core-server/src/main/java/org/glassfish/jersey/server/ServerRuntime.java#L634"
467   * target="_parent"><code>ServerRuntime.java</code></a>
468   *
469   * @see <a
470   * href="https://github.com/eclipse-ee4j/jersey/blob/a40169547a602a582f5fed1fd8ebe595ff2b83f7/core-server/src/main/java/org/glassfish/jersey/server/ContainerResponse.java#L352-L363"
471   * target="_parent"><code>ContainerResponse.java</code></a>
472   *
473   * @see <a
474   * href="https://github.com/eclipse-ee4j/jersey/blob/a40169547a602a582f5fed1fd8ebe595ff2b83f7/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java#L761-L778"
475   * target="_parent"><code>OutboundMessageContext.java</code></a>
476   */
477  @Override
478  public final boolean enableResponseBuffering() {
479    return true;
480  }
481
482  /**
483   * Writes the status and headers portion of the response present in
484   * the supplied {@link ContainerResponse} by calling the {@link
485   * #writeStatusAndHeaders(long, ContainerResponse)} method, and, if
486   * the supplied {@code contentLength} is not {@code 0L} and that
487   * method returns {@code true} indicating that output will be
488   * forthcoming, returns the result of invoking {@link
489   * #createOutputStream(long, ContainerResponse)}.
490   *
491   * <p>In all other cases, this method returns {@code null}.</p>
492   *
493   * @param contentLength the content length as determined by the
494   * logic encapsulated by the {@link
495   * ApplicationHandler#handle(ContainerRequest)} method; a value less
496   * than zero indicates an unknown content length
497   *
498   * @param containerResponse the {@link ContainerResponse} containing
499   * status and headers information; must not be {@code null}
500   *
501   * @return the {@link OutputStream} returned by the {@link
502   * #createOutputStream(long, ContainerResponse)} method, or {@code
503   * null}
504   *
505   * @exception NullPointerException if {@code containerResponse} is
506   * {@code null}
507   *
508   * @see #writeStatusAndHeaders(long, ContainerResponse)
509   *
510   * @see #createOutputStream(long, ContainerResponse)
511   *
512   * @see ApplicationHandler#handle(ContainerRequest)
513   */
514  @Override
515  public final OutputStream writeResponseStatusAndHeaders(final long contentLength,
516                                                          final ContainerResponse containerResponse) {
517    final OutputStream returnValue;
518    if (this.writeStatusAndHeaders(contentLength, Objects.requireNonNull(containerResponse)) && contentLength != 0L) {
519      returnValue = this.createOutputStream(contentLength, containerResponse);
520    } else {
521      returnValue = null;
522    }
523    return returnValue;
524  }
525
526  /**
527   * Writes the status and headers portion of the response present in
528   * the supplied {@link ContainerResponse} and returns {@code true}
529   * if further output is forthcoming.
530   *
531   * <p>Implementations of this method must not call the {@link
532   * #writeResponseStatusAndHeaders(long, ContainerResponse)} method
533   * or an infinite loop may result.</p>
534   *
535   * <p>Implementations of this method must not call the {@link
536   * #createOutputStream(long, ContainerResponse)} method or undefined
537   * behavior may result.</p>
538   *
539   * @param contentLength the content length as determined by the
540   * logic encapsulated by the {@link
541   * ApplicationHandler#handle(ContainerRequest)} method; a value less
542   * than zero indicates an unknown content length
543   *
544   * @param containerResponse the {@link ContainerResponse} containing
545   * status and headers information; must not be {@code null}
546   *
547   * @return {@code true} if the {@link #createOutputStream(long,
548   * ContainerResponse)} method should be invoked, <em>i.e.</em> if
549   * further output is forthcoming
550   *
551   * @exception NullPointerException if {@code containerResponse} is
552   * {@code null}
553   *
554   * @see ApplicationHandler#handle(ContainerRequest)
555   *
556   * @see #createOutputStream(long, ContainerResponse)
557   */
558  protected abstract boolean writeStatusAndHeaders(final long contentLength,
559                                                   final ContainerResponse containerResponse);
560
561  /**
562   * Creates and returns a new {@link
563   * AbstractChannelOutboundInvokingOutputStream}, or returns {@code
564   * null} if it is determined that no {@link
565   * AbstractChannelOutboundInvokingOutputStream} is required given
566   * the supplied {@code contentLength} parameter value.
567   *
568   * <p>Implementations of this method may return {@code null}.</p>
569   *
570   * @param contentLength the content length as determined by the
571   * logic encapsulated by the {@link
572   * ApplicationHandler#handle(ContainerRequest)} method; a value less
573   * than zero indicates an unknown content length; must not be equal
574   * to {@code 0L}
575   *
576   * @param containerResponse the {@link ContainerResponse} containing
577   * status and headers information; must not be {@code null}; may be
578   * (and often is) ignored by implementations
579   *
580   * @return a new {@link AbstractChannelOutboundInvokingOutputStream}
581   * implementation, or {@code null}
582   *
583   * @exception NullPointerException if {@code containerResponse} is
584   * {@code null}
585   *
586   * @exception IllegalArgumentException if {@code contentLength} is
587   * equal to {@code 0L}
588   */
589  protected abstract AbstractChannelOutboundInvokingOutputStream<? extends T> createOutputStream(final long contentLength,
590                                                                                                 final ContainerResponse containerResponse);
591
592  /**
593   * Returns the minimum number of bytes that an {@link
594   * AbstractChannelOutboundInvokingOutputStream} returned by the
595   * {@link #createOutputStream(long, ContainerResponse)} method must
596   * write before an automatic {@linkplain
597   * AbstractChannelOutboundInvokingOutputStream#flush() flush} may
598   * take place.
599   *
600   * <p><strong>Note:</strong> Implementations of the {@link
601   * #createOutputStream(long, ContainerResponse)} method may choose
602   * to ignore the return value of this method.  It is supplied for
603   * convenience only for use by implementors of the {@link
604   * #createOutputStream(long, ContainerResponse)} method.</p>
605   *
606   * @return the minimum number of bytes that an {@link
607   * AbstractChannelOutboundInvokingOutputStream} returned by the
608   * {@link #createOutputStream(long, ContainerResponse)} method must
609   * write before an automatic {@linkplain
610   * AbstractChannelOutboundInvokingOutputStream#flush() flush} may
   * take place; always {@code 0} or a positive {@code int}; if
   * {@code 0} it is suggested that automatic flushing occur after
   * every write; if {@link Integer#MAX_VALUE} it is suggested that no
   * automatic flushing should occur
615   *
616   * @see #createOutputStream(long, ContainerResponse)
617   *
618   * @see
619   * #AbstractContainerRequestHandlingResponseWriter(ApplicationHandler,
620   * int,
621   * AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
622   *
623   * @see
624   * AbstractByteBufBackedChannelOutboundInvokingOutputStream#AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
625   * int, boolean, ByteBufCreator)
626   */
627  protected final int getFlushThreshold() {
628    return this.flushThreshold;
629  }
630
631  /**
632   * Returns a {@link ByteBufCreator} that may be used to create the
633   * {@link AbstractChannelOutboundInvokingOutputStream}
634   * implementation that must be returned by an implementation of the
635   * {@link #createOutputStream(long, ContainerResponse)} method.
636   *
637   * <p>This method may return {@code null}.</p>
638   *
639   * <p><strong>Note:</strong> Implementations of the {@link
640   * #createOutputStream(long, ContainerResponse)} method may choose
641   * to ignore the return value of this method.  It is supplied for
642   * convenience only for use by implementors of the {@link
643   * #createOutputStream(long, ContainerResponse)} method.</p>
644   *
645   * @return a {@link ByteBufCreator}, or {@code null}
646   *
647   * @see #createOutputStream(long, ContainerResponse)
648   *
649   * @see
650   * #AbstractContainerRequestHandlingResponseWriter(ApplicationHandler,
651   * int, AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
652   *
653   * @see
654   * AbstractByteBufBackedChannelOutboundInvokingOutputStream#AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
655   * int, boolean, ByteBufCreator)
656   */
657  protected final ByteBufCreator getByteBufCreator() {
658    return this.byteBufCreator;
659  }
660
661  /**
662   * Invoked by Jersey when a {@link ContainerRequest} has been fully
663   * {@linkplain ApplicationHandler#handle(ContainerRequest) handled}
664   * successfully.
665   *
666   * <p>Either {@link #commit()} or {@link #failure(Throwable)} will
667   * be called by Jersey as the logically final operation in the logic
668   * encapsulated by the {@link
669   * ApplicationHandler#handle(ContainerRequest)} method, but not
670   * both.</p>
671   *
672   * <p>This implementation does nothing.</p>
673   *
674   * @see ContainerResponseWriter#commit()
675   *
676   * @see #failure(Throwable)
677   */
678  @Override
679  public void commit() {
680
681  }
682
683  @Override
684  public final boolean suspend(final long timeout,
685                               final TimeUnit timeUnit,
686                               final TimeoutHandler timeoutHandler) {
687    // Lifted from Jersey's supplied Netty integration, with repairs.
688    final boolean returnValue;
689    if (timeoutHandler == null || this.suspendTimeoutHandler != null) {
690      returnValue = false;
691    } else {
692      this.suspendTimeoutHandler = () -> {
693        timeoutHandler.onTimeout(this);
694        this.suspendTimeoutHandler = null;
695      };
696      if (timeout > 0L) {
697        final ChannelHandlerContext channelHandlerContext = Objects.requireNonNull(this.getChannelHandlerContext());
698        this.suspendTimeoutFuture =
699          channelHandlerContext.executor().schedule(this.suspendTimeoutHandler, timeout, timeUnit);
700      }
701      returnValue = true;
702    }
703    return returnValue;
704  }
705
706  @Override
707  public final void setSuspendTimeout(final long timeout, final TimeUnit timeUnit) {
708    // Lifted from Jersey's supplied Netty integration, with repairs.
709    if (this.suspendTimeoutHandler == null) {
710      throw new IllegalStateException("this.suspendTimeoutHandler == null");
711    }
712    if (this.suspendTimeoutFuture != null) {
713      this.suspendTimeoutFuture.cancel(true);
714      this.suspendTimeoutFuture = null;
715    }
716    if (timeout > 0L) {
717      final ChannelHandlerContext channelHandlerContext = Objects.requireNonNull(this.getChannelHandlerContext());
718      this.suspendTimeoutFuture =
719        channelHandlerContext.executor().schedule(this.suspendTimeoutHandler, timeout, timeUnit);
720    }
721  }
722
723  /**
724   * Handles any failure case encountered by the logic encapsulated by
725   * the {@link ApplicationHandler#handle(ContainerRequest)} method.
726   *
727   * <p>Either {@link #commit()} or {@link #failure(Throwable)} will
728   * be called by Jersey as the logically final operation in the logic
729   * encapsulated by the {@link
730   * ApplicationHandler#handle(ContainerRequest)} method, but not
731   * both.</p>
732   *
733   * <p>This method calls the {@link #writeFailureMessage(Throwable)}
734   * method and takes great care to ensure that any {@link Throwable}s
735   * encountered along the way are properly recorded and {@linkplain
736   * Throwable#addSuppressed(Throwable) suppressed}.</p>
737   *
738   * <p><strong>This implementation never returns.</strong> A {@link
739   * ContainerException} is always thrown by this method.</p>
740   *
741   * @param failureCause the {@link Throwable} encountered by the
742   * {@link ApplicationHandler#handle(ContainerRequest)} method; may
743   * be {@code null}
744   *
745   * @exception ContainerException when this method is invoked; it
746   * will have the supplied {@code failureCause} as its {@linkplain
747   * Throwable#getCause() cause}
748   *
749   * @see #commit()
750   *
751   * @see ContainerResponseWriter#failure(Throwable)
752   */
753  @Override
754  public final void failure(final Throwable failureCause) {
755    final ChannelHandlerContext channelHandlerContext = Objects.requireNonNull(this.getChannelHandlerContext());
756    Throwable outerWriteProblem = null;
757    try {
758      this.writeFailureMessage(failureCause);
759    } catch (final RuntimeException | Error writeProblem) {
760      outerWriteProblem = writeProblem;
761      if (failureCause != null) {
762        boolean foundFailureCauseInSuppressedThrowables = false;
763        final Object[] suppressedThrowables = writeProblem.getSuppressed();
764        if (suppressedThrowables != null && suppressedThrowables.length > 0) {
765          for (final Object suppressedThrowable : suppressedThrowables) {
766            if (suppressedThrowable == failureCause) {
767              foundFailureCauseInSuppressedThrowables = true;
768              break;
769            }
770          }
771        }
772        if (!foundFailureCauseInSuppressedThrowables) {
773          writeProblem.addSuppressed(failureCause);
774        }
775      }
776      throw writeProblem;
777    } finally {
778      Throwable outerFlushProblem = null;
779      try {
780        channelHandlerContext.flush();
781      } catch (final RuntimeException | Error flushProblem) {
782        outerFlushProblem = flushProblem;
783        if (outerWriteProblem != null) {
784          flushProblem.addSuppressed(outerWriteProblem);
785        } else if (failureCause != null) {
786          flushProblem.addSuppressed(failureCause);
787        }
788        throw flushProblem;
789      } finally {
790        try {
791          channelHandlerContext.close();
792        } catch (final RuntimeException | Error closeProblem) {
793          if (outerFlushProblem != null) {
794            closeProblem.addSuppressed(outerFlushProblem);
795          } else if (failureCause != null) {
796            closeProblem.addSuppressed(failureCause);
797          }
798          throw closeProblem;
799        }
800      }
801    }
802    if (failureCause == null) {
803      throw new ContainerException("failure");
804    } else if (failureCause instanceof RuntimeException) {
805      throw (RuntimeException)failureCause;
806    } else if (failureCause instanceof Exception) {
807      throw new ContainerException(failureCause.getMessage(), failureCause);
808    } else {
809      throw (Error)failureCause;
810    }
811  }
812
  /**
   * Writes an appropriate message describing a failure, possibly
   * using the {@link #getChannelHandlerContext()
   * ChannelHandlerContext} to do so.
   *
   * <p>This method is invoked by the {@link #failure(Throwable)}
   * method, which afterwards attempts to flush and then close the
   * {@link #getChannelHandlerContext() ChannelHandlerContext}
   * whether or not this method completed normally.  Implementations
   * therefore only need to write the message itself.</p>
   *
   * <p>Implementations of this method must not call the {@link
   * #failure(Throwable)} method or an infinite loop may result.</p>
   *
   * @param failureCause the {@link Throwable} responsible for this
   * method's invocation; may be {@code null} in pathological cases
   *
   * @see #failure(Throwable)
   */
  protected abstract void writeFailureMessage(final Throwable failureCause);
824
825
826  /*
827   * Static utility methods.
828   */
829
830
831  /**
832   * A utility function that copies entries from a source {@link Map}
833   * by passing each entry to the supplied {@link BiConsumer},
834   * transforming the keys beforehand using the supplied {@link
835   * UnaryOperator} and that is intended in this framework to be used
836   * to copy HTTP or HTTP/2 headers to and from the proper places.
837   *
838   * @param headersSource the source of the headers to copy; may be
839   * {@code null} in which case no action will be taken
840   *
841   * @param keyTransformer a {@link UnaryOperator} that transforms a
842   * header name; if {@code null} then the return value of {@link
843   * UnaryOperator#identity()} will be used instead
844   *
845   * @param headersTarget where the headers will be copied to; may be
846   * {@code null} in which case no action will be taken
847   */
848  protected static final void copyHeaders(final Map<? extends String, ? extends List<String>> headersSource,
849                                          UnaryOperator<String> keyTransformer,
850                                          final BiConsumer<? super String, ? super List<String>> headersTarget) {
851    if (headersTarget != null && headersSource != null && !headersSource.isEmpty()) {
852      final Collection<? extends Entry<? extends String, ? extends List<String>>> entrySet = headersSource.entrySet();
853      if (entrySet != null && !entrySet.isEmpty()) {
854        if (keyTransformer == null) {
855          keyTransformer = UnaryOperator.identity();
856        }
857        for (final Entry<? extends String, ? extends List<String>> entry : entrySet) {
858          if (entry != null) {
859            headersTarget.accept(keyTransformer.apply(entry.getKey()), entry.getValue());
860          }
861        }
862      }
863    }
864  }
865
866}