001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.remote.transport.netty; 020 021import io.netty.buffer.Unpooled; 022import io.netty.channel.Channel; 023import org.apache.reef.wake.remote.Encoder; 024import org.apache.reef.wake.remote.transport.Link; 025import org.apache.reef.wake.remote.transport.LinkListener; 026 027import java.io.IOException; 028import java.net.SocketAddress; 029import java.util.logging.Level; 030import java.util.logging.Logger; 031 032/** 033 * Link implementation with Netty 034 */ 035public class NettyLink<T> implements Link<T> { 036 037 public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; 038 private static final Logger LOG = Logger.getLogger(NettyLink.class.getName()); 039 private final Channel channel; 040 private final Encoder<? super T> encoder; 041 private final LinkListener<? super T> listener; 042 043 /** 044 * Constructs a link 045 * 046 * @param channel the channel 047 * @param encoder the encoder 048 */ 049 public NettyLink(final Channel channel, final Encoder<? super T> encoder) { 050 this(channel, encoder, null); 051 } 052 053 /** 054 * Constructs a link 055 * 056 * @param channel the channel 057 * @param encoder the encoder 058 * @param listener the link listener 059 */ 060 public NettyLink(final Channel channel, 061 final Encoder<? super T> encoder, final LinkListener<? super T> listener) { 062 this.channel = channel; 063 this.encoder = encoder; 064 this.listener = listener; 065 } 066 067 068 /** 069 * Writes the message to this link 070 * 071 * @param message the message 072 */ 073 @Override 074 public void write(final T message) throws IOException { 075 LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message}); 076 byte[] allData = encoder.encode(message); 077 // byte[] -> ByteBuf 078 channel.writeAndFlush(Unpooled.wrappedBuffer(allData)); 079 } 080 081 /** 082 * Handles the message received 083 * 084 * @param message the message 085 */ 086 @Override 087 public void messageReceived(final T message) { 088 if (listener != null) { 089 listener.messageReceived(message); 090 } 091 } 092 093 /** 094 * Gets a local address of the link 095 * 096 * @return a local socket address 097 */ 098 @Override 099 public SocketAddress getLocalAddress() { 100 return channel.localAddress(); 101 } 102 103 /** 104 * Gets a remote address of the link 105 * 106 * @return a remote socket address 107 */ 108 @Override 109 public SocketAddress getRemoteAddress() { 110 return channel.remoteAddress(); 111 } 112 113 @Override 114 public String toString() { 115 return "localAddr: " + getLocalAddress() + " remoteAddr: " + getRemoteAddress(); 116 } 117}