001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*- 002 * 003 * Copyright © 2019–2020 microBean™. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 014 * implied. See the License for the specific language governing 015 * permissions and limitations under the License. 016 */ 017package org.microbean.jersey.netty; 018 019import java.io.IOException; 020import java.io.OutputStream; 021 022import java.util.Objects; 023 024import io.netty.channel.Channel; 025import io.netty.channel.ChannelOutboundInvoker; 026import io.netty.channel.ChannelPromise; 027 028/** 029 * An {@link OutputStream} that delegates writing and flushing 030 * operations to a {@link ChannelOutboundInvoker}. 031 * 032 * <h2>Thread Safety</h2> 033 * 034 * <p>Instances of this class are safe for concurrent use by multiple 035 * threads.</p> 036 * 037 * @param <T> the type of message that will be written; see {@link 038 * #createMessage(byte[], int, int)} 039 * 040 * @author <a href="https://about.me/lairdnelson" 041 * target="_parent">Laird Nelson</a> 042 * 043 * @see ChannelOutboundInvoker 044 */ 045public abstract class AbstractChannelOutboundInvokingOutputStream<T> extends OutputStream { 046 047 048 /* 049 * Instance fields. 050 */ 051 052 053 /** 054 * The {@link ChannelOutboundInvoker} underlying this {@link 055 * AbstractChannelOutboundInvokingOutputStream} implementation to 056 * which most operations are adapted. 057 * 058 * @see ChannelOutboundInvoker 059 * 060 * @see 061 * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 062 * int, boolean) 063 */ 064 protected final ChannelOutboundInvoker channelOutboundInvoker; 065 066 /** 067 * Indicates whether the {@link #channelOutboundInvoker 068 * ChannelOutboundInvoker} should also be {@linkplain 069 * ChannelOutboundInvoker#close(ChannelPromise) closed} when {@link 070 * #close() close()} is called. 071 * 072 * @see 073 * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 074 * int, boolean) 075 */ 076 protected final boolean closeChannelOutboundInvoker; 077 078 private final int flushThreshold; 079 080 private volatile int bytesWritten; 081 082 083 /* 084 * Constructors; 085 */ 086 087 088 /** 089 * Creates a new {@link AbstractChannelOutboundInvokingOutputStream} 090 * that does not automatically flush and that does not ever 091 * {@linkplain ChannelOutboundInvoker#close() close} the supplied 092 * {@link ChannelOutboundInvoker}. 093 * 094 * @param channelOutboundInvoker the {@link ChannelOutboundInvoker} 095 * to which operations are adapted; must not be {@code null} 096 * 097 * @exception NullPointerException if {@code channelOutboundInvoker} 098 * is {@code null} 099 * 100 * @see 101 * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 102 * int, boolean) 103 * 104 * @see ChannelOutboundInvoker 105 */ 106 protected AbstractChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker) { 107 this(channelOutboundInvoker, Integer.MAX_VALUE, false); 108 } 109 110 /** 111 * Creates a new {@link AbstractChannelOutboundInvokingOutputStream} 112 * that does not automatically flush. 113 * 114 * @param channelOutboundInvoker the {@link ChannelOutboundInvoker} 115 * to which operations are adapted; must not be {@code null} 116 * 117 * @param closeChannelOutboundInvoker whether {@link 118 * ChannelOutboundInvoker#close(ChannelPromise)} will be called on 119 * the supplied {@link ChannelOutboundInvoker} when {@link #close() 120 * close()} is called 121 * 122 * @exception NullPointerException if {@code channelOutboundInvoker} 123 * is {@code null} 124 * 125 * @see 126 * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 127 * int, boolean) 128 * 129 * @see ChannelOutboundInvoker 130 * 131 * @see #close() 132 */ 133 protected AbstractChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker, 134 final boolean closeChannelOutboundInvoker) { 135 this(channelOutboundInvoker, Integer.MAX_VALUE, closeChannelOutboundInvoker); 136 } 137 138 /** 139 * Creates a new {@link 140 * AbstractChannelOutboundInvokingOutputStream}. 141 * 142 * @param channelOutboundInvoker the {@link ChannelOutboundInvoker} 143 * to which operations are adapted; must not be {@code null} 144 * 145 * @param flushThreshold the minimum number of bytes that this 146 * instance has to {@linkplain #write(byte[], int, int) write} 147 * before an automatic {@linkplain #flush() flush} will take place; 148 * if less than {@code 0} {@code 0} will be used instead; if {@code 149 * Integer#MAX_VALUE} then no automatic flushing will occur 150 * 151 * @param closeChannelOutboundInvoker whether {@link 152 * ChannelOutboundInvoker#close(ChannelPromise)} will be called on 153 * the supplied {@link ChannelOutboundInvoker} when {@link #close() 154 * close()} is called 155 * 156 * @exception NullPointerException if {@code channelOutboundInvoker} 157 * is {@code null} 158 * 159 * @see ChannelOutboundInvoker 160 * 161 * @see #getFlushThreshold() 162 * 163 * @see #close() 164 */ 165 protected AbstractChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker, 166 final int flushThreshold, 167 final boolean closeChannelOutboundInvoker) { 168 super(); 169 this.flushThreshold = Math.max(0, flushThreshold); 170 this.channelOutboundInvoker = Objects.requireNonNull(channelOutboundInvoker); 171 this.closeChannelOutboundInvoker = closeChannelOutboundInvoker; 172 } 173 174 175 /* 176 * Instance methods. 177 */ 178 179 180 /** 181 * Returns the minimum number of bytes that this {@link 182 * AbstractChannelOutboundInvokingOutputStream} implementation has 183 * to {@linkplain #write(byte[], int, int) write} before an 184 * automatic {@linkplain #flush() flush} will take place. 185 * 186 * <p>This method will always return {@code 0} or a positive {@code 187 * int}.</p> 188 * 189 * <p>If this method returns {@code 0}, then a call to {@link 190 * #flush()} will be made at some point after every {@link 191 * #write(byte[], int, int)} invocation.</p> 192 * 193 * <p>If this method returns {@link Integer#MAX_VALUE}, then no 194 * automatic flushing will occur.</p> 195 * 196 * @return the minimum number of bytes that this {@link 197 * AbstractChannelOutboundInvokingOutputStream} implementation has 198 * to {@linkplain #write(byte[], int, int) write} before an 199 * automatic {@linkplain #flush() flush} will take place; always 200 * {@code 0} or a positive {@code int} 201 * 202 * @see 203 * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 204 * int, boolean) 205 * 206 * @see ChannelOutboundInvoker#flush() 207 */ 208 public final int getFlushThreshold() { 209 return this.flushThreshold; 210 } 211 212 @Override 213 public final void write(final int singleByte) throws IOException { 214 this.write(this.createMessage(singleByte), 1); 215 } 216 217 @Override 218 public final void write(final byte[] bytes) throws IOException { 219 this.write(bytes, 0, bytes.length); 220 } 221 222 @Override 223 public final void write(final byte[] bytes, final int offset, final int length) throws IOException { 224 if (offset < 0 || length < 0 || offset + length > bytes.length) { 225 throw new IndexOutOfBoundsException(); 226 } 227 this.write(this.createMessage(bytes, offset, length), length); 228 } 229 230 private final void write(final T message, final int length) throws IOException { 231 final ChannelPromise channelPromise = this.newPromise(); 232 final int flushThreshold = this.getFlushThreshold(); 233 switch (flushThreshold) { 234 case 0: 235 // Flush previous writes, if any 236 this.channelOutboundInvoker.flush(); 237 break; 238 case Integer.MAX_VALUE: 239 break; 240 default: 241 final int bytesWritten = this.bytesWritten; // volatile read 242 if (bytesWritten > flushThreshold) { 243 // Flush previous writes, if any, and set our "days since 244 // flush" back to 0 (see #flush()) 245 this.flush(); 246 } else if (channelPromise.isVoid()) { 247 // Optimistically assume the write will succeed; if we get 248 // this wrong, all that happens is maybe we don't flush as 249 // often as expected. 250 this.bytesWritten = bytesWritten + length; 251 } else { 252 channelPromise.addListener(f -> this.bytesWritten = bytesWritten + length); // volatile write 253 } 254 } 255 this.channelOutboundInvoker.write(message, channelPromise); 256 maybeThrow(channelPromise.cause()); 257 } 258 259 /** 260 * Returns a new message representing the single supplied {@code 261 * byte} to be {@linkplain ChannelOutboundInvoker#write(Object, 262 * ChannelPromise) written} by this {@link 263 * AbstractChannelOutboundInvokingOutputStream}'s various {@link 264 * #write(byte[], int, int) write} methods. 265 * 266 * <p>This method never returns {@code null}.</p> 267 * 268 * <p>Overrides of this method must not return {@code null}.</p> 269 * 270 * <p><strong>Note:</strong> The default implementation of this 271 * method is inefficient: it creates a new {@code byte[]} holding 272 * the sole {@code byte} represented by the {@code singleByte} 273 * parameter value and calls {@link #createMessage(byte[], int, 274 * int)} and returns its result. Subclasses are encouraged, but not 275 * required, to override this method to be more efficient.</p> 276 * 277 * <p>Overrides of this method should be stateless.</p> 278 * 279 * @param singleByte an {@code int} whose low-order bits hold a 280 * {@code byte} to be written 281 * 282 * @return a new, non-{@code null} message to {@linkplain 283 * ChannelOutboundInvoker#write(Object, ChannelPromise) write} 284 * 285 * @exception IOException if an error occurs 286 * 287 * @see #createMessage(byte[], int, int) 288 * 289 * @see ChannelOutboundInvoker#write(Object, ChannelPromise) 290 */ 291 protected T createMessage(final int singleByte) throws IOException { 292 return this.createMessage(new byte[] { (byte)singleByte }, 0, 1); 293 } 294 295 /** 296 * Returns a new message representing a portion (or all) of the 297 * supplied {@code byte} array that will be {@linkplain 298 * ChannelOutboundInvoker#write(Object, ChannelPromise) written} by 299 * this {@link AbstractChannelOutboundInvokingOutputStream}'s 300 * various {@link #write(byte[], int, int) write} methods. 301 * 302 * <p>Implementations of this method must not return {@code 303 * null}.</p> 304 * 305 * <p>Implementations of this method should be stateless.</p> 306 * 307 * @param bytes a {@code byte} array originating from, 308 * <em>e.g.</em>, a {@link #write(byte[], int, int)} method 309 * invocation; will never be {@code null} 310 * 311 * @param offset the (validated) offset within the supplied {@code 312 * byte} array from which to start reading; will always be {@code 0} 313 * or a positive {@code int} less than {@code length} 314 * 315 * @param length the (validated) length of the portion to read; will 316 * always be {@code 0} or a positive {@code int} less than or equal 317 * to the {@code length} of the supplied {@code byte} array 318 * 319 * @return a new, non-{@code null} message to {@linkplain 320 * ChannelOutboundInvoker#write(Object, ChannelPromise) write} 321 * 322 * @exception NullPointerException if {@code bytes} is {@code null} 323 * 324 * @exception IndexOutOfBoundsException if {@code offset} is 325 * negative, or {@code length} is negative, or {@code offset + 326 * length} is greater than the length of {@code bytes} 327 * 328 * @exception IOException if an error occurs during the actual 329 * creation of the message 330 * 331 * @see #write(byte[], int, int) 332 * 333 * @see OutputStream#write(byte[], int, int) 334 * 335 * @see ChannelOutboundInvoker#write(Object, ChannelPromise) 336 */ 337 protected abstract T createMessage(final byte[] bytes, final int offset, final int length) throws IOException; 338 339 /** 340 * Returns a new, possibly {@code null}, message that should be 341 * written when {@link #close()} is invoked. 342 * 343 * <p>This method and its overrides may return {@code null} to 344 * indicate that no such write is required.</p> 345 * 346 * <p>The default implementation of this method returns {@code 347 * null}.</p> 348 * 349 * <p>Overrides of this method should be stateless.</p> 350 * 351 * @return a final message to write when {@link #close() close()} is 352 * called, or {@code null} if no final message needs to be written 353 * 354 * @see #close() 355 * 356 * @exception IOException if an error occurs 357 */ 358 protected T createLastMessage() throws IOException { 359 return null; 360 } 361 362 /** 363 * Creates and returns new {@link ChannelPromise}s that will be used 364 * in many {@link ChannelOutboundInvoker} operations. 365 * 366 * <p>This method never returns {@code null}.</p> 367 * 368 * <p>Overrides of this method must not return {@code null}.</p> 369 * 370 * <p>The default implementation of this method returns the return 371 * value of invoking {@link ChannelOutboundInvoker#newPromise()}. 372 * 373 * @return a new, non-{@code null} {@link ChannelPromise} that will 374 * be supplied to many {@link ChannelOutboundInvoker} operations 375 * 376 * @see ChannelPromise 377 * 378 * @see ChannelOutboundInvoker#newPromise() 379 * 380 * @see ChannelOutboundInvoker#voidPromise() 381 */ 382 protected ChannelPromise newPromise() { 383 return this.channelOutboundInvoker.newPromise(); 384 } 385 386 /** 387 * Calls the {@link ChannelOutboundInvoker#flush()} method on the 388 * {@link ChannelOutboundInvoker} {@linkplain 389 * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 390 * int, boolean) supplied at construction time}. 391 * 392 * @see ChannelOutboundInvoker#flush() 393 * 394 * @see #getFlushThreshold() 395 * 396 * @see 397 * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 398 * int, boolean) 399 */ 400 @Override 401 public final void flush() { 402 this.channelOutboundInvoker.flush(); 403 this.bytesWritten = 0; // volatile write 404 } 405 406 /** 407 * {@linkplain OutputStream#close() Closes} this {@link 408 * AbstractChannelOutboundInvokingOutputStream}, optionally 409 * {@linkplain ChannelOutboundInvoker#writeAndFlush(Object, 410 * ChannelPromise) writing and flushing} a {@linkplain 411 * #createLastMessage() final message}, or simply just {@linkplain 412 * #flush() flushing} first, before possibly {@linkplain 413 * ChannelOutboundInvoker#close(ChannelPromise) closing the 414 * underlying <code>ChannelOutboundInvoker</code>}. 415 * 416 * @exception IOException if the {@link #createLastMessage()} method 417 * throws an {@link IOException} 418 * 419 * @see #createLastMessage() 420 * 421 * @see ChannelOutboundInvoker#close(ChannelPromise) 422 * 423 * @see 424 * #AbstractChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 425 * int, boolean) 426 */ 427 @Override 428 public final void close() throws IOException { 429 super.close(); 430 final Object lastMessage = this.createLastMessage(); 431 if (lastMessage == null) { 432 this.flush(); 433 } else { 434 final ChannelPromise channelPromise = this.newPromise(); 435 if (channelPromise.isVoid()) { 436 this.channelOutboundInvoker.writeAndFlush(lastMessage, channelPromise); 437 this.bytesWritten = 0; // volatile write 438 } else { 439 channelPromise.addListener(f -> this.bytesWritten = 0); // volatile write 440 this.channelOutboundInvoker.writeAndFlush(lastMessage, channelPromise); 441 } 442 maybeThrow(channelPromise.cause()); 443 } 444 if (this.closeChannelOutboundInvoker) { 445 final ChannelPromise channelPromise = this.newPromise(); 446 this.channelOutboundInvoker.close(channelPromise); 447 maybeThrow(channelPromise.cause()); 448 } 449 } 450 451 452 /* 453 * Static methods. 454 */ 455 456 457 private static final void maybeThrow(final Throwable cause) throws IOException { 458 if (cause == null) { 459 return; 460 } else if (cause instanceof RuntimeException) { 461 throw (RuntimeException)cause; 462 } else if (cause instanceof IOException) { 463 throw (IOException)cause; 464 } else if (cause instanceof Exception) { 465 throw new IOException(cause.getMessage(), cause); 466 } else if (cause instanceof Error) { 467 throw (Error)cause; 468 } else { 469 throw new InternalError(); 470 } 471 } 472 473}