/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.wake.remote.impl;

import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.StageManager;
import org.apache.reef.wake.remote.*;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;

import javax.inject.Inject;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Default remote manager implementation.
 *
 * Wires together a handler container, a receiver stage, a Netty messaging
 * transport and a sender stage, and registers itself with the
 * {@link StageManager}.
 */
public class DefaultRemoteManagerImplementation implements RemoteManager {

  // Log under this class' own name (previously HandlerContainer's name was used).
  private static final Logger LOG =
      Logger.getLogger(DefaultRemoteManagerImplementation.class.getName());

  /**
   * Counts how many instances have been constructed; only used for logging.
   */
  private static final AtomicInteger counter = new AtomicInteger(0);

  /**
   * The timeout used for the executor running in close(), in milliseconds.
   */
  private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms

  /**
   * Flipped to true by the first call to close(); later calls are no-ops.
   */
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final String name;
  private final Transport transport;
  private final RemoteSenderStage reSendStage;
  private final EStage<TransportEvent> reRecvStage;
  private final HandlerContainer handlerContainer;
  private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
  private final RemoteIdentifier myIdentifier;

  /**
   * Creates the remote manager and all of its stages.
   *
   * @param name              name of this manager; used in log messages and handed to the handler container
   * @param hostAddress       address to bind to; the literal "##UNKNOWN##" selects NetUtils.getLocalAddress()
   * @param listeningPort     port handed to the transport
   * @param codec             message codec handed to the handler container and the sender stage
   * @param errorHandler      error handler handed to the receiver stage
   * @param orderingGuarantee if true an OrderedRemoteReceiverStage is used, otherwise a RemoteReceiverStage
   * @param numberOfTries     handed to the transport
   * @param retryTimeout      handed to the transport
   */
  @Inject
  public <T> DefaultRemoteManagerImplementation(
      final @Parameter(RemoteConfiguration.ManagerName.class) String name,
      final @Parameter(RemoteConfiguration.HostAddress.class) String hostAddress,
      final @Parameter(RemoteConfiguration.Port.class) int listeningPort,
      final @Parameter(RemoteConfiguration.MessageCodec.class) Codec<T> codec,
      final @Parameter(RemoteConfiguration.ErrorHandler.class) EventHandler<Throwable> errorHandler,
      final @Parameter(RemoteConfiguration.OrderingGuarantee.class) boolean orderingGuarantee,
      final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries,
      final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout) {

    this.name = name;
    this.handlerContainer = new HandlerContainer<>(name, codec);

    // NOTE(review): the trailing 10 is presumably the stage's thread count -- confirm.
    this.reRecvStage = orderingGuarantee ?
        new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
        new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);

    // The receiver stage is passed twice to the transport constructor.
    if ("##UNKNOWN##".equals(hostAddress)) {
      this.transport = new NettyMessagingTransport(
          NetUtils.getLocalAddress(), listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout);
    } else {
      this.transport = new NettyMessagingTransport(
          hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout);
    }

    this.handlerContainer.setTransport(this.transport);

    this.myIdentifier = new SocketRemoteIdentifier(
        (InetSocketAddress) this.transport.getLocalAddress());

    // NOTE(review): the trailing 10 is presumably the stage's thread count -- confirm.
    this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);

    StageManager.instance().register(this);

    LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}",
        new Object[]{this.name, this.myIdentifier, counter.incrementAndGet(),
            this.transport.getLocalAddress().toString(),
            this.transport.getListeningPort()}
    );
  }

  /**
   * Returns a proxy event handler for a remote identifier and a message type.
   *
   * @param destinationIdentifier the identifier of the remote destination
   * @param messageType           the message type; only used for logging here
   * @return a new ProxyEventHandler backed by the sender stage's handler
   */
  @Override
  public <T> EventHandler<T> getHandler(
      final RemoteIdentifier destinationIdentifier, final Class<? extends T> messageType) {

    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "RemoteManager: {0} destinationIdentifier: {1} messageType: {2}",
          new Object[]{this.name, destinationIdentifier, messageType.getName()});
    }

    return new ProxyEventHandler<>(this.myIdentifier, destinationIdentifier,
        "default", this.reSendStage.<T>getHandler(), this.seqGen);
  }

  /**
   * Registers an event handler for a remote identifier and a message type and
   * returns a subscription.
   *
   * @param sourceIdentifier the identifier of the remote source
   * @param messageType      the message type to handle
   * @param theHandler       the handler to register
   * @return the subscription returned by the handler container
   */
  @Override
  public <T, U extends T> AutoCloseable registerHandler(
      final RemoteIdentifier sourceIdentifier,
      final Class<U> messageType, final EventHandler<T> theHandler) {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}",
          new Object[]{this.name, sourceIdentifier, messageType.getName(),
              theHandler.getClass().getName()});
    }
    return this.handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler);
  }

  /**
   * Registers an event handler for a message type and returns a subscription.
   *
   * @param messageType the message type to handle
   * @param theHandler  the handler to register
   * @return the subscription returned by the handler container
   */
  @Override
  public <T, U extends T> AutoCloseable registerHandler(
      final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}",
          new Object[]{this.name, messageType.getName(), theHandler.getClass().getName()});
    }
    return this.handlerContainer.registerHandler(messageType, theHandler);
  }

  /**
   * Registers an exception handler and returns a subscription.
   *
   * @param theHandler the exception handler to register
   * @return the subscription returned by the handler container
   */
  @Override
  public AutoCloseable registerErrorHandler(final EventHandler<Exception> theHandler) {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "RemoteManager: {0} handler: {1}",
          new Object[]{this.name, theHandler.getClass().getName()});
    }
    return this.handlerContainer.registerErrorHandler(theHandler);
  }

  /**
   * Returns my identifier.
   */
  @Override
  public RemoteIdentifier getMyIdentifier() {
    return this.myIdentifier;
  }

  /**
   * Closes the sender stage, the transport and the receiver stage, in that
   * order, on a dedicated single-thread executor, and waits at most
   * CLOSE_EXECUTOR_TIMEOUT milliseconds for that work to finish.
   *
   * Only the first invocation does anything. A failure to close one component
   * is logged and does not prevent the remaining components from being closed.
   * If the calling thread is interrupted while waiting, the wait continues
   * until the deadline and the interrupt flag is restored before returning.
   */
  @Override
  public void close() {
    if (closed.compareAndSet(false, true)) {

      LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}",
          new Object[]{this.name, this.myIdentifier});

      final Runnable closeRunnable = new Runnable() {
        @Override
        public void run() {
          try {
            LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier);
            reSendStage.close();
            LOG.log(Level.FINE, "Closed the remote sender stage");
          } catch (final Exception e) {
            LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e);
          }

          try {
            LOG.log(Level.FINE, "Closing transport {0}", myIdentifier);
            transport.close();
            LOG.log(Level.FINE, "Closed the transport");
          } catch (final Exception e) {
            LOG.log(Level.SEVERE, "Unable to close the transport.", e);
          }

          try {
            LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier);
            reRecvStage.close();
            LOG.log(Level.FINE, "Closed the remote receiver stage");
          } catch (final Exception e) {
            LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e);
          }
        }

      };

      final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
      closeExecutor.submit(closeRunnable);
      closeExecutor.shutdown();
      if (!closeExecutor.isShutdown()) {
        LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
      }

      if (awaitCloseExecutor(closeExecutor)) {
        LOG.log(Level.FINE, "Close executor terminated properly.");
      } else {
        LOG.log(Level.SEVERE, "Close executor did not terminate properly.");
      }
    }
  }

  /**
   * Waits for the given executor to terminate, but never longer than
   * CLOSE_EXECUTOR_TIMEOUT milliseconds in total.
   *
   * @param closeExecutor an executor on which shutdown() has already been called
   * @return true if the executor terminated before the deadline, false otherwise
   */
  private static boolean awaitCloseExecutor(final ExecutorService closeExecutor) {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(CLOSE_EXECUTOR_TIMEOUT);
    boolean interrupted = false;
    try {
      while (!closeExecutor.isTerminated()) {
        final long remaining = deadline - System.nanoTime();
        if (remaining <= 0) {
          // Deadline reached: stop waiting instead of spinning on a zero/negative timeout.
          break;
        }
        try {
          closeExecutor.awaitTermination(remaining, TimeUnit.NANOSECONDS);
        } catch (final InterruptedException e) {
          LOG.log(Level.FINE, "Interrupted", e);
          // Keep waiting for the remaining time; the flag is restored below.
          // Re-interrupting here would make awaitTermination throw immediately.
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    return closeExecutor.isTerminated();
  }
}