This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.transport.netty;
020
021import io.netty.buffer.ByteBuf;
022import io.netty.buffer.ByteBufInputStream;
023import io.netty.buffer.Unpooled;
024import io.netty.channel.ChannelHandlerContext;
025import io.netty.channel.ChannelPromise;
026import io.netty.handler.stream.ChunkedStream;
027import io.netty.handler.stream.ChunkedWriteHandler;
028
029import java.io.IOException;
030import java.util.logging.Logger;
031
032/**
033 * Thin wrapper around ChunkedWriteHandler
034 * <p/>
035 * ChunkedWriteHandler only handles the down stream parts
036 * and just emits the chunks up stream. So we add an upstream
037 * handler that aggregates the chunks into its original form. This
038 * is guaranteed to be thread serial so state can be shared.
039 * <p/>
040 * On the down stream side, we just decorate the original message
041 * with its size and allow the thread-serial base class to actually
042 * handle the chunking. We need to be careful since the decoration
043 * itself has to be thread-safe since netty does not guarantee thread
044 * serial access to down stream handlers.
045 * <p/>
046 * We do not need to tag the writes since the base class ChunkedWriteHandler
047 * serializes access to the channel and first write will complete before
048 * the second begins.
049 */
050public class ChunkedReadWriteHandler extends ChunkedWriteHandler {
051
052  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
053
054  private static final Logger LOG = Logger.getLogger(ChunkedReadWriteHandler.class.getName());
055
056  private boolean start = true;
057  private int expectedSize = 0;
058
059  private ByteBuf readBuffer;
060  private byte[] retArr;
061
062  /**
063   * @see org.jboss.netty.handler.stream.ChunkedWriteHandler#handleUpstream(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelEvent)
064   */
065  @Override
066  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
067
068    if (msg instanceof byte[]) {
069
070      byte[] data = (byte[]) msg;
071
072      if (start) {
073        //LOG.log(Level.FINEST, "{0} Starting dechunking of a chunked write", curThrName);
074        expectedSize = getSize(data);
075        // LOG.log(Level.FINEST, "Expected Size = {0}. Wrapping byte[{1}] into a ChannelBuffer", new Object[]{expectedSize,expectedSize});
076        retArr = new byte[expectedSize];
077        readBuffer = Unpooled.wrappedBuffer(retArr);
078        readBuffer.clear();
079        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: cur sz = " + readBuffer.writerIndex() + " + " + (data.length - INT_SIZE) + " bytes will added by current chunk");
080        readBuffer.writeBytes(data, INT_SIZE, data.length - INT_SIZE);
081        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: new sz = " + readBuffer.writerIndex());
082        start = false;
083      } else {
084        readBuffer.writeBytes(data);
085      }
086
087      if (readBuffer.writerIndex() == expectedSize) {
088        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "{0} Dechunking complete. Creating upstream msg event with the dechunked byte[{1}]", new Object[]{curThrName, expectedSize});
089        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "Resetting state to begin another dechunking", curThrName);
090        byte[] temp = retArr;
091        start = true;
092        expectedSize = 0;
093        readBuffer.release();
094        retArr = null;
095        //LOG.log(Level.FINEST, "{0} Sending dechunked message upstream", curThrName);
096        super.channelRead(ctx, temp);
097      }
098    } else {
099      super.channelRead(ctx, msg);
100    }
101  }
102
103  /**
104   * Thread-safe since there is no shared instance state.
105   * Just prepend size to the message and stream it through
106   * a chunked stream and let the base method handle the actual
107   * chunking.
108   * <p/>
109   * We do not need to tag the writes since the base class ChunkedWriteHandler
110   * serializes access to the channel and first write will complete before
111   * the second begins.
112   */
113  @Override
114  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
115
116    if (msg instanceof ByteBuf) {
117
118      final ByteBuf bf = (ByteBuf) msg;
119
120      if (bf.hasArray()) {
121        final byte[] data = bf.array();
122        final byte[] size = sizeAsByteArr(data.length);
123        final ByteBuf writeBuffer = Unpooled.wrappedBuffer(size, data);
124        final ByteBufCloseableStream stream = new ByteBufCloseableStream(writeBuffer);
125        final ChunkedStream chunkedStream = new ChunkedStream(
126            stream, NettyChannelInitializer.MAXFRAMELENGTH - 1024);
127        super.write(ctx, chunkedStream, promise);
128      } else {
129        super.write(ctx, msg, promise);
130      }
131
132    } else {
133      super.write(ctx, msg, promise);
134    }
135  }
136
137  /**
138   * Converts the int size into a byte[]
139   *
140   * @return the bit representation of size
141   */
142  private byte[] sizeAsByteArr(final int size) {
143    final byte[] ret = new byte[INT_SIZE];
144    final ByteBuf intBuffer = Unpooled.wrappedBuffer(ret).order(Unpooled.LITTLE_ENDIAN);
145    intBuffer.clear();
146    intBuffer.writeInt(size);
147    intBuffer.release();
148    return ret;
149  }
150
151  /**
152   * Get expected size encoded as the first 4 bytes of data
153   */
154  private int getSize(final byte[] data) {
155    return getSize(data, 0);
156  }
157
158  /**
159   * Get expected size encoded as offset + 4 bytes of data
160   */
161  private int getSize(final byte[] data, final int offset) {
162
163    if (data.length - offset < INT_SIZE) {
164      return 0;
165    }
166
167    final ByteBuf intBuffer = Unpooled.wrappedBuffer(data, offset, INT_SIZE).order(Unpooled.LITTLE_ENDIAN);
168    final int ret = intBuffer.readInt();
169    intBuffer.release();
170
171    return ret;
172  }
173
174  /*
175   * Release Bytebuf when the stream closes
176   */
177  private class ByteBufCloseableStream extends ByteBufInputStream {
178    private final ByteBuf buffer;
179
180    public ByteBufCloseableStream(ByteBuf buffer) {
181      super(buffer);
182      this.buffer = buffer;
183    }
184
185    @Override
186    public void close() throws IOException {
187      super.close();
188      buffer.release();
189    }
190  }
191}