/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
 *
 * Copyright © 2019–2020 microBean™.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.microbean.jersey.netty;

import java.io.OutputStream;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;
import java.util.function.Supplier;

import java.util.logging.Logger;

import io.netty.channel.Channel; // for javadoc only
import io.netty.channel.ChannelConfig; // for javadoc only
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundInvoker; // for javadoc only

import org.glassfish.jersey.CommonProperties; // for javadoc only

import org.glassfish.jersey.message.internal.CommittingOutputStream; // for javadoc only

import org.glassfish.jersey.server.ApplicationHandler;
import org.glassfish.jersey.server.ContainerException;
import org.glassfish.jersey.server.ContainerRequest;
import org.glassfish.jersey.server.ContainerResponse;

import org.glassfish.jersey.server.spi.Container;
import org.glassfish.jersey.server.spi.ContainerResponseWriter;
import org.glassfish.jersey.server.spi.ContainerResponseWriter.TimeoutHandler;

import org.microbean.jersey.netty.AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator;

/**
 * An abstract {@link ChannelInboundHandlerAdapter} that is also a
 * {@link ContainerResponseWriter} that processes incoming {@link
 * ContainerRequest} events, such as those dispatched by an {@link
 * AbstractContainerRequestDecoder}, by supplying them to the {@link
 * ApplicationHandler#handle(ContainerRequest)} method.
 *
 * <p>Instances of this class are in charge of properly invoking
 * {@link ApplicationHandler#handle(ContainerRequest)}, thus adapting
 * <a href="https://eclipse-ee4j.github.io/jersey/"
 * target="_parent">Jersey</a> to <a href="https://netty.io/"
 * target="_parent">Netty</a>'s constraints and vice versa.</p>
 *
 * @param <T> the type of message that will be written by instances of
 * this class; see {@link #createOutputStream(long,
 * ContainerResponse)}
 *
 * @author <a href="https://about.me/lairdnelson"
 * target="_parent">Laird Nelson</a>
 *
 * @see #channelRead(ChannelHandlerContext, Object)
 *
 * @see #createOutputStream(long, ContainerResponse)
 *
 * @see ChannelInboundHandlerAdapter
 *
 * @see ApplicationHandler#handle(ContainerRequest)
 *
 * @see ContainerResponseWriter
 */
public abstract class AbstractContainerRequestHandlingResponseWriter<T> extends ChannelInboundHandlerAdapter implements ContainerResponseWriter {


  /*
   * Static fields.
   */


  private static final String cn = AbstractContainerRequestHandlingResponseWriter.class.getName();

  private static final Logger logger = Logger.getLogger(cn);


  /*
   * Instance fields.
   */


  private final Supplier<? extends ApplicationHandler> applicationHandlerSupplier;

  // The pending timeout task scheduled by suspend(...) or
  // setSuspendTimeout(...), if any.
  private ScheduledFuture<?> suspendTimeoutFuture;

  // Non-null while this writer is considered "suspended"; see
  // suspend(...).
  private Runnable suspendTimeoutHandler;

  // Non-null only for the duration of a channelRead(...) invocation.
  private ChannelHandlerContext channelHandlerContext;

  private final int flushThreshold;

  private final ByteBufCreator byteBufCreator;


  /*
   * Constructors.
   */


  /**
   * Creates a new {@link
   * AbstractContainerRequestHandlingResponseWriter}.
   *
   * @param applicationHandler an {@link ApplicationHandler}
   * representing a <a
   * href="https://jakarta.ee/specifications/restful-ws/"
   * target="_parent">Jakarta RESTful Web Services application</a>
   * whose {@link ApplicationHandler#handle(ContainerRequest)} method
   * will serve as the bridge between Netty and Jersey; may be {@code
   * null} somewhat pathologically but normally is not
   *
   * @see #AbstractContainerRequestHandlingResponseWriter(Supplier,
   * int,
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
   *
   * @see ApplicationHandler
   *
   * @see ApplicationHandler#handle(ContainerRequest)
   *
   * @see #channelRead(ChannelHandlerContext, Object)
   */
  protected AbstractContainerRequestHandlingResponseWriter(final ApplicationHandler applicationHandler) {
    this(new ImmutableSupplier<>(applicationHandler), 8192, null);
  }

  /**
   * Creates a new {@link
   * AbstractContainerRequestHandlingResponseWriter}.
   *
   * @param applicationHandler an {@link ApplicationHandler}
   * representing a <a
   * href="https://jakarta.ee/specifications/restful-ws/"
   * target="_parent">Jakarta RESTful Web Services application</a>
   * whose {@link ApplicationHandler#handle(ContainerRequest)} method
   * will serve as the bridge between Netty and Jersey; may be {@code
   * null} somewhat pathologically but normally is not
   *
   * @param flushThreshold the minimum number of bytes that an {@link
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream}
   * returned by the {@link #createOutputStream(long,
   * ContainerResponse)} method must write before an automatic
   * {@linkplain
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream#flush()
   * flush} may take place; if less than {@code 0} {@code 0} will be
   * used instead; if {@link Integer#MAX_VALUE} then it is suggested
   * that no automatic flushing will occur
   *
   * @param byteBufCreator a {@link ByteBufCreator} that may be used
   * (but does not have to be used) by the implementation of the
   * {@link #createOutputStream(long, ContainerResponse)} method; may
   * be {@code null}
   *
   * @see #AbstractContainerRequestHandlingResponseWriter(Supplier,
   * int,
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
   *
   * @see ApplicationHandler
   *
   * @see ApplicationHandler#handle(ContainerRequest)
   *
   * @see #channelRead(ChannelHandlerContext, Object)
   *
   * @see #getFlushThreshold()
   *
   * @see #getByteBufCreator()
   *
   * @see #createOutputStream(long, ContainerResponse)
   */
  protected AbstractContainerRequestHandlingResponseWriter(final ApplicationHandler applicationHandler,
                                                           final int flushThreshold,
                                                           final ByteBufCreator byteBufCreator) {
    this(new ImmutableSupplier<>(applicationHandler), flushThreshold, byteBufCreator);
  }

  /**
   * Creates a new {@link
   * AbstractContainerRequestHandlingResponseWriter}.
   *
   * @param applicationHandlerSupplier a {@link Supplier} of an {@link
   * ApplicationHandler} representing a <a
   * href="https://jakarta.ee/specifications/restful-ws/"
   * target="_parent">Jakarta RESTful Web Services application</a>
   * whose {@link ApplicationHandler#handle(ContainerRequest)} method
   * will serve as the bridge between Netty and Jersey; may be {@code
   * null} somewhat pathologically but normally is not
   *
   * @see #AbstractContainerRequestHandlingResponseWriter(Supplier,
   * int,
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
   *
   * @see ApplicationHandler
   *
   * @see ApplicationHandler#handle(ContainerRequest)
   *
   * @see #channelRead(ChannelHandlerContext, Object)
   */
  protected AbstractContainerRequestHandlingResponseWriter(final Supplier<? extends ApplicationHandler> applicationHandlerSupplier) {
    this(applicationHandlerSupplier, 8192, null);
  }

  /**
   * Creates a new {@link
   * AbstractContainerRequestHandlingResponseWriter}.
   *
   * @param applicationHandlerSupplier a {@link Supplier} of an {@link
   * ApplicationHandler} representing a <a
   * href="https://jakarta.ee/specifications/restful-ws/"
   * target="_parent">Jakarta RESTful Web Services application</a>
   * whose {@link ApplicationHandler#handle(ContainerRequest)} method
   * will serve as the bridge between Netty and Jersey; may be {@code
   * null} somewhat pathologically but normally is not; if {@code
   * null}, a {@link Supplier} of a single newly created default
   * {@link ApplicationHandler} will be used instead
   *
   * @param flushThreshold the minimum number of bytes that an {@link
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream}
   * returned by the {@link #createOutputStream(long,
   * ContainerResponse)} method must write before an automatic
   * {@linkplain
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream#flush()
   * flush} may take place; if less than {@code 0} {@code 0} will be
   * used instead; if {@link Integer#MAX_VALUE} then it is suggested
   * that no automatic flushing will occur
   *
   * @param byteBufCreator a {@link ByteBufCreator} that may be used
   * (but does not have to be used) by the implementation of the
   * {@link #createOutputStream(long, ContainerResponse)} method; may
   * be {@code null}
   *
   * @see ApplicationHandler
   *
   * @see ApplicationHandler#handle(ContainerRequest)
   *
   * @see #channelRead(ChannelHandlerContext, Object)
   *
   * @see #getFlushThreshold()
   *
   * @see #getByteBufCreator()
   *
   * @see #createOutputStream(long, ContainerResponse)
   */
  protected AbstractContainerRequestHandlingResponseWriter(final Supplier<? extends ApplicationHandler> applicationHandlerSupplier,
                                                           final int flushThreshold,
                                                           final ByteBufCreator byteBufCreator) {
    super();
    if (applicationHandlerSupplier == null) {
      this.applicationHandlerSupplier = new ImmutableSupplier<>(new ApplicationHandler());
    } else {
      this.applicationHandlerSupplier = applicationHandlerSupplier;
    }
    // Negative thresholds are clamped to zero.
    this.flushThreshold = Math.max(0, flushThreshold);
    this.byteBufCreator = byteBufCreator;
  }


  /*
   * Instance methods.
   */


  /**
   * Overrides {@link
   * ChannelInboundHandlerAdapter#channelActive(ChannelHandlerContext)}
   * to ensure that a channel read is performed one way or another.
   *
   * <p>If {@linkplain ChannelConfig#isAutoRead() autoread is active},
   * then the superclass' version of this method is executed with no
   * changes.  If {@linkplain ChannelConfig#isAutoRead() autoread is
   * inactive}, then a call is made to {@link
   * ChannelHandlerContext#read()}.</p>
   *
   * @param channelHandlerContext the {@link ChannelHandlerContext} in
   * effect; must not be {@code null}
   *
   * @exception NullPointerException if {@code channelHandlerContext}
   * is {@code null}
   *
   * @exception Exception if an error occurs
   *
   * @see ChannelConfig#isAutoRead()
   *
   * @see Channel#read()
   *
   * @see ChannelHandlerContext#read()
   *
   * @see #channelRead(ChannelHandlerContext, Object)
   */
  @Override
  public final void channelActive(final ChannelHandlerContext channelHandlerContext) throws Exception {
    super.channelActive(channelHandlerContext);
    // See
    // https://github.com/netty/netty/blob/d446765b8469ca40db40f46e5c637d980b734a8a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java#L1408-L1413.
    if (!channelHandlerContext.channel().config().isAutoRead()) {
      // If autoRead was "on", then the superclass version of this
      // method will have already performed a read from the channel
      // (e.g. channel.read()).  If autoRead is "off", then we get
      // here, and we do a read from the ChannelHandlerContext (and so
      // we don't have to traverse the whole pipeline).
      channelHandlerContext.read();
    }
  }

  /**
   * If the supplied {@code message} is a {@link ContainerRequest}
   * then this method will {@linkplain
   * ContainerRequest#setWriter(ContainerResponseWriter) install
   * itself as that request's <code>ContainerResponseWriter</code>}
   * and will invoke {@link
   * ApplicationHandler#handle(ContainerRequest)}.
   *
   * <p>In all other cases this method will simply call {@link
   * ChannelInboundHandlerAdapter#channelRead(ChannelHandlerContext,
   * Object)} with the supplied {@code message}.</p>
   *
   * <p>For the duration of this method's execution, and only then,
   * the supplied {@link ChannelHandlerContext} is returned by the
   * {@link #getChannelHandlerContext()} method.</p>
   *
   * @param channelHandlerContext the {@link ChannelHandlerContext} in
   * effect; must not be {@code null}
   *
   * @param message the incoming message, or event; may be {@code null}
   *
   * @exception NullPointerException if {@code channelHandlerContext}
   * is {@code null}, or if the {@link Supplier} of {@link
   * ApplicationHandler} instances supplied at construction time
   * returns {@code null}
   *
   * @exception IllegalStateException if {@link
   * #getChannelHandlerContext()} returns a non-{@code null} value
   * when this method is invoked, <em>i.e.</em> if a previous
   * invocation of this method has not yet completed
   *
   * @exception Exception if {@code message} is not an instance of
   * {@link ContainerRequest} and {@link
   * ChannelInboundHandlerAdapter#channelRead(ChannelHandlerContext,
   * Object)} throws an {@link Exception}
   *
   * @see ApplicationHandler#handle(ContainerRequest)
   *
   * @see ContainerResponseWriter
   *
   * @see ContainerRequest#setWriter(ContainerResponseWriter)
   */
  @Override
  public final void channelRead(final ChannelHandlerContext channelHandlerContext,
                                final Object message)
    throws Exception {
    if (this.getChannelHandlerContext() != null) {
      throw new IllegalStateException("this.getChannelHandlerContext() != null: " + this.getChannelHandlerContext());
    }
    this.channelHandlerContext = Objects.requireNonNull(channelHandlerContext);
    try {
      if (message instanceof ContainerRequest) {
        final ContainerRequest containerRequest = (ContainerRequest)message;
        containerRequest.setWriter(this);
        this.applicationHandlerSupplier.get().handle(containerRequest);
      } else {
        super.channelRead(channelHandlerContext, message);
      }
    } finally {
      // Always clear the context so that the next read may proceed.
      this.channelHandlerContext = null;
    }
  }

  /**
   * Overrides the {@link
   * ChannelInboundHandlerAdapter#channelReadComplete(ChannelHandlerContext)}
   * method to call {@link ChannelHandlerContext#flush()
   * channelHandlerContext.flush()} before calling the superclass
   * implementation.
   *
   * <p>If {@linkplain ChannelConfig#isAutoRead() autoread is
   * inactive}, then a call is additionally made to {@link
   * ChannelHandlerContext#read()} after the superclass implementation
   * returns.</p>
   *
   * @param channelHandlerContext the {@link ChannelHandlerContext} in
   * effect; must not be {@code null}
   *
   * @exception NullPointerException if {@code channelHandlerContext} is {@code null}
   *
   * @exception Exception if {@link
   * ChannelInboundHandlerAdapter#channelReadComplete(ChannelHandlerContext)}
   * throws an {@link Exception}
   */
  @Override
  public final void channelReadComplete(final ChannelHandlerContext channelHandlerContext) throws Exception {
    channelHandlerContext.flush();
    // See
    // https://github.com/netty/netty/blob/d446765b8469ca40db40f46e5c637d980b734a8a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java#L1408-L1413.
    super.channelReadComplete(channelHandlerContext);
    if (!channelHandlerContext.channel().config().isAutoRead()) {
      // Ultimately a read is just an idempotent call so even if other
      // handlers do this seemingly nothing bad will happen.  See
      // https://github.com/netty/netty/blob/9976ab7fe86e052d29ca7accf528c885e93dcb4c/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java#L402-L416.
      // Even in the ancient case of blocking IO it's still
      // idempotent:
      // https://github.com/netty/netty/blob/9976ab7fe86e052d29ca7accf528c885e93dcb4c/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java#L101-L109
      // Finally, transports like Epoll are less clear:
      // https://github.com/netty/netty/blob/9976ab7fe86e052d29ca7accf528c885e93dcb4c/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java#L226-L242
      // ...but Epoll too ultimately is idempotent:
      // https://github.com/netty/netty/blob/9976ab7fe86e052d29ca7accf528c885e93dcb4c/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java#L226-L242
      channelHandlerContext.read();
    }
  }

  /**
   * Returns the {@link ChannelHandlerContext} in effect, or {@code
   * null} if there is no such {@link ChannelHandlerContext}.
   *
   * <p>This method may return {@code null}.</p>
   *
   * @return the {@link ChannelHandlerContext} in effect, or {@code
   * null} if there is no such {@link ChannelHandlerContext}
   */
  protected final ChannelHandlerContext getChannelHandlerContext() {
    return this.channelHandlerContext;
  }

  /*
   * ContainerResponseWriter overrides.
   */

  /**
   * Returns {@code true} when invoked to indicate that buffering of
   * entity content is supported and can be configured to be on or
   * off.
   *
   * <p>Note that the return value of this method is a default value
   * indicating that the <em>concept</em> of response buffering is
   * enabled.  The actual <em>configuration</em> of response buffering
   * is a property of the application.  Specifically, an application's
   * response buffering policy <a
   * href="https://github.com/eclipse-ee4j/jersey/blob/a40169547a602a582f5fed1fd8ebe595ff2b83f7/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java#L761-L778"
   * target="_parent">can be configured</a>: if the application's
   * configuration sets the {@link
   * CommonProperties#OUTBOUND_CONTENT_LENGTH_BUFFER
   * jersey.config.contentLength.buffer} property to a positive {@code
   * int}, then buffering will occur, and if the application's
   * configuration sets the {@link
   * CommonProperties#OUTBOUND_CONTENT_LENGTH_BUFFER
   * jersey.config.contentLength.buffer} property to a negative {@code
   * int}, then buffering will not occur.  If the application's
   * configuration does nothing in this regard, response buffering
   * will be enabled with a default buffer size of {@link
   * CommittingOutputStream#DEFAULT_BUFFER_SIZE 8192}.</p>
   *
   * <p>(If, instead, this method had been written to return {@code
   * false}, then no matter what configuration settings an application
   * might specify in the realm of response buffering settings,
   * response buffering of any kind would never be possible.)</p>
   *
   * @return {@code true} when invoked; never {@code false}
   *
   * @see ContainerResponseWriter#enableResponseBuffering()
   *
   * @see <a
   * href="https://github.com/eclipse-ee4j/jersey/blob/8dcbaf4b5cb0fb345eb949acfa66b1fbe09d1ffb/core-server/src/main/java/org/glassfish/jersey/server/ServerRuntime.java#L634"
   * target="_parent"><code>ServerRuntime.java</code></a>
   *
   * @see <a
   * href="https://github.com/eclipse-ee4j/jersey/blob/a40169547a602a582f5fed1fd8ebe595ff2b83f7/core-server/src/main/java/org/glassfish/jersey/server/ContainerResponse.java#L352-L363"
   * target="_parent"><code>ContainerResponse.java</code></a>
   *
   * @see <a
   * href="https://github.com/eclipse-ee4j/jersey/blob/a40169547a602a582f5fed1fd8ebe595ff2b83f7/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java#L761-L778"
   * target="_parent"><code>OutboundMessageContext.java</code></a>
   */
  @Override
  public final boolean enableResponseBuffering() {
    return true;
  }

  /**
   * Writes the status and headers portion of the response present in
   * the supplied {@link ContainerResponse} by calling the {@link
   * #writeStatusAndHeaders(long, ContainerResponse)} method, and, if
   * the supplied {@code contentLength} is not {@code 0L} and that
   * method returns {@code true} indicating that output will be
   * forthcoming, returns the result of invoking {@link
   * #createOutputStream(long, ContainerResponse)}.
   *
   * <p>In all other cases, this method returns {@code null}.</p>
   *
   * @param contentLength the content length as determined by the
   * logic encapsulated by the {@link
   * ApplicationHandler#handle(ContainerRequest)} method; a value less
   * than zero indicates an unknown content length
   *
   * @param containerResponse the {@link ContainerResponse} containing
   * status and headers information; must not be {@code null}
   *
   * @return the {@link OutputStream} returned by the {@link
   * #createOutputStream(long, ContainerResponse)} method, or {@code
   * null}
   *
   * @exception NullPointerException if {@code containerResponse} is
   * {@code null}
   *
   * @see #writeStatusAndHeaders(long, ContainerResponse)
   *
   * @see #createOutputStream(long, ContainerResponse)
   *
   * @see ApplicationHandler#handle(ContainerRequest)
   */
  @Override
  public final OutputStream writeResponseStatusAndHeaders(final long contentLength,
                                                          final ContainerResponse containerResponse) {
    final OutputStream returnValue;
    // The status and headers are always written first, even when the
    // content length is zero and no stream will be created.
    if (this.writeStatusAndHeaders(contentLength, Objects.requireNonNull(containerResponse)) && contentLength != 0L) {
      returnValue = this.createOutputStream(contentLength, containerResponse);
    } else {
      returnValue = null;
    }
    return returnValue;
  }

  /**
   * Writes the status and headers portion of the response present in
   * the supplied {@link ContainerResponse} and returns {@code true}
   * if further output is forthcoming.
   *
   * <p>Implementations of this method must not call the {@link
   * #writeResponseStatusAndHeaders(long, ContainerResponse)} method
   * or an infinite loop may result.</p>
   *
   * <p>Implementations of this method must not call the {@link
   * #createOutputStream(long, ContainerResponse)} method or undefined
   * behavior may result.</p>
   *
   * @param contentLength the content length as determined by the
   * logic encapsulated by the {@link
   * ApplicationHandler#handle(ContainerRequest)} method; a value less
   * than zero indicates an unknown content length
   *
   * @param containerResponse the {@link ContainerResponse} containing
   * status and headers information; must not be {@code null}
   *
   * @return {@code true} if the {@link #createOutputStream(long,
   * ContainerResponse)} method should be invoked, <em>i.e.</em> if
   * further output is forthcoming
   *
   * @exception NullPointerException if {@code containerResponse} is
   * {@code null}
   *
   * @see ApplicationHandler#handle(ContainerRequest)
   *
   * @see #createOutputStream(long, ContainerResponse)
   */
  protected abstract boolean writeStatusAndHeaders(final long contentLength,
                                                   final ContainerResponse containerResponse);

  /**
   * Creates and returns a new {@link
   * AbstractChannelOutboundInvokingOutputStream}, or returns {@code
   * null} if it is determined that no {@link
   * AbstractChannelOutboundInvokingOutputStream} is required given
   * the supplied {@code contentLength} parameter value.
   *
   * <p>Implementations of this method may return {@code null}.</p>
   *
   * @param contentLength the content length as determined by the
   * logic encapsulated by the {@link
   * ApplicationHandler#handle(ContainerRequest)} method; a value less
   * than zero indicates an unknown content length; must not be equal
   * to {@code 0L}
   *
   * @param containerResponse the {@link ContainerResponse} containing
   * status and headers information; must not be {@code null}; may be
   * (and often is) ignored by implementations
   *
   * @return a new {@link AbstractChannelOutboundInvokingOutputStream}
   * implementation, or {@code null}
   *
   * @exception NullPointerException if {@code containerResponse} is
   * {@code null}
   *
   * @exception IllegalArgumentException if {@code contentLength} is
   * equal to {@code 0L}
   */
  protected abstract AbstractChannelOutboundInvokingOutputStream<? extends T> createOutputStream(final long contentLength,
                                                                                                 final ContainerResponse containerResponse);

  /**
   * Returns the minimum number of bytes that an {@link
   * AbstractChannelOutboundInvokingOutputStream} returned by the
   * {@link #createOutputStream(long, ContainerResponse)} method must
   * write before an automatic {@linkplain
   * AbstractChannelOutboundInvokingOutputStream#flush() flush} may
   * take place.
   *
   * <p><strong>Note:</strong> Implementations of the {@link
   * #createOutputStream(long, ContainerResponse)} method may choose
   * to ignore the return value of this method.  It is supplied for
   * convenience only for use by implementors of the {@link
   * #createOutputStream(long, ContainerResponse)} method.</p>
   *
   * @return the minimum number of bytes that an {@link
   * AbstractChannelOutboundInvokingOutputStream} returned by the
   * {@link #createOutputStream(long, ContainerResponse)} method must
   * write before an automatic {@linkplain
   * AbstractChannelOutboundInvokingOutputStream#flush() flush} may
   * take place; always {@code 0} or a positive {@code int}; if
   * {@code 0} it is suggested that automatic flushing occur after
   * every write; if {@link Integer#MAX_VALUE} it is suggested that no
   * automatic flushing should occur
   *
   * @see #createOutputStream(long, ContainerResponse)
   *
   * @see
   * #AbstractContainerRequestHandlingResponseWriter(ApplicationHandler,
   * int,
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
   *
   * @see
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream#AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
   * int, boolean, ByteBufCreator)
   */
  protected final int getFlushThreshold() {
    return this.flushThreshold;
  }

  /**
   * Returns a {@link ByteBufCreator} that may be used to create the
   * {@link AbstractChannelOutboundInvokingOutputStream}
   * implementation that must be returned by an implementation of the
   * {@link #createOutputStream(long, ContainerResponse)} method.
   *
   * <p>This method may return {@code null}.</p>
   *
   * <p><strong>Note:</strong> Implementations of the {@link
   * #createOutputStream(long, ContainerResponse)} method may choose
   * to ignore the return value of this method.  It is supplied for
   * convenience only for use by implementors of the {@link
   * #createOutputStream(long, ContainerResponse)} method.</p>
   *
   * @return a {@link ByteBufCreator}, or {@code null}
   *
   * @see #createOutputStream(long, ContainerResponse)
   *
   * @see
   * #AbstractContainerRequestHandlingResponseWriter(ApplicationHandler,
   * int, AbstractByteBufBackedChannelOutboundInvokingOutputStream.ByteBufCreator)
   *
   * @see
   * AbstractByteBufBackedChannelOutboundInvokingOutputStream#AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
   * int, boolean, ByteBufCreator)
   */
  protected final ByteBufCreator getByteBufCreator() {
    return this.byteBufCreator;
  }

  /**
   * Invoked by Jersey when a {@link ContainerRequest} has been fully
   * {@linkplain ApplicationHandler#handle(ContainerRequest) handled}
   * successfully.
   *
   * <p>Either {@link #commit()} or {@link #failure(Throwable)} will
   * be called by Jersey as the logically final operation in the logic
   * encapsulated by the {@link
   * ApplicationHandler#handle(ContainerRequest)} method, but not
   * both.</p>
   *
   * <p>This implementation does nothing.</p>
   *
   * @see ContainerResponseWriter#commit()
   *
   * @see #failure(Throwable)
   */
  @Override
  public void commit() {

  }

  /**
   * Marks this {@link ContainerResponseWriter} as suspended and, if
   * the supplied {@code timeout} is greater than {@code 0L},
   * schedules the supplied {@link TimeoutHandler} to be notified on
   * the {@linkplain ChannelHandlerContext#executor() executor} of the
   * {@link ChannelHandlerContext} {@linkplain
   * #getChannelHandlerContext() in effect} once that timeout elapses.
   *
   * <p>When the scheduled task runs, it invokes {@link
   * TimeoutHandler#onTimeout(ContainerResponseWriter)} with this
   * object and then clears the suspended state, whether or not the
   * {@link TimeoutHandler} completes normally.</p>
   *
   * @param timeout the timeout; if less than or equal to {@code 0L}
   * no timeout task is scheduled
   *
   * @param timeUnit the {@link TimeUnit} qualifying the supplied
   * {@code timeout}
   *
   * @param timeoutHandler the {@link TimeoutHandler} to notify; may
   * be {@code null} in which case {@code false} is returned and no
   * action is taken
   *
   * @return {@code true} if this invocation caused this {@link
   * ContainerResponseWriter} to become suspended; {@code false} if
   * {@code timeoutHandler} is {@code null} or if this {@link
   * ContainerResponseWriter} was already suspended
   *
   * @exception NullPointerException if {@code timeout} is greater
   * than {@code 0L} and {@link #getChannelHandlerContext()} returns
   * {@code null}
   *
   * @see #setSuspendTimeout(long, TimeUnit)
   *
   * @see ContainerResponseWriter#suspend(long, TimeUnit,
   * ContainerResponseWriter.TimeoutHandler)
   */
  @Override
  public final boolean suspend(final long timeout,
                               final TimeUnit timeUnit,
                               final TimeoutHandler timeoutHandler) {
    // Lifted from Jersey's supplied Netty integration, with repairs.
    final boolean returnValue;
    if (timeoutHandler == null || this.suspendTimeoutHandler != null) {
      returnValue = false;
    } else {
      this.suspendTimeoutHandler = () -> {
        try {
          timeoutHandler.onTimeout(this);
        } finally {
          // Reset even if the handler throws; otherwise every later
          // suspend(...) call would return false forever.
          this.suspendTimeoutHandler = null;
        }
      };
      if (timeout > 0L) {
        // NOTE(review): the context is only non-null while
        // channelRead(...) is executing, so this presumably must be
        // called from within ApplicationHandler#handle(...) -- confirm.
        final ChannelHandlerContext channelHandlerContext = Objects.requireNonNull(this.getChannelHandlerContext());
        this.suspendTimeoutFuture =
          channelHandlerContext.executor().schedule(this.suspendTimeoutHandler, timeout, timeUnit);
      }
      returnValue = true;
    }
    return returnValue;
  }

  /**
   * Cancels any previously scheduled suspend timeout task and, if the
   * supplied {@code timeout} is greater than {@code 0L}, schedules a
   * new one on the {@linkplain ChannelHandlerContext#executor()
   * executor} of the {@link ChannelHandlerContext} {@linkplain
   * #getChannelHandlerContext() in effect}.
   *
   * @param timeout the new timeout; if less than or equal to {@code
   * 0L} no new timeout task is scheduled
   *
   * @param timeUnit the {@link TimeUnit} qualifying the supplied
   * {@code timeout}
   *
   * @exception IllegalStateException if this {@link
   * ContainerResponseWriter} is not currently suspended
   *
   * @exception NullPointerException if {@code timeout} is greater
   * than {@code 0L} and {@link #getChannelHandlerContext()} returns
   * {@code null}
   *
   * @see #suspend(long, TimeUnit,
   * ContainerResponseWriter.TimeoutHandler)
   *
   * @see ContainerResponseWriter#setSuspendTimeout(long, TimeUnit)
   */
  @Override
  public final void setSuspendTimeout(final long timeout, final TimeUnit timeUnit) {
    // Lifted from Jersey's supplied Netty integration, with repairs.
    if (this.suspendTimeoutHandler == null) {
      throw new IllegalStateException("this.suspendTimeoutHandler == null");
    }
    if (this.suspendTimeoutFuture != null) {
      this.suspendTimeoutFuture.cancel(true);
      this.suspendTimeoutFuture = null;
    }
    if (timeout > 0L) {
      // NOTE(review): as in suspend(...), this requires a context,
      // which is only present while channelRead(...) is executing --
      // confirm whether later, asynchronous invocation is expected.
      final ChannelHandlerContext channelHandlerContext = Objects.requireNonNull(this.getChannelHandlerContext());
      this.suspendTimeoutFuture =
        channelHandlerContext.executor().schedule(this.suspendTimeoutHandler, timeout, timeUnit);
    }
  }

  /**
   * Handles any failure case encountered by the logic encapsulated by
   * the {@link ApplicationHandler#handle(ContainerRequest)} method.
   *
   * <p>Either {@link #commit()} or {@link #failure(Throwable)} will
   * be called by Jersey as the logically final operation in the logic
   * encapsulated by the {@link
   * ApplicationHandler#handle(ContainerRequest)} method, but not
   * both.</p>
   *
   * <p>This method calls the {@link #writeFailureMessage(Throwable)}
   * method, then {@linkplain ChannelHandlerContext#flush() flushes}
   * and {@linkplain ChannelHandlerContext#close() closes} the {@link
   * ChannelHandlerContext} {@linkplain #getChannelHandlerContext() in
   * effect}, and takes great care to ensure that any {@link
   * Throwable}s encountered along the way are properly recorded and
   * {@linkplain Throwable#addSuppressed(Throwable) suppressed}.</p>
   *
   * <p><strong>This implementation never returns normally.</strong>
   * If writing, flushing and closing all succeed, then:</p>
   *
   * <ul>
   *
   * <li>if {@code failureCause} is {@code null}, a new {@link
   * ContainerException} is thrown;</li>
   *
   * <li>if {@code failureCause} is a {@link RuntimeException} or an
   * {@link Error}, it is rethrown as is;</li>
   *
   * <li>otherwise a new {@link ContainerException} is thrown with
   * {@code failureCause} as its {@linkplain Throwable#getCause()
   * cause}.</li>
   *
   * </ul>
   *
   * <p>If writing, flushing or closing fails, the last such failure
   * is thrown instead, with the earlier problem (or, failing that,
   * {@code failureCause}) added to it as a suppressed {@link
   * Throwable}.</p>
   *
   * @param failureCause the {@link Throwable} encountered by the
   * {@link ApplicationHandler#handle(ContainerRequest)} method; may
   * be {@code null}
   *
   * @exception NullPointerException if {@link
   * #getChannelHandlerContext()} returns {@code null}
   *
   * @exception ContainerException if {@code failureCause} is {@code
   * null} or is neither a {@link RuntimeException} nor an {@link
   * Error}; when non-{@code null}, {@code failureCause} will be its
   * {@linkplain Throwable#getCause() cause}
   *
   * @see #commit()
   *
   * @see ContainerResponseWriter#failure(Throwable)
   */
  @Override
  public final void failure(final Throwable failureCause) {
    final ChannelHandlerContext channelHandlerContext = Objects.requireNonNull(this.getChannelHandlerContext());
    Throwable outerWriteProblem = null;
    try {
      this.writeFailureMessage(failureCause);
    } catch (final RuntimeException | Error writeProblem) {
      outerWriteProblem = writeProblem;
      addSuppressedIfAbsent(writeProblem, failureCause);
      throw writeProblem;
    } finally {
      // Flush and close no matter what happened above.
      Throwable outerFlushProblem = null;
      try {
        channelHandlerContext.flush();
      } catch (final RuntimeException | Error flushProblem) {
        outerFlushProblem = flushProblem;
        // This problem replaces any in-flight write problem, so keep
        // a record of what it is replacing.
        addSuppressedIfAbsent(flushProblem, outerWriteProblem != null ? outerWriteProblem : failureCause);
        throw flushProblem;
      } finally {
        try {
          channelHandlerContext.close();
        } catch (final RuntimeException | Error closeProblem) {
          // Likewise, this problem replaces whatever was in flight:
          // prefer the most recent earlier problem, which itself
          // carries its own predecessors.
          final Throwable replaced;
          if (outerFlushProblem != null) {
            replaced = outerFlushProblem;
          } else if (outerWriteProblem != null) {
            replaced = outerWriteProblem;
          } else {
            replaced = failureCause;
          }
          addSuppressedIfAbsent(closeProblem, replaced);
          throw closeProblem;
        }
      }
    }
    if (failureCause == null) {
      throw new ContainerException("failure");
    } else if (failureCause instanceof RuntimeException) {
      throw (RuntimeException)failureCause;
    } else if (failureCause instanceof Error) {
      throw (Error)failureCause;
    } else {
      // Checked exceptions, and any Throwable that is neither an
      // Exception nor an Error, are wrapped.
      throw new ContainerException(failureCause.getMessage(), failureCause);
    }
  }

  /**
   * Writes an appropriate message, possibly using the {@link
   * #getChannelHandlerContext() ChannelHandlerContext} to do so.
   *
   * <p>Implementations of this method must not call the {@link
   * #failure(Throwable)} method or an infinite loop may result.</p>
   *
   * @param failureCause the {@link Throwable} responsible for this
   * method's invocation; may be {@code null} in pathological cases
   */
  protected abstract void writeFailureMessage(final Throwable failureCause);


  /*
   * Static utility methods.
   */


  /**
   * A utility function that copies entries from a source {@link Map}
   * by passing each entry to the supplied {@link BiConsumer},
   * transforming the keys beforehand using the supplied {@link
   * UnaryOperator} and that is intended in this framework to be used
   * to copy HTTP or HTTP/2 headers to and from the proper places.
   *
   * @param headersSource the source of the headers to copy; may be
   * {@code null} in which case no action will be taken
   *
   * @param keyTransformer a {@link UnaryOperator} that transforms a
   * header name; if {@code null} then the return value of {@link
   * UnaryOperator#identity()} will be used instead
   *
   * @param headersTarget where the headers will be copied to; may be
   * {@code null} in which case no action will be taken
   */
  protected static final void copyHeaders(final Map<? extends String, ? extends List<String>> headersSource,
                                          UnaryOperator<String> keyTransformer,
                                          final BiConsumer<? super String, ? super List<String>> headersTarget) {
    if (headersTarget != null && headersSource != null && !headersSource.isEmpty()) {
      final Collection<? extends Entry<? extends String, ? extends List<String>>> entrySet = headersSource.entrySet();
      if (entrySet != null && !entrySet.isEmpty()) {
        if (keyTransformer == null) {
          keyTransformer = UnaryOperator.identity();
        }
        for (final Entry<? extends String, ? extends List<String>> entry : entrySet) {
          if (entry != null) {
            headersTarget.accept(keyTransformer.apply(entry.getKey()), entry.getValue());
          }
        }
      }
    }
  }

  /**
   * Adds the supplied {@code suppressed} {@link Throwable} to the
   * supplied {@code target}'s {@linkplain Throwable#getSuppressed()
   * suppressed <code>Throwable</code>s} unless either is {@code
   * null}, they are the same object, or {@code suppressed} is already
   * present.
   *
   * @param target the {@link Throwable} to add to; may be {@code
   * null} in which case no action will be taken
   *
   * @param suppressed the {@link Throwable} to add; may be {@code
   * null} in which case no action will be taken
   */
  private static void addSuppressedIfAbsent(final Throwable target, final Throwable suppressed) {
    // Throwable#addSuppressed(Throwable) rejects null and
    // self-suppression, so guard against both.
    if (target != null && suppressed != null && target != suppressed) {
      for (final Throwable alreadySuppressed : target.getSuppressed()) {
        if (alreadySuppressed == suppressed) {
          return;
        }
      }
      target.addSuppressed(suppressed);
    }
  }

}