This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.transport.netty;
020
021import io.netty.buffer.Unpooled;
022import io.netty.channel.Channel;
023import io.netty.channel.ChannelFuture;
024import io.netty.channel.ChannelFutureListener;
025import org.apache.reef.wake.remote.Encoder;
026import org.apache.reef.wake.remote.transport.Link;
027import org.apache.reef.wake.remote.transport.LinkListener;
028
029import java.net.SocketAddress;
030import java.util.logging.Level;
031import java.util.logging.Logger;
032
033/**
034 * Link implementation with Netty.
035 *
036 * If you set a {@code LinkListener<T>}, it keeps message until writeAndFlush operation completes
037 * and notifies whether the sent message transferred successfully through the listener.
038 */
039public class NettyLink<T> implements Link<T> {
040
041  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
042
043  private static final Logger LOG = Logger.getLogger(NettyLink.class.getName());
044
045  private final Channel channel;
046  private final Encoder<? super T> encoder;
047  private final LinkListener<? super T> listener;
048
049  /**
050   * Constructs a link.
051   *
052   * @param channel the channel
053   * @param encoder the encoder
054   */
055  public NettyLink(final Channel channel, final Encoder<? super T> encoder) {
056    this(channel, encoder, null);
057  }
058
059  /**
060   * Constructs a link.
061   *
062   * @param channel  the channel
063   * @param encoder  the encoder
064   * @param listener the link listener
065   */
066  public NettyLink(final Channel channel, final Encoder<? super T> encoder, final LinkListener<? super T> listener) {
067    this.channel = channel;
068    this.encoder = encoder;
069    this.listener = listener;
070  }
071
072  /**
073   * Writes the message to this link.
074   *
075   * @param message the message
076   */
077  @Override
078  public void write(final T message) {
079    LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message});
080    final ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(encoder.encode(message)));
081    if (listener !=  null) {
082      future.addListener(new NettyChannelFutureListener<>(message, listener));
083    }
084  }
085
086  /**
087   * Gets a local address of the link.
088   *
089   * @return a local socket address
090   */
091  @Override
092  public SocketAddress getLocalAddress() {
093    return channel.localAddress();
094  }
095
096  /**
097   * Gets a remote address of the link.
098   *
099   * @return a remote socket address
100   */
101  @Override
102  public SocketAddress getRemoteAddress() {
103    return channel.remoteAddress();
104  }
105
106  @Override
107  public String toString() {
108    return "NettyLink: " + channel; // Channel has good .toString() implementation
109  }
110}
111
112class NettyChannelFutureListener<T> implements ChannelFutureListener {
113
114  private final T message;
115  private LinkListener<T> listener;
116
117  NettyChannelFutureListener(final T message, final LinkListener<T> listener) {
118    this.message = message;
119    this.listener = listener;
120  }
121
122  @Override
123  public void operationComplete(final ChannelFuture channelFuture) throws Exception {
124    if (channelFuture.isSuccess()) {
125      listener.onSuccess(message);
126    } else {
127      listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message);
128    }
129  }
130}