001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.remote.impl; 020 021import org.apache.reef.tang.annotations.Parameter; 022import org.apache.reef.wake.EStage; 023import org.apache.reef.wake.EventHandler; 024import org.apache.reef.wake.impl.StageManager; 025import org.apache.reef.wake.remote.*; 026import org.apache.reef.wake.remote.address.LocalAddressProvider; 027import org.apache.reef.wake.remote.ports.TcpPortProvider; 028import org.apache.reef.wake.remote.transport.Transport; 029import org.apache.reef.wake.remote.transport.TransportFactory; 030import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 031 032import javax.inject.Inject; 033import java.net.InetSocketAddress; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicBoolean; 038import java.util.concurrent.atomic.AtomicInteger; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042/** 043 * Default remote manager implementation. 044 */ 045public final class DefaultRemoteManagerImplementation implements RemoteManager { 046 047 private static final Logger LOG = Logger.getLogger(HandlerContainer.class.getName()); 048 049 private static final AtomicInteger COUNTER = new AtomicInteger(0); 050 051 /** 052 * The timeout used for the execute running in close(). 053 */ 054 private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms 055 private final AtomicBoolean closed = new AtomicBoolean(false); 056 private final String name; 057 private final Transport transport; 058 private final RemoteSenderStage reSendStage; 059 private final EStage<TransportEvent> reRecvStage; 060 private final HandlerContainer handlerContainer; 061 private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator(); 062 private RemoteIdentifier myIdentifier; 063 /** 064 * Indicates a hostname that isn't set or known. 065 */ 066 public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME; 067 068 @Inject 069 private <T> DefaultRemoteManagerImplementation( 070 @Parameter(RemoteConfiguration.ManagerName.class) final String name, 071 @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress, 072 @Parameter(RemoteConfiguration.Port.class) final int listeningPort, 073 @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec, 074 @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler, 075 @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee, 076 @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, 077 @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout, 078 final LocalAddressProvider localAddressProvider, 079 final TransportFactory tpFactory, 080 final TcpPortProvider tcpPortProvider) { 081 082 this.name = name; 083 this.handlerContainer = new HandlerContainer<>(name, codec); 084 085 this.reRecvStage = orderingGuarantee ? 086 new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) : 087 new RemoteReceiverStage(this.handlerContainer, errorHandler, 10); 088 089 this.transport = tpFactory.newInstance( 090 hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, 091 tcpPortProvider); 092 093 this.handlerContainer.setTransport(this.transport); 094 095 this.myIdentifier = new SocketRemoteIdentifier( 096 (InetSocketAddress) this.transport.getLocalAddress()); 097 098 this.reSendStage = new RemoteSenderStage(codec, this.transport, 10); 099 100 StageManager.instance().register(this); 101 LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. " + 102 "Binding address provided by {5}", 103 new Object[]{this.name, this.myIdentifier, COUNTER.incrementAndGet(), 104 this.transport.getLocalAddress().toString(), 105 this.transport.getListeningPort(), localAddressProvider} 106 ); 107 } 108 109 110 /** 111 * Returns a proxy event handler for a remote identifier and a message type. 112 */ 113 @Override 114 public <T> EventHandler<T> getHandler( 115 final RemoteIdentifier destinationIdentifier, final Class<? extends T> messageType) { 116 117 if (LOG.isLoggable(Level.FINE)) { 118 LOG.log(Level.FINE, "RemoteManager: {0} destinationIdentifier: {1} messageType: {2}", 119 new Object[]{this.name, destinationIdentifier, messageType.getName()}); 120 } 121 122 return new ProxyEventHandler<>(this.myIdentifier, destinationIdentifier, 123 "default", this.reSendStage.<T>getHandler(), this.seqGen); 124 } 125 126 /** 127 * Registers an event handler for a remote identifier and a message type and. 128 * returns a subscription 129 */ 130 @Override 131 public <T, U extends T> AutoCloseable registerHandler( 132 final RemoteIdentifier sourceIdentifier, 133 final Class<U> messageType, final EventHandler<T> theHandler) { 134 if (LOG.isLoggable(Level.FINE)) { 135 LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}", 136 new Object[]{this.name, sourceIdentifier, messageType.getName(), 137 theHandler.getClass().getName()}); 138 } 139 return this.handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler); 140 } 141 142 /** 143 * Registers an event handler for a message type and returns a subscription. 144 */ 145 @Override 146 public <T, U extends T> AutoCloseable registerHandler( 147 final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) { 148 if (LOG.isLoggable(Level.FINE)) { 149 LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}", 150 new Object[]{this.name, messageType.getName(), theHandler.getClass().getName()}); 151 } 152 return this.handlerContainer.registerHandler(messageType, theHandler); 153 } 154 155 /** 156 * Registers an exception handler and returns a subscription. 157 */ 158 @Override 159 public AutoCloseable registerErrorHandler(final EventHandler<Exception> theHandler) { 160 if (LOG.isLoggable(Level.FINE)) { 161 LOG.log(Level.FINE, "RemoteManager: {0} handler: {1}", 162 new Object[]{this.name, theHandler.getClass().getName()}); 163 } 164 return this.handlerContainer.registerErrorHandler(theHandler); 165 } 166 167 /** 168 * Returns my identifier. 169 */ 170 @Override 171 public RemoteIdentifier getMyIdentifier() { 172 return this.myIdentifier; 173 } 174 175 @Override 176 public void close() { 177 if (closed.compareAndSet(false, true)) { 178 179 LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}", 180 new Object[]{this.name, this.myIdentifier}); 181 182 final Runnable closeRunnable = new Runnable() { 183 @Override 184 public void run() { 185 try { 186 LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier); 187 reSendStage.close(); 188 LOG.log(Level.FINE, "Closed the remote sender stage"); 189 } catch (final Exception e) { 190 LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e); 191 } 192 193 try { 194 LOG.log(Level.FINE, "Closing transport {0}", myIdentifier); 195 transport.close(); 196 LOG.log(Level.FINE, "Closed the transport"); 197 } catch (final Exception e) { 198 LOG.log(Level.SEVERE, "Unable to close the transport.", e); 199 } 200 201 try { 202 LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier); 203 reRecvStage.close(); 204 LOG.log(Level.FINE, "Closed the remote receiver stage"); 205 } catch (final Exception e) { 206 LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e); 207 } 208 } 209 210 }; 211 212 final ExecutorService closeExecutor = Executors.newSingleThreadExecutor(); 213 closeExecutor.submit(closeRunnable); 214 closeExecutor.shutdown(); 215 if (!closeExecutor.isShutdown()) { 216 LOG.log(Level.SEVERE, "close executor did not shutdown properly."); 217 } 218 219 final long endTime = System.currentTimeMillis() + CLOSE_EXECUTOR_TIMEOUT; 220 while (!closeExecutor.isTerminated()) { 221 try { 222 final long waitTime = endTime - System.currentTimeMillis(); 223 closeExecutor.awaitTermination(waitTime, TimeUnit.MILLISECONDS); 224 } catch (final InterruptedException e) { 225 LOG.log(Level.FINE, "Interrupted", e); 226 } 227 } 228 229 if (closeExecutor.isTerminated()) { 230 LOG.log(Level.FINE, "Close executor terminated properly."); 231 } else { 232 LOG.log(Level.SEVERE, "Close executor did not terminate properly."); 233 } 234 } 235 } 236}