001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.transport.netty;
020
021import io.netty.buffer.ByteBuf;
022import io.netty.buffer.ByteBufInputStream;
023import io.netty.buffer.Unpooled;
024import io.netty.channel.ChannelHandlerContext;
025import io.netty.channel.ChannelPromise;
026import io.netty.handler.stream.ChunkedStream;
027import io.netty.handler.stream.ChunkedWriteHandler;
028
029import java.io.IOException;
030import java.util.logging.Logger;
031
032/**
033 * Thin wrapper around ChunkedWriteHandler.
034 * <p>
035 * ChunkedWriteHandler only handles the down stream parts
036 * and just emits the chunks up stream. So we add an upstream
037 * handler that aggregates the chunks into its original form. This
038 * is guaranteed to be thread serial so state can be shared.
039 * <p>
040 * On the down stream side, we just decorate the original message
041 * with its size and allow the thread-serial base class to actually
042 * handle the chunking. We need to be careful since the decoration
043 * itself has to be thread-safe since netty does not guarantee thread
044 * serial access to down stream handlers.
045 * <p>
046 * We do not need to tag the writes since the base class ChunkedWriteHandler
047 * serializes access to the channel and first write will complete before
048 * the second begins.
049 */
050public class ChunkedReadWriteHandler extends ChunkedWriteHandler {
051
052  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
053
054  private static final Logger LOG = Logger.getLogger(ChunkedReadWriteHandler.class.getName());
055
056  private boolean start = true;
057  private int expectedSize = 0;
058
059  private ByteBuf readBuffer;
060  private byte[] retArr;
061
062  /**
063   * @see org.jboss.netty.handler.stream.ChunkedWriteHandler#handleUpstream(
064   *      org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelEvent)
065   */
066  @Override
067  public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
068
069    if (msg instanceof byte[]) {
070
071      final byte[] data = (byte[]) msg;
072
073      if (start) {
074        //LOG.log(Level.FINEST, "{0} Starting dechunking of a chunked write", curThrName);
075        expectedSize = getSize(data);
076        // LOG.log(Level.FINEST, "Expected Size = {0}. Wrapping byte[{1}] into a ChannelBuffer",
077        // new Object[]{expectedSize,expectedSize});
078        retArr = new byte[expectedSize];
079        readBuffer = Unpooled.wrappedBuffer(retArr);
080        readBuffer.clear();
081        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: cur sz = " +
082        // readBuffer.writerIndex() + " + " + (data.length - INT_SIZE) + " bytes will added by current chunk");
083        readBuffer.writeBytes(data, INT_SIZE, data.length - INT_SIZE);
084        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: new sz = " +
085        // readBuffer.writerIndex());
086        start = false;
087      } else {
088        readBuffer.writeBytes(data);
089      }
090
091      if (readBuffer.writerIndex() == expectedSize) {
092        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "{0} Dechunking complete." +
093        // "Creating upstream msg event with the dechunked byte[{1}]", new Object[]{curThrName, expectedSize});
094        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "Resetting state to begin another dechunking",
095        // curThrName);
096        final byte[] temp = retArr;
097        start = true;
098        expectedSize = 0;
099        readBuffer.release();
100        retArr = null;
101        //LOG.log(Level.FINEST, "{0} Sending dechunked message upstream", curThrName);
102        super.channelRead(ctx, temp);
103      }
104    } else {
105      super.channelRead(ctx, msg);
106    }
107  }
108
109  /**
110   * Thread-safe since there is no shared instance state.
111   * Just prepend size to the message and stream it through
112   * a chunked stream and let the base method handle the actual
113   * chunking.
114   * <p>
115   * We do not need to tag the writes since the base class ChunkedWriteHandler
116   * serializes access to the channel and first write will complete before
117   * the second begins.
118   */
119  @Override
120  public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
121
122    if (msg instanceof ByteBuf) {
123
124      final ByteBuf bf = (ByteBuf) msg;
125
126      if (bf.hasArray()) {
127        final byte[] data = bf.array();
128        final byte[] size = sizeAsByteArr(data.length);
129        final ByteBuf writeBuffer = Unpooled.wrappedBuffer(size, data);
130        final ByteBufCloseableStream stream = new ByteBufCloseableStream(writeBuffer);
131        final ChunkedStream chunkedStream = new ChunkedStream(
132            stream, NettyChannelInitializer.MAXFRAMELENGTH - 1024);
133        super.write(ctx, chunkedStream, promise);
134      } else {
135        super.write(ctx, msg, promise);
136      }
137
138    } else {
139      super.write(ctx, msg, promise);
140    }
141  }
142
143  /**
144   * Converts the int size into a byte[].
145   *
146   * @return the bit representation of size
147   */
148  private byte[] sizeAsByteArr(final int size) {
149    final byte[] ret = new byte[INT_SIZE];
150    final ByteBuf intBuffer = Unpooled.wrappedBuffer(ret).order(Unpooled.LITTLE_ENDIAN);
151    intBuffer.clear();
152    intBuffer.writeInt(size);
153    intBuffer.release();
154    return ret;
155  }
156
157  /**
158   * Get expected size encoded as the first 4 bytes of data.
159   */
160  private int getSize(final byte[] data) {
161    return getSize(data, 0);
162  }
163
164  /**
165   * Get expected size encoded as offset + 4 bytes of data.
166   */
167  private int getSize(final byte[] data, final int offset) {
168
169    if (data.length - offset < INT_SIZE) {
170      return 0;
171    }
172
173    final ByteBuf intBuffer = Unpooled.wrappedBuffer(data, offset, INT_SIZE).order(Unpooled.LITTLE_ENDIAN);
174    final int ret = intBuffer.readInt();
175    intBuffer.release();
176
177    return ret;
178  }
179
180  /**
181   * Release Bytebuf when the stream closes.
182   */
183  private class ByteBufCloseableStream extends ByteBufInputStream {
184    private final ByteBuf buffer;
185
186    ByteBufCloseableStream(final ByteBuf buffer) {
187      super(buffer);
188      this.buffer = buffer;
189    }
190
191    @Override
192    public void close() throws IOException {
193      super.close();
194      buffer.release();
195    }
196  }
197}