/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.wake.remote.impl;

import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.StageManager;
import org.apache.reef.wake.remote.*;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;
import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;

import javax.inject.Inject;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Default remote manager implementation.
 *
 * <p>Wires together a {@link Transport}, a sender stage, a receiver stage and a
 * {@link HandlerContainer}. Outgoing events are sent through proxy handlers obtained
 * from {@link #getHandler}; incoming events are dispatched to handlers registered via
 * the {@code registerHandler} overloads.
 */
public final class DefaultRemoteManagerImplementation implements RemoteManager {

  // Named after this class (it was previously created with HandlerContainer's name,
  // which attributed this class's log records to the wrong logger).
  private static final Logger LOG = Logger.getLogger(DefaultRemoteManagerImplementation.class.getName());

  /**
   * Counts constructed instances; the value is only reported in the instantiation log message.
   */
  private static final AtomicInteger COUNTER = new AtomicInteger(0);

  /**
   * The timeout used for the execute running in close().
   */
  private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms

  /**
   * Indicates a hostname that isn't set or known.
   */
  public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME;

  /**
   * Flipped to true by the first call to {@link #close()}; later calls are no-ops.
   */
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final String name;
  private final Transport transport;
  private final RemoteSenderStage reSendStage;
  private final EStage<TransportEvent> reRecvStage;
  private final HandlerContainer handlerContainer;
  private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
  private final RemoteIdentifier myIdentifier;

  /**
   * Creates the remote manager and registers it with the {@link StageManager}.
   *
   * @param name                 name of this manager; also passed to the handler container
   * @param hostAddress          host address handed to the transport factory
   * @param listeningPort        listening port handed to the transport factory
   * @param codec                message codec used by the handler container and the sender stage
   * @param errorHandler         error handler handed to the receiver stage
   * @param orderingGuarantee    if true an {@link OrderedRemoteReceiverStage} is used,
   *                             otherwise a {@link RemoteReceiverStage}
   * @param numberOfTries        handed to the transport factory
   * @param retryTimeout         handed to the transport factory
   * @param localAddressProvider only reported in the instantiation log message
   * @param tpFactory            factory used to create the transport
   * @param tcpPortProvider      handed to the transport factory
   * @param <T>                  type handled by the codec
   */
  @Inject
  private <T> DefaultRemoteManagerImplementation(
      @Parameter(RemoteConfiguration.ManagerName.class) final String name,
      @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
      @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
      @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec,
      @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler,
      @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee,
      @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
      @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout,
      final LocalAddressProvider localAddressProvider,
      final TransportFactory tpFactory,
      final TcpPortProvider tcpPortProvider) {

    this.name = name;
    this.handlerContainer = new HandlerContainer<>(name, codec);

    this.reRecvStage = orderingGuarantee ?
        new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
        new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);

    // The receiver stage is passed twice, exactly as before: it fills both stage
    // arguments of the transport factory.
    this.transport = tpFactory.newInstance(
        hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout,
        tcpPortProvider);

    this.handlerContainer.setTransport(this.transport);

    this.myIdentifier = new SocketRemoteIdentifier(
        (InetSocketAddress) this.transport.getLocalAddress());

    this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);

    StageManager.instance().register(this);
    LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. " +
            "Binding address provided by {5}",
        new Object[]{this.name, this.myIdentifier, COUNTER.incrementAndGet(),
            this.transport.getLocalAddress().toString(),
            this.transport.getListeningPort(), localAddressProvider}
    );
  }

  /**
   * Returns a proxy event handler for a remote identifier and a message type.
   *
   * @param destinationIdentifier identifier of the remote destination
   * @param messageType           message type; in this implementation it is only used for logging
   * @param <T>                   type of the events accepted by the returned handler
   * @return a new proxy handler that forwards events through the sender stage
   */
  @Override
  public <T> EventHandler<T> getHandler(
      final RemoteIdentifier destinationIdentifier, final Class<? extends T> messageType) {

    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "RemoteManager: {0} destinationIdentifier: {1} messageType: {2}",
          new Object[]{this.name, destinationIdentifier, messageType.getName()});
    }

    return new ProxyEventHandler<>(this.myIdentifier, destinationIdentifier,
        "default", this.reSendStage.<T>getHandler(), this.seqGen);
  }

  /**
   * Registers an event handler for a remote identifier and a message type and
   * returns a subscription.
   *
   * @param sourceIdentifier identifier of the remote source
   * @param messageType      message type the handler is registered for
   * @param theHandler       handler to register
   * @return the subscription returned by the handler container
   */
  @Override
  public <T, U extends T> AutoCloseable registerHandler(
      final RemoteIdentifier sourceIdentifier,
      final Class<U> messageType, final EventHandler<T> theHandler) {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}",
          new Object[]{this.name, sourceIdentifier, messageType.getName(),
              theHandler.getClass().getName()});
    }
    return this.handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler);
  }

  /**
   * Registers an event handler for a message type and returns a subscription.
   *
   * @param messageType message type the handler is registered for
   * @param theHandler  handler to register
   * @return the subscription returned by the handler container
   */
  @Override
  public <T, U extends T> AutoCloseable registerHandler(
      final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}",
          new Object[]{this.name, messageType.getName(), theHandler.getClass().getName()});
    }
    return this.handlerContainer.registerHandler(messageType, theHandler);
  }

  /**
   * Returns my identifier.
   */
  @Override
  public RemoteIdentifier getMyIdentifier() {
    return this.myIdentifier;
  }

  /**
   * Closes the sender stage, the transport and the receiver stage.
   *
   * <p>Only the first call does any work. The components are closed on a dedicated
   * single-thread executor and the caller waits at most {@code CLOSE_EXECUTOR_TIMEOUT}
   * milliseconds for that to finish. If the deadline passes, a SEVERE message is logged
   * and the method returns; the close task is not cancelled.
   */
  @Override
  public void close() {
    if (!this.closed.compareAndSet(false, true)) {
      return;
    }

    LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}",
        new Object[]{this.name, this.myIdentifier});

    final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
    closeExecutor.submit(new Runnable() {
      @Override
      public void run() {
        closeComponents();
      }
    });
    closeExecutor.shutdown();
    if (!closeExecutor.isShutdown()) {
      LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
    }

    if (awaitTermination(closeExecutor, CLOSE_EXECUTOR_TIMEOUT)) {
      LOG.log(Level.FINE, "Close executor terminated properly.");
    } else {
      LOG.log(Level.SEVERE, "Close executor did not terminate properly.");
    }
  }

  /**
   * Closes the sender stage, then the transport, then the receiver stage.
   * A failure in one step is logged and does not prevent the remaining steps.
   */
  private void closeComponents() {
    try {
      LOG.log(Level.FINE, "Closing sender stage {0}", this.myIdentifier);
      this.reSendStage.close();
      LOG.log(Level.FINE, "Closed the remote sender stage");
    } catch (final Exception e) {
      LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e);
    }

    try {
      LOG.log(Level.FINE, "Closing transport {0}", this.myIdentifier);
      this.transport.close();
      LOG.log(Level.FINE, "Closed the transport");
    } catch (final Exception e) {
      LOG.log(Level.SEVERE, "Unable to close the transport.", e);
    }

    try {
      LOG.log(Level.FINE, "Closing receiver stage {0}", this.myIdentifier);
      this.reRecvStage.close();
      LOG.log(Level.FINE, "Closed the remote receiver stage");
    } catch (final Exception e) {
      LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e);
    }
  }

  /**
   * Waits for the executor to terminate, for at most the given time.
   *
   * <p>An interrupt does not cut the wait short (the remaining time is still honoured),
   * but the interrupt status of the calling thread is restored before returning.
   *
   * @param executor      an executor on which shutdown() has already been called
   * @param timeoutMillis maximum time to wait, in milliseconds
   * @return true if the executor terminated before the deadline, false otherwise
   */
  private static boolean awaitTermination(final ExecutorService executor, final long timeoutMillis) {
    // nanoTime is monotonic, so the deadline is immune to wall-clock adjustments.
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    boolean interrupted = false;
    try {
      long remaining = deadline - System.nanoTime();
      // Stop once the deadline has passed: with a non-positive timeout awaitTermination
      // returns immediately, so looping on isTerminated() alone would spin forever.
      while (!executor.isTerminated() && remaining > 0) {
        try {
          executor.awaitTermination(remaining, TimeUnit.NANOSECONDS);
        } catch (final InterruptedException e) {
          interrupted = true;
          LOG.log(Level.FINE, "Interrupted", e);
        }
        remaining = deadline - System.nanoTime();
      }
      return executor.isTerminated();
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}