/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.wake.remote.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.io.IOException;
import java.util.logging.Logger;

/**
 * Thin wrapper around ChunkedWriteHandler.
 * <p/>
 * ChunkedWriteHandler only handles the downstream (write) side and simply
 * emits the incoming chunks upstream. So we add an upstream handler that
 * aggregates the chunks back into their original form. Upstream processing
 * is guaranteed to be thread-serial, so state can be shared.
 * <p/>
 * On the downstream side, we just decorate the original message with its size
 * and let the thread-serial base class handle the actual chunking. The
 * decoration itself has to be thread-safe because Netty does not guarantee
 * thread-serial access to downstream handlers.
 * <p/>
 * We do not need to tag the writes because the base class ChunkedWriteHandler
 * serializes access to the channel, so the first write completes before
 * the second begins.
 */
public class ChunkedReadWriteHandler extends ChunkedWriteHandler {

  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;

  private static final Logger LOG = Logger.getLogger(ChunkedReadWriteHandler.class.getName());

  private boolean start = true;
  private int expectedSize = 0;

  private ByteBuf readBuffer;
  private byte[] retArr;

  /**
   * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)
   */
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    if (msg instanceof byte[]) {

      byte[] data = (byte[]) msg;

      if (start) {
        //LOG.log(Level.FINEST, "{0} Starting dechunking of a chunked write", curThrName);
        expectedSize = getSize(data);
        // LOG.log(Level.FINEST, "Expected Size = {0}. Wrapping byte[{1}] into a ChannelBuffer", new Object[]{expectedSize, expectedSize});
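        // The first chunk carries a 4-byte little-endian size prefix; the payload after it is
        // written through readBuffer, which wraps retArr, so the de-chunked message is
        // assembled in place as subsequent chunks arrive.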
        retArr = new byte[expectedSize];
        readBuffer = Unpooled.wrappedBuffer(retArr);
        readBuffer.clear();
        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: cur sz = " + readBuffer.writerIndex() + " + " + (data.length - INT_SIZE) + " bytes will added by current chunk");
        readBuffer.writeBytes(data, INT_SIZE, data.length - INT_SIZE);
        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: new sz = " + readBuffer.writerIndex());
        start = false;
      } else {
        readBuffer.writeBytes(data);
      }

      if (readBuffer.writerIndex() == expectedSize) {
        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "{0} Dechunking complete. Creating upstream msg event with the dechunked byte[{1}]", new Object[]{curThrName, expectedSize});
        //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "Resetting state to begin another dechunking", curThrName);
        byte[] temp = retArr;
        start = true;
        expectedSize = 0;
        readBuffer.release();
        retArr = null;
        //LOG.log(Level.FINEST, "{0} Sending dechunked message upstream", curThrName);
        super.channelRead(ctx, temp);
      }
    } else {
      super.channelRead(ctx, msg);
    }
  }

  /**
   * Thread-safe since there is no shared instance state. We just prepend the
   * size to the message, stream it through a ChunkedStream, and let the base
   * method handle the actual chunking.
   * <p/>
   * We do not need to tag the writes because the base class ChunkedWriteHandler
   * serializes access to the channel, so the first write completes before
   * the second begins.
   */
  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

    if (msg instanceof ByteBuf) {

      final ByteBuf bf = (ByteBuf) msg;

      if (bf.hasArray()) {
        final byte[] data = bf.array();
        final byte[] size = sizeAsByteArr(data.length);
        final ByteBuf writeBuffer = Unpooled.wrappedBuffer(size, data);
        final ByteBufCloseableStream stream = new ByteBufCloseableStream(writeBuffer);
        final ChunkedStream chunkedStream = new ChunkedStream(
            stream, NettyChannelInitializer.MAXFRAMELENGTH - 1024);
        super.write(ctx, chunkedStream, promise);
      } else {
        super.write(ctx, msg, promise);
      }

    } else {
      super.write(ctx, msg, promise);
    }
  }

  /**
   * Converts the int size into a byte[].
   *
   * @return the byte representation of size
   */
  private byte[] sizeAsByteArr(final int size) {
    final byte[] ret = new byte[INT_SIZE];
    final ByteBuf intBuffer = Unpooled.wrappedBuffer(ret).order(Unpooled.LITTLE_ENDIAN);
    intBuffer.clear();
    intBuffer.writeInt(size);
    intBuffer.release();
    return ret;
  }

  /**
   * Get the expected size encoded as the first 4 bytes of data.
   */
  private int getSize(final byte[] data) {
    return getSize(data, 0);
  }

  /**
   * Get the expected size encoded in the 4 bytes of data starting at the given offset.
   */
  private int getSize(final byte[] data, final int offset) {

    if (data.length - offset < INT_SIZE) {
      return 0;
    }

    final ByteBuf intBuffer = Unpooled.wrappedBuffer(data, offset, INT_SIZE).order(Unpooled.LITTLE_ENDIAN);
    final int ret = intBuffer.readInt();
    intBuffer.release();

    return ret;
  }
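  // Worked example of the size framing (illustrative): for a 300-byte payload,
  // sizeAsByteArr(300) produces the little-endian prefix {0x2C, 0x01, 0x00, 0x00};
  // on the receiving side getSize() reads those 4 bytes back as 300, and channelRead
  // keeps accumulating payload bytes until readBuffer.writerIndex() reaches that value.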
  /*
   * Release the ByteBuf when the stream closes.
   */
  private class ByteBufCloseableStream extends ByteBufInputStream {
    private final ByteBuf buffer;

    public ByteBufCloseableStream(ByteBuf buffer) {
      super(buffer);
      this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
      super.close();
      buffer.release();
    }
  }
}