/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.wake.remote.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.io.IOException;
import java.util.logging.Logger;

/**
 * Thin wrapper around ChunkedWriteHandler.
 * <p>
 * ChunkedWriteHandler only handles the down stream parts
 * and just emits the chunks up stream. So we add an upstream
 * handler that aggregates the chunks into its original form. This
 * is guaranteed to be thread serial so state can be shared.
 * <p>
 * On the down stream side, we just decorate the original message
 * with its size and allow the thread-serial base class to actually
 * handle the chunking. We need to be careful since the decoration
 * itself has to be thread-safe since netty does not guarantee thread
 * serial access to down stream handlers.
 * <p>
 * We do not need to tag the writes since the base class ChunkedWriteHandler
 * serializes access to the channel and first write will complete before
 * the second begins.
 * <p>
 * Wire format of one logical message: a 4-byte little-endian length followed by
 * exactly that many payload bytes, possibly split across several chunks.
 */
public class ChunkedReadWriteHandler extends ChunkedWriteHandler {

  /** Number of bytes used by the length prefix. */
  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;

  private static final Logger LOG = Logger.getLogger(ChunkedReadWriteHandler.class.getName());

  /** True while the next {@code byte[]} read is the first chunk of a message. */
  private boolean start = true;

  /** Payload size announced by the length prefix of the message being reassembled. */
  private int expectedSize = 0;

  /** Write cursor over {@link #retArr}; only set while a message is being reassembled. */
  private ByteBuf readBuffer;

  /** Destination array for the message being reassembled. */
  private byte[] retArr;

  /**
   * Reassembles chunked {@code byte[]} messages into the original message.
   * <p>
   * The first chunk of a message carries the length prefix; its payload and the
   * payload of every following chunk are appended to an array of that length.
   * Once the array is full it is passed on via {@code super.channelRead} and the
   * state is reset for the next message. Messages that are not {@code byte[]}
   * are passed on unchanged.
   *
   * @param ctx the channel handler context
   * @param msg the inbound message
   * @throws Exception if the superclass handler throws
   */
  @Override
  public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {

    if (!(msg instanceof byte[])) {
      super.channelRead(ctx, msg);
      return;
    }

    final byte[] data = (byte[]) msg;

    if (start) {
      // First chunk: read the announced size and allocate the destination array.
      expectedSize = getSize(data);
      retArr = new byte[expectedSize];
      readBuffer = Unpooled.wrappedBuffer(retArr);
      // A wrapped buffer starts "full"; reset the indices so writes begin at 0.
      readBuffer.clear();
      // Skip the length prefix and copy the remainder of the first chunk.
      readBuffer.writeBytes(data, INT_SIZE, data.length - INT_SIZE);
      start = false;
    } else {
      readBuffer.writeBytes(data);
    }

    if (readBuffer.writerIndex() == expectedSize) {
      // Dechunking complete: reset all state before handing the message on.
      final byte[] completed = retArr;
      start = true;
      expectedSize = 0;
      readBuffer.release();
      readBuffer = null; // do not keep a reference to a released buffer
      retArr = null;
      super.channelRead(ctx, completed);
    }
  }

  /**
   * Thread-safe since there is no shared instance state.
   * Just prepend size to the message and stream it through
   * a chunked stream and let the base method handle the actual
   * chunking.
   * <p>
   * We do not need to tag the writes since the base class ChunkedWriteHandler
   * serializes access to the channel and first write will complete before
   * the second begins.
   * <p>
   * Only the readable region of an array-backed {@link ByteBuf} is streamed, and
   * the length prefix is the number of readable bytes. Any other message,
   * including a {@link ByteBuf} without a backing array, is passed to the base
   * class unchanged.
   *
   * @param ctx     the channel handler context
   * @param msg     the outbound message
   * @param promise the promise to complete when the write finishes
   * @throws Exception if the superclass handler throws
   */
  @Override
  public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {

    // NOTE(review): buffers without a backing array are forwarded without a length
    // prefix, as before; confirm that callers never send such buffers.
    if (msg instanceof ByteBuf && ((ByteBuf) msg).hasArray()) {

      final ByteBuf bf = (ByteBuf) msg;

      // array() exposes the whole backing array, which can be larger than the
      // readable content (slices, partially read buffers). Use the readable
      // window so that neither stale nor foreign bytes are sent.
      final int length = bf.readableBytes();
      final int offset = bf.arrayOffset() + bf.readerIndex();

      // NOTE(review): bf itself is not released here because its backing array is
      // still being streamed; confirm buffer ownership with the callers.
      final ByteBuf writeBuffer = Unpooled.wrappedBuffer(
          Unpooled.wrappedBuffer(sizeAsByteArr(length)),
          Unpooled.wrappedBuffer(bf.array(), offset, length));

      final ByteBufCloseableStream stream = new ByteBufCloseableStream(writeBuffer);
      final ChunkedStream chunkedStream = new ChunkedStream(
          stream, NettyChannelInitializer.MAXFRAMELENGTH - 1024);
      super.write(ctx, chunkedStream, promise);

    } else {
      super.write(ctx, msg, promise);
    }
  }

  /**
   * Converts the int size into a byte[].
   *
   * @param size the value to encode
   * @return the little-endian 4-byte representation of size
   */
  private byte[] sizeAsByteArr(final int size) {
    final byte[] ret = new byte[INT_SIZE];
    final ByteBuf intBuffer = Unpooled.wrappedBuffer(ret).order(Unpooled.LITTLE_ENDIAN);
    // Reset the indices so that writeInt starts at position 0 of ret.
    intBuffer.clear();
    intBuffer.writeInt(size);
    intBuffer.release();
    return ret;
  }

  /**
   * Get expected size encoded as the first 4 bytes of data.
   *
   * @param data the first chunk of a message
   * @return the decoded size, or 0 if data holds fewer than 4 bytes
   */
  private int getSize(final byte[] data) {
    return getSize(data, 0);
  }

  /**
   * Get expected size encoded as offset + 4 bytes of data.
   *
   * @param data   the array holding the little-endian size
   * @param offset index of the first byte of the size
   * @return the decoded size, or 0 if fewer than 4 bytes are available at offset
   */
  private int getSize(final byte[] data, final int offset) {

    if (data.length - offset < INT_SIZE) {
      return 0;
    }

    final ByteBuf intBuffer = Unpooled.wrappedBuffer(data, offset, INT_SIZE).order(Unpooled.LITTLE_ENDIAN);
    final int ret = intBuffer.readInt();
    intBuffer.release();

    return ret;
  }

  /**
   * Release Bytebuf when the stream closes.
   * <p>
   * Static because it needs no state from the enclosing handler.
   */
  private static final class ByteBufCloseableStream extends ByteBufInputStream {
    private final ByteBuf buffer;

    /** Guards against releasing the buffer more than once. */
    private boolean closed;

    ByteBufCloseableStream(final ByteBuf buffer) {
      super(buffer);
      this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
      if (closed) {
        return;
      }
      closed = true;
      try {
        super.close();
      } finally {
        // Release even if the superclass close fails.
        buffer.release();
      }
    }
  }
}