001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.remote.impl; 020 021import org.apache.reef.tang.annotations.Parameter; 022import org.apache.reef.wake.EStage; 023import org.apache.reef.wake.EventHandler; 024import org.apache.reef.wake.impl.StageManager; 025import org.apache.reef.wake.remote.*; 026import org.apache.reef.wake.remote.address.LocalAddressProvider; 027import org.apache.reef.wake.remote.ports.TcpPortProvider; 028import org.apache.reef.wake.remote.transport.Transport; 029import org.apache.reef.wake.remote.transport.TransportFactory; 030import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 031 032import javax.inject.Inject; 033import java.net.InetSocketAddress; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicBoolean; 038import java.util.concurrent.atomic.AtomicInteger; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042/** 043 * Default remote manager implementation. 044 */ 045public final class DefaultRemoteManagerImplementation implements RemoteManager { 046 047 private static final Logger LOG = Logger.getLogger(HandlerContainer.class.getName()); 048 049 private static final AtomicInteger COUNTER = new AtomicInteger(0); 050 051 /** 052 * The timeout used for the execute running in close(). 053 */ 054 private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms 055 056 /** 057 * Indicates a hostname that isn't set or known. 058 */ 059 public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME; 060 061 private final AtomicBoolean closed = new AtomicBoolean(false); 062 private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator(); 063 064 private final String name; 065 private final Transport transport; 066 private final RemoteSenderStage reSendStage; 067 private final EStage<TransportEvent> reRecvStage; 068 private final HandlerContainer handlerContainer; 069 070 private RemoteIdentifier myIdentifier; 071 072 @Inject 073 private <T> DefaultRemoteManagerImplementation( 074 @Parameter(RemoteConfiguration.ManagerName.class) final String name, 075 @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress, 076 @Parameter(RemoteConfiguration.Port.class) final int listeningPort, 077 @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec, 078 @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler, 079 @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee, 080 @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, 081 @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout, 082 final LocalAddressProvider localAddressProvider, 083 final TransportFactory tpFactory, 084 final TcpPortProvider tcpPortProvider) { 085 086 this.name = name; 087 this.handlerContainer = new HandlerContainer<>(name, codec); 088 089 this.reRecvStage = orderingGuarantee ? 090 new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) : 091 new RemoteReceiverStage(this.handlerContainer, errorHandler, 10); 092 093 this.transport = tpFactory.newInstance(hostAddress, listeningPort, 094 this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider); 095 096 this.handlerContainer.setTransport(this.transport); 097 098 this.myIdentifier = new SocketRemoteIdentifier((InetSocketAddress)this.transport.getLocalAddress()); 099 100 this.reSendStage = new RemoteSenderStage(codec, this.transport, 10); 101 102 StageManager.instance().register(this); 103 104 final int counter = COUNTER.incrementAndGet(); 105 106 LOG.log(Level.FINEST, 107 "RemoteManager {0} instantiated id {1} counter {2} listening on {3} Binding address provided by {4}", 108 new Object[] {this.name, this.myIdentifier, counter, this.transport.getLocalAddress(), localAddressProvider}); 109 } 110 111 /** 112 * Returns a proxy event handler for a remote identifier and a message type. 113 */ 114 @Override 115 public <T> EventHandler<T> getHandler( 116 final RemoteIdentifier destinationIdentifier, final Class<? extends T> messageType) { 117 118 if (LOG.isLoggable(Level.FINE)) { 119 LOG.log(Level.FINE, "RemoteManager: {0} destinationIdentifier: {1} messageType: {2}", 120 new Object[] {this.name, destinationIdentifier, messageType.getName()}); 121 } 122 123 return new ProxyEventHandler<>(this.myIdentifier, destinationIdentifier, 124 "default", this.reSendStage.<T>getHandler(), this.seqGen); 125 } 126 127 /** 128 * Registers an event handler for a remote identifier and a message type and. 129 * returns a subscription 130 */ 131 @Override 132 public <T, U extends T> AutoCloseable registerHandler( 133 final RemoteIdentifier sourceIdentifier, 134 final Class<U> messageType, final EventHandler<T> theHandler) { 135 136 if (LOG.isLoggable(Level.FINE)) { 137 LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}", 138 new Object[] {this.name, sourceIdentifier, messageType.getName(), theHandler.getClass().getName()}); 139 } 140 141 return this.handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler); 142 } 143 144 /** 145 * Registers an event handler for a message type and returns a subscription. 146 */ 147 @Override 148 public <T, U extends T> AutoCloseable registerHandler( 149 final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) { 150 151 if (LOG.isLoggable(Level.FINE)) { 152 LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}", 153 new Object[] {this.name, messageType.getName(), theHandler.getClass().getName()}); 154 } 155 156 return this.handlerContainer.registerHandler(messageType, theHandler); 157 } 158 159 /** 160 * Returns my identifier. 161 */ 162 @Override 163 public RemoteIdentifier getMyIdentifier() { 164 return this.myIdentifier; 165 } 166 167 @Override 168 public void close() { 169 170 LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}", 171 new Object[] {this.name, this.myIdentifier}); 172 173 if (!this.closed.compareAndSet(false, true)) { 174 LOG.log(Level.FINE, "RemoteManager: {0} already closed", this.name); 175 return; 176 } 177 178 final Runnable closeRunnable = new Runnable() { 179 @Override 180 public void run() { 181 182 Thread.currentThread().setName(String.format("CLOSE:RemoteManager:%s:%s", name, myIdentifier)); 183 184 try { 185 LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier); 186 reSendStage.close(); 187 LOG.log(Level.FINE, "Closed the remote sender stage"); 188 } catch (final Exception e) { 189 LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e); 190 } 191 192 try { 193 LOG.log(Level.FINE, "Closing transport {0}", myIdentifier); 194 transport.close(); 195 LOG.log(Level.FINE, "Closed the transport"); 196 } catch (final Exception e) { 197 LOG.log(Level.SEVERE, "Unable to close the transport.", e); 198 } 199 200 try { 201 LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier); 202 reRecvStage.close(); 203 LOG.log(Level.FINE, "Closed the remote receiver stage"); 204 } catch (final Exception e) { 205 LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e); 206 } 207 } 208 }; 209 210 final ExecutorService closeExecutor = Executors.newSingleThreadExecutor(); 211 212 closeExecutor.submit(closeRunnable); 213 closeExecutor.shutdown(); 214 215 if (!closeExecutor.isShutdown()) { 216 LOG.log(Level.SEVERE, "close executor did not shutdown properly."); 217 } 218 219 final long endTime = System.currentTimeMillis() + CLOSE_EXECUTOR_TIMEOUT; 220 while (!closeExecutor.isTerminated()) { 221 try { 222 final long waitTime = endTime - System.currentTimeMillis(); 223 closeExecutor.awaitTermination(waitTime, TimeUnit.MILLISECONDS); 224 } catch (final InterruptedException e) { 225 LOG.log(Level.FINE, "Interrupted", e); 226 } 227 } 228 229 if (closeExecutor.isTerminated()) { 230 LOG.log(Level.FINE, "Close executor terminated properly."); 231 } else { 232 LOG.log(Level.SEVERE, "Close executor did not terminate properly."); 233 } 234 } 235 236 @Override 237 public String toString() { 238 return String.format("RemoteManager: { id:%s handler:%s }", this.myIdentifier, this.handlerContainer); 239 } 240}