001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.remote.transport.netty; 020 021import io.netty.bootstrap.Bootstrap; 022import io.netty.bootstrap.ServerBootstrap; 023import io.netty.channel.Channel; 024import io.netty.channel.ChannelFuture; 025import io.netty.channel.ChannelOption; 026import io.netty.channel.EventLoopGroup; 027import io.netty.channel.group.ChannelGroup; 028import io.netty.channel.group.DefaultChannelGroup; 029import io.netty.channel.nio.NioEventLoopGroup; 030import io.netty.channel.socket.nio.NioServerSocketChannel; 031import io.netty.channel.socket.nio.NioSocketChannel; 032import io.netty.util.concurrent.GlobalEventExecutor; 033import org.apache.reef.wake.EStage; 034import org.apache.reef.wake.EventHandler; 035import org.apache.reef.wake.impl.DefaultThreadFactory; 036import org.apache.reef.wake.remote.Encoder; 037import org.apache.reef.wake.remote.exception.RemoteRuntimeException; 038import org.apache.reef.wake.remote.impl.TransportEvent; 039import org.apache.reef.wake.remote.transport.Link; 040import org.apache.reef.wake.remote.transport.LinkListener; 041import org.apache.reef.wake.remote.transport.Transport; 042import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException; 043 044import java.io.IOException; 045import java.net.BindException; 046import java.net.InetSocketAddress; 047import java.net.SocketAddress; 048import java.util.Random; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.ConcurrentMap; 051import java.util.concurrent.atomic.AtomicInteger; 052import java.util.logging.Level; 053import java.util.logging.Logger; 054 055/** 056 * Messaging transport implementation with Netty 057 */ 058public class NettyMessagingTransport implements Transport { 059 060 private static final String CLASS_NAME = NettyMessagingTransport.class.getName(); 061 private static final Logger LOG = Logger.getLogger(CLASS_NAME); 062 063 private static final int SERVER_BOSS_NUM_THREADS = 3; 064 private static final int SERVER_WORKER_NUM_THREADS = 20; 065 private static final int CLIENT_WORKER_NUM_THREADS = 10; 066 private static final int PORT_START = 10000; 067 private static final int PORT_RANGE = 10000; 068 private static final Random randPort = new Random(); 069 070 private final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap = new ConcurrentHashMap<>(); 071 072 private final EventLoopGroup clientWorkerGroup; 073 private final EventLoopGroup serverBossGroup; 074 private final EventLoopGroup serverWorkerGroup; 075 076 private final Bootstrap clientBootstrap; 077 private final ServerBootstrap serverBootstrap; 078 private final Channel acceptor; 079 080 private final ChannelGroup clientChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 081 private final ChannelGroup serverChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 082 083 private final int serverPort; 084 private final SocketAddress localAddress; 085 086 private final NettyClientEventListener clientEventListener; 087 private final NettyServerEventListener serverEventListener; 088 089 private final int numberOfTries; 090 private final int retryTimeout; 091 092 /** 093 * Constructs a messaging transport 094 * 095 * @param hostAddress the server host address 096 * @param port the server listening port; when it is 0, randomly assign a port number 097 * @param clientStage the client-side stage that handles transport events 098 * @param serverStage the server-side stage that handles transport events 099 * @param numberOfTries the number of tries of connection 100 * @param retryTimeout the timeout of reconnection 101 */ 102 public NettyMessagingTransport(final String hostAddress, int port, 103 final EStage<TransportEvent> clientStage, 104 final EStage<TransportEvent> serverStage, 105 final int numberOfTries, 106 final int retryTimeout) { 107 108 if (port < 0) { 109 throw new RemoteRuntimeException("Invalid server port: " + port); 110 } 111 112 this.numberOfTries = numberOfTries; 113 this.retryTimeout = retryTimeout; 114 this.clientEventListener = new NettyClientEventListener(this.addrToLinkRefMap, clientStage); 115 this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, serverStage); 116 117 this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ServerBoss")); 118 this.serverWorkerGroup = new NioEventLoopGroup(SERVER_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ServerWorker")); 119 this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ClientWorker")); 120 121 this.clientBootstrap = new Bootstrap(); 122 this.clientBootstrap.group(this.clientWorkerGroup) 123 .channel(NioSocketChannel.class) 124 .handler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("client", 125 this.clientChannelGroup, this.clientEventListener))) 126 .option(ChannelOption.SO_REUSEADDR, true) 127 .option(ChannelOption.SO_KEEPALIVE, true); 128 129 this.serverBootstrap = new ServerBootstrap(); 130 this.serverBootstrap.group(this.serverBossGroup, this.serverWorkerGroup) 131 .channel(NioServerSocketChannel.class) 132 .childHandler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("server", 133 this.serverChannelGroup, this.serverEventListener))) 134 .option(ChannelOption.SO_BACKLOG, 128) 135 .option(ChannelOption.SO_REUSEADDR, true) 136 .childOption(ChannelOption.SO_KEEPALIVE, true); 137 138 LOG.log(Level.FINE, "Binding to {0}", port); 139 140 Channel acceptor = null; 141 try { 142 if (port > 0) { 143 acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel(); 144 } else { 145 while (acceptor == null) { 146 port = randPort.nextInt(PORT_START) + PORT_RANGE; 147 LOG.log(Level.FINEST, "Try port {0}", port); 148 try { 149 acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel(); 150 } catch (final Exception ex) { 151 if (ex instanceof BindException) { 152 LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port); 153 } else { 154 throw ex; 155 } 156 } 157 } 158 } 159 } catch (final Exception ex) { 160 final RuntimeException transportException = 161 new TransportRuntimeException("Cannot bind to port " + port); 162 LOG.log(Level.SEVERE, "Cannot bind to port " + port, ex); 163 164 this.clientWorkerGroup.shutdownGracefully(); 165 this.serverBossGroup.shutdownGracefully(); 166 this.serverWorkerGroup.shutdownGracefully(); 167 throw transportException; 168 } 169 170 this.acceptor = acceptor; 171 this.serverPort = port; 172 this.localAddress = new InetSocketAddress(hostAddress, this.serverPort); 173 174 LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress); 175 } 176 177 /** 178 * Closes all channels and releases all resources 179 */ 180 @Override 181 public void close() throws Exception { 182 183 LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress); 184 185 this.clientChannelGroup.close().awaitUninterruptibly(); 186 this.serverChannelGroup.close().awaitUninterruptibly(); 187 this.acceptor.close().sync(); 188 this.clientWorkerGroup.shutdownGracefully(); 189 this.serverBossGroup.shutdownGracefully(); 190 this.serverWorkerGroup.shutdownGracefully(); 191 192 LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress); 193 } 194 195 /** 196 * Returns a link for the remote address if cached; otherwise opens, caches and returns 197 * When it opens a link for the remote address, only one attempt for the address is made at a given time 198 * 199 * @param remoteAddr the remote socket address 200 * @param encoder the encoder 201 * @param listener the link listener 202 * @return a link associated with the address 203 */ 204 @Override 205 public <T> Link<T> open(final SocketAddress remoteAddr, final Encoder<? super T> encoder, 206 final LinkListener<? super T> listener) throws IOException { 207 208 Link<T> link = null; 209 210 for (int i = 0; i < this.numberOfTries; ++i) { 211 LinkReference linkRef = this.addrToLinkRefMap.get(remoteAddr); 212 213 if (linkRef != null) { 214 link = (Link<T>) linkRef.getLink(); 215 if (LOG.isLoggable(Level.FINE)) { 216 LOG.log(Level.FINE, "Link {0} for {1} found", new Object[]{link, remoteAddr}); 217 } 218 if (link != null) { 219 return link; 220 } 221 } 222 223 LOG.log(Level.FINE, "No cached link for {0} thread {1}", 224 new Object[]{remoteAddr, Thread.currentThread()}); 225 226 // no linkRef 227 final LinkReference newLinkRef = new LinkReference(); 228 final LinkReference prior = this.addrToLinkRefMap.putIfAbsent(remoteAddr, newLinkRef); 229 final AtomicInteger flag = prior != null ? 230 prior.getConnectInProgress() : newLinkRef.getConnectInProgress(); 231 232 synchronized (flag) { 233 if (!flag.compareAndSet(0, 1)) { 234 while (flag.get() == 1) { 235 try { 236 flag.wait(); 237 } catch (final InterruptedException ex) { 238 LOG.log(Level.WARNING, "Wait interrupted", ex); 239 } 240 } 241 } 242 } 243 244 linkRef = this.addrToLinkRefMap.get(remoteAddr); 245 link = (Link<T>) linkRef.getLink(); 246 247 if (link != null) { 248 return link; 249 } 250 251 ChannelFuture connectFuture = null; 252 try { 253 connectFuture = this.clientBootstrap.connect(remoteAddr); 254 connectFuture.syncUninterruptibly(); 255 256 link = new NettyLink<>(connectFuture.channel(), encoder, listener); 257 linkRef.setLink(link); 258 259 synchronized (flag) { 260 flag.compareAndSet(1, 2); 261 flag.notifyAll(); 262 } 263 break; 264 } catch (final Exception e) { 265 if (e.getClass().getSimpleName().compareTo("ConnectException") == 0) { 266 LOG.log(Level.WARNING, "Connection refused. Retry {0} of {1}", 267 new Object[]{i + 1, this.numberOfTries}); 268 synchronized (flag) { 269 flag.compareAndSet(1, 0); 270 flag.notifyAll(); 271 } 272 273 if (i < this.numberOfTries) { 274 try { 275 Thread.sleep(retryTimeout); 276 } catch (final InterruptedException interrupt) { 277 LOG.log(Level.WARNING, "Thread {0} interrupted while sleeping", Thread.currentThread()); 278 } 279 } 280 } else { 281 throw e; 282 } 283 } 284 } 285 return link; 286 } 287 288 /** 289 * Returns a link for the remote address if already cached; otherwise, returns null 290 * 291 * @param remoteAddr the remote address 292 * @return a link if already cached; otherwise, null 293 */ 294 public <T> Link<T> get(final SocketAddress remoteAddr) { 295 final LinkReference linkRef = this.addrToLinkRefMap.get(remoteAddr); 296 return linkRef != null ? (Link<T>) linkRef.getLink() : null; 297 } 298 299 /** 300 * Gets a server local socket address of this transport 301 * 302 * @return a server local socket address 303 */ 304 @Override 305 public SocketAddress getLocalAddress() { 306 return this.localAddress; 307 } 308 309 /** 310 * Gets a server listening port of this transport 311 * 312 * @return a listening port number 313 */ 314 @Override 315 public int getListeningPort() { 316 return this.serverPort; 317 } 318 319 /** 320 * Registers the exception event handler 321 * 322 * @param handler the exception event handler 323 */ 324 @Override 325 public void registerErrorHandler(final EventHandler<Exception> handler) { 326 this.clientEventListener.registerErrorHandler(handler); 327 this.serverEventListener.registerErrorHandler(handler); 328 } 329}