001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.remote.transport.netty; 020 021import io.netty.buffer.Unpooled; 022import io.netty.channel.Channel; 023import io.netty.channel.ChannelFuture; 024import io.netty.channel.ChannelFutureListener; 025import org.apache.reef.wake.remote.Encoder; 026import org.apache.reef.wake.remote.transport.Link; 027import org.apache.reef.wake.remote.transport.LinkListener; 028 029import java.net.SocketAddress; 030import java.util.logging.Level; 031import java.util.logging.Logger; 032 033/** 034 * Link implementation with Netty. 035 * 036 * If you set a {@code LinkListener<T>}, it keeps message until writeAndFlush operation completes 037 * and notifies whether the sent message transferred successfully through the listener. 038 */ 039public class NettyLink<T> implements Link<T> { 040 041 public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; 042 private static final Logger LOG = Logger.getLogger(NettyLink.class.getName()); 043 private final Channel channel; 044 private final Encoder<? super T> encoder; 045 private final LinkListener<? super T> listener; 046 047 /** 048 * Constructs a link. 049 * 050 * @param channel the channel 051 * @param encoder the encoder 052 */ 053 public NettyLink(final Channel channel, final Encoder<? super T> encoder) { 054 this(channel, encoder, null); 055 } 056 057 /** 058 * Constructs a link. 059 * 060 * @param channel the channel 061 * @param encoder the encoder 062 * @param listener the link listener 063 */ 064 public NettyLink(final Channel channel, 065 final Encoder<? super T> encoder, final LinkListener<? super T> listener) { 066 this.channel = channel; 067 this.encoder = encoder; 068 this.listener = listener; 069 } 070 071 072 /** 073 * Writes the message to this link. 074 * 075 * @param message the message 076 */ 077 @Override 078 public void write(final T message) { 079 LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message}); 080 final byte[] allData = encoder.encode(message); 081 // byte[] -> ByteBuf 082 if (listener != null) { 083 channel.writeAndFlush(Unpooled.wrappedBuffer(allData)) 084 .addListener(new NettyChannelFutureListener<>(message, listener)); 085 } else { 086 channel.writeAndFlush(Unpooled.wrappedBuffer(allData)); 087 } 088 } 089 090 /** 091 * Gets a local address of the link. 092 * 093 * @return a local socket address 094 */ 095 @Override 096 public SocketAddress getLocalAddress() { 097 return channel.localAddress(); 098 } 099 100 /** 101 * Gets a remote address of the link. 102 * 103 * @return a remote socket address 104 */ 105 @Override 106 public SocketAddress getRemoteAddress() { 107 return channel.remoteAddress(); 108 } 109 110 @Override 111 public String toString() { 112 return "localAddr: " + getLocalAddress() + " remoteAddr: " + getRemoteAddress(); 113 } 114} 115 116class NettyChannelFutureListener<T> implements ChannelFutureListener { 117 118 private final T message; 119 private LinkListener<T> listener; 120 121 NettyChannelFutureListener(final T message, final LinkListener<T> listener) { 122 this.message = message; 123 this.listener = listener; 124 } 125 126 @Override 127 public void operationComplete(final ChannelFuture channelFuture) throws Exception { 128 if (channelFuture.isSuccess()) { 129 listener.onSuccess(message); 130 } else { 131 listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message); 132 } 133 } 134}