This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.transport.netty;
020
021import io.netty.buffer.Unpooled;
022import io.netty.channel.Channel;
023import io.netty.channel.ChannelFuture;
024import io.netty.channel.ChannelFutureListener;
025import org.apache.reef.wake.remote.Encoder;
026import org.apache.reef.wake.remote.transport.Link;
027import org.apache.reef.wake.remote.transport.LinkListener;
028
029import java.net.SocketAddress;
030import java.util.logging.Level;
031import java.util.logging.Logger;
032
033/**
034 * Link implementation with Netty.
035 *
036 * If you set a {@code LinkListener<T>}, it keeps message until writeAndFlush operation completes
037 * and notifies whether the sent message transferred successfully through the listener.
038 */
039public class NettyLink<T> implements Link<T> {
040
041  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
042  private static final Logger LOG = Logger.getLogger(NettyLink.class.getName());
043  private final Channel channel;
044  private final Encoder<? super T> encoder;
045  private final LinkListener<? super T> listener;
046
047  /**
048   * Constructs a link.
049   *
050   * @param channel the channel
051   * @param encoder the encoder
052   */
053  public NettyLink(final Channel channel, final Encoder<? super T> encoder) {
054    this(channel, encoder, null);
055  }
056
057  /**
058   * Constructs a link.
059   *
060   * @param channel  the channel
061   * @param encoder  the encoder
062   * @param listener the link listener
063   */
064  public NettyLink(final Channel channel,
065                   final Encoder<? super T> encoder, final LinkListener<? super T> listener) {
066    this.channel = channel;
067    this.encoder = encoder;
068    this.listener = listener;
069  }
070
071
072  /**
073   * Writes the message to this link.
074   *
075   * @param message the message
076   */
077  @Override
078  public void write(final T message) {
079    LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message});
080    final byte[] allData = encoder.encode(message);
081    // byte[] -> ByteBuf
082    if (listener !=  null) {
083      channel.writeAndFlush(Unpooled.wrappedBuffer(allData))
084          .addListener(new NettyChannelFutureListener<>(message, listener));
085    } else {
086      channel.writeAndFlush(Unpooled.wrappedBuffer(allData));
087    }
088  }
089
090  /**
091   * Gets a local address of the link.
092   *
093   * @return a local socket address
094   */
095  @Override
096  public SocketAddress getLocalAddress() {
097    return channel.localAddress();
098  }
099
100  /**
101   * Gets a remote address of the link.
102   *
103   * @return a remote socket address
104   */
105  @Override
106  public SocketAddress getRemoteAddress() {
107    return channel.remoteAddress();
108  }
109
110  @Override
111  public String toString() {
112    return "localAddr: " + getLocalAddress() + " remoteAddr: " + getRemoteAddress();
113  }
114}
115
116class NettyChannelFutureListener<T> implements ChannelFutureListener {
117
118  private final T message;
119  private LinkListener<T> listener;
120
121  NettyChannelFutureListener(final T message, final LinkListener<T> listener) {
122    this.message = message;
123    this.listener = listener;
124  }
125
126  @Override
127  public void operationComplete(final ChannelFuture channelFuture) throws Exception {
128    if (channelFuture.isSuccess()) {
129      listener.onSuccess(message);
130    } else {
131      listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message);
132    }
133  }
134}