001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*- 002 * 003 * Copyright © 2019 microBean™. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 014 * implied. See the License for the specific language governing 015 * permissions and limitations under the License. 016 */ 017package org.microbean.jersey.netty; 018 019import java.io.IOException; 020 021import io.netty.buffer.ByteBuf; 022import io.netty.buffer.Unpooled; 023 024import io.netty.channel.ChannelOutboundInvoker; 025 026/** 027 * An {@link AbstractChannelOutboundInvokingOutputStream} that 028 * {@linkplain #createMessage(ByteBuf) creates its messages} from 029 * {@link ByteBuf} instances. 030 * 031 * @param <T> the type of message that will be written; see {@link 032 * #createMessage(ByteBuf)} 033 * 034 * @author <a href="https://about.me/lairdnelson" 035 * target="_parent">Laird Nelson</a> 036 * 037 * @see #createMessage(ByteBuf) 038 */ 039public abstract class AbstractByteBufBackedChannelOutboundInvokingOutputStream<T> extends AbstractChannelOutboundInvokingOutputStream<T> { 040 041 042 /* 043 * Instance fields. 044 */ 045 046 047 private final ByteBufCreator byteBufCreator; 048 049 050 /* 051 * Constructors. 052 */ 053 054 055 /** 056 * Creates a new {@link 057 * AbstractByteBufBackedChannelOutboundInvokingOutputStream}. 058 * 059 * @param channelOutboundInvoker the {@link ChannelOutboundInvoker} 060 * to which operations are adapted; must not be {@code null} 061 * 062 * @param closeChannelOutboundInvoker whether {@link 063 * ChannelOutboundInvoker#close(ChannelPromise)} will be called on 064 * the supplied {@link ChannelOutboundInvoker} when {@link #close() 065 * close()} is called 066 * 067 * @see 068 * #AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 069 * int, boolean, ByteBufCreator) 070 */ 071 protected AbstractByteBufBackedChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker, 072 final boolean closeChannelOutboundInvoker) { 073 this(channelOutboundInvoker, Integer.MAX_VALUE, closeChannelOutboundInvoker, null); 074 } 075 076 /** 077 * Creates a new {@link 078 * AbstractByteBufBackedChannelOutboundInvokingOutputStream}. 079 * 080 * @param channelOutboundInvoker the {@link ChannelOutboundInvoker} 081 * to which operations are adapted; must not be {@code null} 082 * 083 * @param flushThreshold the minimum number of bytes that this 084 * instance has to {@linkplain #write(byte[], int, int) write} 085 * before an automatic {@linkplain #flush() flush} will take place; 086 * if less than {@code 0} {@code 0} will be used instead; if {@link 087 * Integer#MAX_VALUE} then no automatic flushing will occur 088 * 089 * @param closeChannelOutboundInvoker whether {@link 090 * ChannelOutboundInvoker#close(ChannelPromise)} will be called on 091 * the supplied {@link ChannelOutboundInvoker} when {@link #close() 092 * close()} is called 093 * 094 * @see 095 * #AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 096 * int, boolean, ByteBufCreator) 097 */ 098 protected AbstractByteBufBackedChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker, 099 final int flushThreshold, 100 final boolean closeChannelOutboundInvoker) { 101 this(channelOutboundInvoker, flushThreshold, closeChannelOutboundInvoker, null); 102 } 103 104 /** 105 * Creates a new {@link 106 * AbstractByteBufBackedChannelOutboundInvokingOutputStream}. 107 * 108 * @param channelOutboundInvoker the {@link ChannelOutboundInvoker} 109 * to which operations are adapted; must not be {@code null} 110 * 111 * @param flushThreshold the minimum number of bytes that this 112 * instance has to {@linkplain #write(byte[], int, int) write} 113 * before an automatic {@linkplain #flush() flush} will take place; 114 * if less than {@code 0} {@code 0} will be used instead; if {@link 115 * Integer#MAX_VALUE} then no automatic flushing will occur 116 * 117 * @param closeChannelOutboundInvoker whether {@link 118 * ChannelOutboundInvoker#close(ChannelPromise)} will be called on 119 * the supplied {@link ChannelOutboundInvoker} when {@link #close() 120 * close()} is called 121 * 122 * @param byteBufCreator a {@link ByteBufCreator} that will be used 123 * to {@linkplain ByteBufCreator#toByteBuf(byte[], int, int) create 124 * <code>ByteBuf</code> instances}; may be {@code null} in which 125 * case a default {@link ByteBufCreator} adapting {@link 126 * Unpooled#wrappedBuffer(byte[], int, int)} will be used instead 127 * 128 * @see ByteBufCreator 129 * 130 * @see Unpooled#wrappedBuffer(byte[], int, int) 131 */ 132 protected AbstractByteBufBackedChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker, 133 final int flushThreshold, 134 final boolean closeChannelOutboundInvoker, 135 final ByteBufCreator byteBufCreator) { 136 super(channelOutboundInvoker, flushThreshold, closeChannelOutboundInvoker); 137 if (byteBufCreator == null) { 138 this.byteBufCreator = Unpooled::wrappedBuffer; 139 } else { 140 this.byteBufCreator = byteBufCreator; 141 } 142 } 143 144 145 /* 146 * Instance methods. 147 */ 148 149 150 /** 151 * Returns the result of invoking the {@link 152 * #createMessage(ByteBuf)} method with a {@link ByteBuf} returned 153 * by the {@link ByteBufCreator} {@linkplain 154 * #AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker, 155 * int, boolean, ByteBufCreator) supplied at construction time}. 156 * 157 * @param bytes {@inheritDoc} 158 * 159 * @param offset {@inheritDoc} 160 * 161 * @param length {@inheritDoc} 162 * 163 * @return {@inheritDoc} 164 * 165 * @exception IndexOutOfBoundsException if {@code offset} is 166 * negative, or {@code length} is negative, or {@code offset + 167 * length} is greater than the length of {@code bytes} 168 * 169 * @exception IOException if the {@link #createMessage(ByteBuf)} 170 * method throws an {@link IOException} 171 * 172 * @see #createMessage(ByteBuf) 173 */ 174 @Override 175 protected final T createMessage(final byte[] bytes, final int offset, final int length) throws IOException { 176 return this.createMessage(this.byteBufCreator.toByteBuf(bytes, offset, length)); 177 } 178 179 /** 180 * Creates and returns a new message to be {@linkplain 181 * ChannelOutboundInvoker#write(Object, ChannelPromise) written}. 182 * 183 * <p>This method is called by the {@link #createMessage(byte[], 184 * int, int)} method.</p> 185 * 186 * @param content the {@link ByteBuf} to construct the message from; 187 * will never be {@code null}; must be read in its entirety, 188 * i.e. the return value of its {@link ByteBuf#readableBytes()} 189 * method after this method has completed must be {@code 0} 190 * 191 * @return a new message 192 * 193 * @exception IOException if an error occurs 194 * 195 * @see #createMessage(byte[], int, int) 196 */ 197 protected abstract T createMessage(final ByteBuf content) throws IOException; 198 199 200 /* 201 * Inner and nested classes. 202 */ 203 204 205 /** 206 * A creator of {@link ByteBuf}s that uses a {@code byte} array or a 207 * portion of a {@code byte} array as its raw materials. 208 * 209 * @author <a href="https://about.me/lairdnelson" 210 * target="_parent">Laird Nelson</a> 211 * 212 * @see #toByteBuf(byte[], int, int) 213 */ 214 @FunctionalInterface 215 public static interface ByteBufCreator { 216 217 /** 218 * Returns a {@link ByteBuf} that uses the designated {@code byte} 219 * array portion as its raw materials. 220 * 221 * <p>Implementations of this method must not return {@code 222 * null}.</p> 223 * 224 * @param bytes the {@code byte} array from which to read; must 225 * not be {@code null} 226 * 227 * @param offset the zero-based offset of the supplied {@code 228 * byte} array at which to start reading; must be {@code 0} or a 229 * positive {@code int} that is less than the length of the 230 * supplied {@code byte} array 231 * 232 * @param length the number of bytes to read; must be {@code 0} or 233 * a {@code positive int} that is less than or equal to the length 234 * of the supplied {@code byte} array minus the supplied {@code 235 * offset} 236 * 237 * @return a non-{@code null} {@link ByteBuf} 238 * 239 * @exception NullPointerException if {@code bytes} is {@code null} 240 * 241 * @exception IndexOutOfBoundsException if {@code offset} is 242 * negative, or {@code length} is negative, or {@code offset + 243 * length} is greater than the length of {@code bytes} 244 * 245 * @see Unpooled#wrappedBuffer(byte[], int, int) 246 */ 247 public ByteBuf toByteBuf(final byte[] bytes, final int offset, final int length); 248 249 } 250 251}