This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.transport.netty;
020
021import io.netty.bootstrap.Bootstrap;
022import io.netty.bootstrap.ServerBootstrap;
023import io.netty.channel.Channel;
024import io.netty.channel.ChannelFuture;
025import io.netty.channel.ChannelOption;
026import io.netty.channel.EventLoopGroup;
027import io.netty.channel.group.ChannelGroup;
028import io.netty.channel.group.DefaultChannelGroup;
029import io.netty.channel.nio.NioEventLoopGroup;
030import io.netty.channel.socket.nio.NioServerSocketChannel;
031import io.netty.channel.socket.nio.NioSocketChannel;
032import io.netty.util.concurrent.GlobalEventExecutor;
033import org.apache.reef.wake.EStage;
034import org.apache.reef.wake.EventHandler;
035import org.apache.reef.wake.impl.DefaultThreadFactory;
036import org.apache.reef.wake.remote.Encoder;
037import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
038import org.apache.reef.wake.remote.impl.TransportEvent;
039import org.apache.reef.wake.remote.transport.Link;
040import org.apache.reef.wake.remote.transport.LinkListener;
041import org.apache.reef.wake.remote.transport.Transport;
042import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException;
043
044import java.io.IOException;
045import java.net.BindException;
046import java.net.InetSocketAddress;
047import java.net.SocketAddress;
048import java.util.Random;
049import java.util.concurrent.ConcurrentHashMap;
050import java.util.concurrent.ConcurrentMap;
051import java.util.concurrent.atomic.AtomicInteger;
052import java.util.logging.Level;
053import java.util.logging.Logger;
054
055/**
056 * Messaging transport implementation with Netty
057 */
058public class NettyMessagingTransport implements Transport {
059
060  private static final String CLASS_NAME = NettyMessagingTransport.class.getName();
061  private static final Logger LOG = Logger.getLogger(CLASS_NAME);
062
063  private static final int SERVER_BOSS_NUM_THREADS = 3;
064  private static final int SERVER_WORKER_NUM_THREADS = 20;
065  private static final int CLIENT_WORKER_NUM_THREADS = 10;
066  private static final int PORT_START = 10000;
067  private static final int PORT_RANGE = 10000;
068  private static final Random randPort = new Random();
069
070  private final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap = new ConcurrentHashMap<>();
071
072  private final EventLoopGroup clientWorkerGroup;
073  private final EventLoopGroup serverBossGroup;
074  private final EventLoopGroup serverWorkerGroup;
075
076  private final Bootstrap clientBootstrap;
077  private final ServerBootstrap serverBootstrap;
078  private final Channel acceptor;
079
080  private final ChannelGroup clientChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
081  private final ChannelGroup serverChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
082
083  private final int serverPort;
084  private final SocketAddress localAddress;
085
086  private final NettyClientEventListener clientEventListener;
087  private final NettyServerEventListener serverEventListener;
088
089  private final int numberOfTries;
090  private final int retryTimeout;
091
092  /**
093   * Constructs a messaging transport
094   *
095   * @param hostAddress   the server host address
096   * @param port          the server listening port; when it is 0, randomly assign a port number
097   * @param clientStage   the client-side stage that handles transport events
098   * @param serverStage   the server-side stage that handles transport events
099   * @param numberOfTries the number of tries of connection
100   * @param retryTimeout  the timeout of reconnection
101   */
102  public NettyMessagingTransport(final String hostAddress, int port,
103                                 final EStage<TransportEvent> clientStage,
104                                 final EStage<TransportEvent> serverStage,
105                                 final int numberOfTries,
106                                 final int retryTimeout) {
107
108    if (port < 0) {
109      throw new RemoteRuntimeException("Invalid server port: " + port);
110    }
111
112    this.numberOfTries = numberOfTries;
113    this.retryTimeout = retryTimeout;
114    this.clientEventListener = new NettyClientEventListener(this.addrToLinkRefMap, clientStage);
115    this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, serverStage);
116
117    this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ServerBoss"));
118    this.serverWorkerGroup = new NioEventLoopGroup(SERVER_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ServerWorker"));
119    this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ClientWorker"));
120
121    this.clientBootstrap = new Bootstrap();
122    this.clientBootstrap.group(this.clientWorkerGroup)
123        .channel(NioSocketChannel.class)
124        .handler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("client",
125            this.clientChannelGroup, this.clientEventListener)))
126        .option(ChannelOption.SO_REUSEADDR, true)
127        .option(ChannelOption.SO_KEEPALIVE, true);
128
129    this.serverBootstrap = new ServerBootstrap();
130    this.serverBootstrap.group(this.serverBossGroup, this.serverWorkerGroup)
131        .channel(NioServerSocketChannel.class)
132        .childHandler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("server",
133            this.serverChannelGroup, this.serverEventListener)))
134        .option(ChannelOption.SO_BACKLOG, 128)
135        .option(ChannelOption.SO_REUSEADDR, true)
136        .childOption(ChannelOption.SO_KEEPALIVE, true);
137
138    LOG.log(Level.FINE, "Binding to {0}", port);
139
140    Channel acceptor = null;
141    try {
142      if (port > 0) {
143        acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel();
144      } else {
145        while (acceptor == null) {
146          port = randPort.nextInt(PORT_START) + PORT_RANGE;
147          LOG.log(Level.FINEST, "Try port {0}", port);
148          try {
149            acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel();
150          } catch (final Exception ex) {
151            if (ex instanceof BindException) {
152              LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port);
153            } else {
154              throw ex;
155            }
156          }
157        }
158      }
159    } catch (final Exception ex) {
160      final RuntimeException transportException =
161          new TransportRuntimeException("Cannot bind to port " + port);
162      LOG.log(Level.SEVERE, "Cannot bind to port " + port, ex);
163
164      this.clientWorkerGroup.shutdownGracefully();
165      this.serverBossGroup.shutdownGracefully();
166      this.serverWorkerGroup.shutdownGracefully();
167      throw transportException;
168    }
169
170    this.acceptor = acceptor;
171    this.serverPort = port;
172    this.localAddress = new InetSocketAddress(hostAddress, this.serverPort);
173
174    LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress);
175  }
176
177  /**
178   * Closes all channels and releases all resources
179   */
180  @Override
181  public void close() throws Exception {
182
183    LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);
184
185    this.clientChannelGroup.close().awaitUninterruptibly();
186    this.serverChannelGroup.close().awaitUninterruptibly();
187    this.acceptor.close().sync();
188    this.clientWorkerGroup.shutdownGracefully();
189    this.serverBossGroup.shutdownGracefully();
190    this.serverWorkerGroup.shutdownGracefully();
191
192    LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
193  }
194
195  /**
196   * Returns a link for the remote address if cached; otherwise opens, caches and returns
197   * When it opens a link for the remote address, only one attempt for the address is made at a given time
198   *
199   * @param remoteAddr the remote socket address
200   * @param encoder    the encoder
201   * @param listener   the link listener
202   * @return a link associated with the address
203   */
204  @Override
205  public <T> Link<T> open(final SocketAddress remoteAddr, final Encoder<? super T> encoder,
206                          final LinkListener<? super T> listener) throws IOException {
207
208    Link<T> link = null;
209
210    for (int i = 0; i < this.numberOfTries; ++i) {
211      LinkReference linkRef = this.addrToLinkRefMap.get(remoteAddr);
212
213      if (linkRef != null) {
214        link = (Link<T>) linkRef.getLink();
215        if (LOG.isLoggable(Level.FINE)) {
216          LOG.log(Level.FINE, "Link {0} for {1} found", new Object[]{link, remoteAddr});
217        }
218        if (link != null) {
219          return link;
220        }
221      }
222
223      LOG.log(Level.FINE, "No cached link for {0} thread {1}",
224          new Object[]{remoteAddr, Thread.currentThread()});
225
226      // no linkRef
227      final LinkReference newLinkRef = new LinkReference();
228      final LinkReference prior = this.addrToLinkRefMap.putIfAbsent(remoteAddr, newLinkRef);
229      final AtomicInteger flag = prior != null ?
230          prior.getConnectInProgress() : newLinkRef.getConnectInProgress();
231
232      synchronized (flag) {
233        if (!flag.compareAndSet(0, 1)) {
234          while (flag.get() == 1) {
235            try {
236              flag.wait();
237            } catch (final InterruptedException ex) {
238              LOG.log(Level.WARNING, "Wait interrupted", ex);
239            }
240          }
241        }
242      }
243
244      linkRef = this.addrToLinkRefMap.get(remoteAddr);
245      link = (Link<T>) linkRef.getLink();
246
247      if (link != null) {
248        return link;
249      }
250
251      ChannelFuture connectFuture = null;
252      try {
253        connectFuture = this.clientBootstrap.connect(remoteAddr);
254        connectFuture.syncUninterruptibly();
255
256        link = new NettyLink<>(connectFuture.channel(), encoder, listener);
257        linkRef.setLink(link);
258
259        synchronized (flag) {
260          flag.compareAndSet(1, 2);
261          flag.notifyAll();
262        }
263        break;
264      } catch (final Exception e) {
265        if (e.getClass().getSimpleName().compareTo("ConnectException") == 0) {
266          LOG.log(Level.WARNING, "Connection refused. Retry {0} of {1}",
267              new Object[]{i + 1, this.numberOfTries});
268          synchronized (flag) {
269            flag.compareAndSet(1, 0);
270            flag.notifyAll();
271          }
272
273          if (i < this.numberOfTries) {
274            try {
275              Thread.sleep(retryTimeout);
276            } catch (final InterruptedException interrupt) {
277              LOG.log(Level.WARNING, "Thread {0} interrupted while sleeping", Thread.currentThread());
278            }
279          }
280        } else {
281          throw e;
282        }
283      }
284    }
285    return link;
286  }
287
288  /**
289   * Returns a link for the remote address if already cached; otherwise, returns null
290   *
291   * @param remoteAddr the remote address
292   * @return a link if already cached; otherwise, null
293   */
294  public <T> Link<T> get(final SocketAddress remoteAddr) {
295    final LinkReference linkRef = this.addrToLinkRefMap.get(remoteAddr);
296    return linkRef != null ? (Link<T>) linkRef.getLink() : null;
297  }
298
299  /**
300   * Gets a server local socket address of this transport
301   *
302   * @return a server local socket address
303   */
304  @Override
305  public SocketAddress getLocalAddress() {
306    return this.localAddress;
307  }
308
309  /**
310   * Gets a server listening port of this transport
311   *
312   * @return a listening port number
313   */
314  @Override
315  public int getListeningPort() {
316    return this.serverPort;
317  }
318
319  /**
320   * Registers the exception event handler
321   *
322   * @param handler the exception event handler
323   */
324  @Override
325  public void registerErrorHandler(final EventHandler<Exception> handler) {
326    this.clientEventListener.registerErrorHandler(handler);
327    this.serverEventListener.registerErrorHandler(handler);
328  }
329}