001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2019 microBean™.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.jersey.netty;
018
019import java.io.IOException;
020
021import io.netty.buffer.ByteBuf;
022import io.netty.buffer.Unpooled;
023
024import io.netty.channel.ChannelOutboundInvoker;
025
026/**
027 * An {@link AbstractChannelOutboundInvokingOutputStream} that
028 * {@linkplain #createMessage(ByteBuf) creates its messages} from
029 * {@link ByteBuf} instances.
030 *
031 * @param <T> the type of message that will be written; see {@link
032 * #createMessage(ByteBuf)}
033 *
034 * @author <a href="https://about.me/lairdnelson"
035 * target="_parent">Laird Nelson</a>
036 *
037 * @see #createMessage(ByteBuf)
038 */
039public abstract class AbstractByteBufBackedChannelOutboundInvokingOutputStream<T> extends AbstractChannelOutboundInvokingOutputStream<T> {
040
041
042  /*
043   * Instance fields.
044   */
045
046
047  private final ByteBufCreator byteBufCreator;
048
049
050  /*
051   * Constructors.
052   */
053
054
055  /**
056   * Creates a new {@link
057   * AbstractByteBufBackedChannelOutboundInvokingOutputStream}.
058   *
059   * @param channelOutboundInvoker the {@link ChannelOutboundInvoker}
060   * to which operations are adapted; must not be {@code null}
061   *
062   * @param closeChannelOutboundInvoker whether {@link
063   * ChannelOutboundInvoker#close(ChannelPromise)} will be called on
064   * the supplied {@link ChannelOutboundInvoker} when {@link #close()
065   * close()} is called
066   *
067   * @see
068   * #AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
069   * int, boolean, ByteBufCreator)
070   */
071  protected AbstractByteBufBackedChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker,
072                                                                     final boolean closeChannelOutboundInvoker) {
073    this(channelOutboundInvoker, Integer.MAX_VALUE, closeChannelOutboundInvoker, null);
074  }
075
076  /**
077   * Creates a new {@link
078   * AbstractByteBufBackedChannelOutboundInvokingOutputStream}.
079   *
080   * @param channelOutboundInvoker the {@link ChannelOutboundInvoker}
081   * to which operations are adapted; must not be {@code null}
082   *
083   * @param flushThreshold the minimum number of bytes that this
084   * instance has to {@linkplain #write(byte[], int, int) write}
085   * before an automatic {@linkplain #flush() flush} will take place;
086   * if less than {@code 0} {@code 0} will be used instead; if {@link
087   * Integer#MAX_VALUE} then no automatic flushing will occur
088   *
089   * @param closeChannelOutboundInvoker whether {@link
090   * ChannelOutboundInvoker#close(ChannelPromise)} will be called on
091   * the supplied {@link ChannelOutboundInvoker} when {@link #close()
092   * close()} is called
093   *
094   * @see
095   * #AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
096   * int, boolean, ByteBufCreator)
097   */
098  protected AbstractByteBufBackedChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker,
099                                                                     final int flushThreshold,
100                                                                     final boolean closeChannelOutboundInvoker) {
101    this(channelOutboundInvoker, flushThreshold, closeChannelOutboundInvoker, null);
102  }
103
104  /**
105   * Creates a new {@link
106   * AbstractByteBufBackedChannelOutboundInvokingOutputStream}.
107   *
108   * @param channelOutboundInvoker the {@link ChannelOutboundInvoker}
109   * to which operations are adapted; must not be {@code null}
110   *
111   * @param flushThreshold the minimum number of bytes that this
112   * instance has to {@linkplain #write(byte[], int, int) write}
113   * before an automatic {@linkplain #flush() flush} will take place;
114   * if less than {@code 0} {@code 0} will be used instead; if {@link
115   * Integer#MAX_VALUE} then no automatic flushing will occur
116   *
117   * @param closeChannelOutboundInvoker whether {@link
118   * ChannelOutboundInvoker#close(ChannelPromise)} will be called on
119   * the supplied {@link ChannelOutboundInvoker} when {@link #close()
120   * close()} is called
121   *
122   * @param byteBufCreator a {@link ByteBufCreator} that will be used
123   * to {@linkplain ByteBufCreator#toByteBuf(byte[], int, int) create
124   * <code>ByteBuf</code> instances}; may be {@code null} in which
125   * case a default {@link ByteBufCreator} adapting {@link
126   * Unpooled#wrappedBuffer(byte[], int, int)} will be used instead
127   *
128   * @see ByteBufCreator
129   *
130   * @see Unpooled#wrappedBuffer(byte[], int, int)
131   */
132  protected AbstractByteBufBackedChannelOutboundInvokingOutputStream(final ChannelOutboundInvoker channelOutboundInvoker,
133                                                                     final int flushThreshold,
134                                                                     final boolean closeChannelOutboundInvoker,
135                                                                     final ByteBufCreator byteBufCreator) {
136    super(channelOutboundInvoker, flushThreshold, closeChannelOutboundInvoker);
137    if (byteBufCreator == null) {
138      this.byteBufCreator = Unpooled::wrappedBuffer;
139    } else {
140      this.byteBufCreator = byteBufCreator;
141    }
142  }
143
144
145  /*
146   * Instance methods.
147   */
148
149
150  /**
151   * Returns the result of invoking the {@link
152   * #createMessage(ByteBuf)} method with a {@link ByteBuf} returned
153   * by the {@link ByteBufCreator} {@linkplain
154   * #AbstractByteBufBackedChannelOutboundInvokingOutputStream(ChannelOutboundInvoker,
155   * int, boolean, ByteBufCreator) supplied at construction time}.
156   *
157   * @param bytes {@inheritDoc}
158   *
159   * @param offset {@inheritDoc}
160   *
161   * @param length {@inheritDoc}
162   *
163   * @return {@inheritDoc}
164   *
165   * @exception IndexOutOfBoundsException if {@code offset} is
166   * negative, or {@code length} is negative, or {@code offset +
167   * length} is greater than the length of {@code bytes}
168   *
169   * @exception IOException if the {@link #createMessage(ByteBuf)}
170   * method throws an {@link IOException}
171   *
172   * @see #createMessage(ByteBuf)
173   */
174  @Override
175  protected final T createMessage(final byte[] bytes, final int offset, final int length) throws IOException {
176    return this.createMessage(this.byteBufCreator.toByteBuf(bytes, offset, length));
177  }
178
179  /**
180   * Creates and returns a new message to be {@linkplain
181   * ChannelOutboundInvoker#write(Object, ChannelPromise) written}.
182   *
183   * <p>This method is called by the {@link #createMessage(byte[],
184   * int, int)} method.</p>
185   *
186   * @param content the {@link ByteBuf} to construct the message from;
187   * will never be {@code null}; must be read in its entirety,
188   * i.e. the return value of its {@link ByteBuf#readableBytes()}
189   * method after this method has completed must be {@code 0}
190   *
191   * @return a new message
192   *
193   * @exception IOException if an error occurs
194   *
195   * @see #createMessage(byte[], int, int)
196   */
197  protected abstract T createMessage(final ByteBuf content) throws IOException;
198
199
200  /*
201   * Inner and nested classes.
202   */
203
204
205  /**
206   * A creator of {@link ByteBuf}s that uses a {@code byte} array or a
207   * portion of a {@code byte} array as its raw materials.
208   *
209   * @author <a href="https://about.me/lairdnelson"
210   * target="_parent">Laird Nelson</a>
211   *
212   * @see #toByteBuf(byte[], int, int)
213   */
214  @FunctionalInterface
215  public static interface ByteBufCreator {
216
217    /**
218     * Returns a {@link ByteBuf} that uses the designated {@code byte}
219     * array portion as its raw materials.
220     *
221     * <p>Implementations of this method must not return {@code
222     * null}.</p>
223     *
224     * @param bytes the {@code byte} array from which to read; must
225     * not be {@code null}
226     *
227     * @param offset the zero-based offset of the supplied {@code
228     * byte} array at which to start reading; must be {@code 0} or a
229     * positive {@code int} that is less than the length of the
230     * supplied {@code byte} array
231     *
232     * @param length the number of bytes to read; must be {@code 0} or
233     * a {@code positive int} that is less than or equal to the length
234     * of the supplied {@code byte} array minus the supplied {@code
235     * offset}
236     *
237     * @return a non-{@code null} {@link ByteBuf}
238     *
239     * @exception NullPointerException if {@code bytes} is {@code null}
240     *
241     * @exception IndexOutOfBoundsException if {@code offset} is
242     * negative, or {@code length} is negative, or {@code offset +
243     * length} is greater than the length of {@code bytes}
244     *
245     * @see Unpooled#wrappedBuffer(byte[], int, int)
246     */
247    public ByteBuf toByteBuf(final byte[] bytes, final int offset, final int length);
248
249  }
250
251}