This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.impl;
020
021import org.apache.reef.tang.annotations.Parameter;
022import org.apache.reef.wake.EStage;
023import org.apache.reef.wake.EventHandler;
024import org.apache.reef.wake.impl.StageManager;
025import org.apache.reef.wake.remote.*;
026import org.apache.reef.wake.remote.transport.Transport;
027import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
028
029import javax.inject.Inject;
030import java.net.InetSocketAddress;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Default remote manager implementation
041 */
042public class DefaultRemoteManagerImplementation implements RemoteManager {
043
044  private static final Logger LOG = Logger.getLogger(HandlerContainer.class.getName());
045
046  private static final AtomicInteger counter = new AtomicInteger(0);
047
048  /**
049   * The timeout used for the execute running in close()
050   */
051  private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms
052  private final AtomicBoolean closed = new AtomicBoolean(false);
053  private final String name;
054  private final Transport transport;
055  private final RemoteSenderStage reSendStage;
056  private final EStage<TransportEvent> reRecvStage;
057  private final HandlerContainer handlerContainer;
058  private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
059  private RemoteIdentifier myIdentifier;
060
061  @Inject
062  public <T> DefaultRemoteManagerImplementation(
063      final @Parameter(RemoteConfiguration.ManagerName.class) String name,
064      final @Parameter(RemoteConfiguration.HostAddress.class) String hostAddress,
065      final @Parameter(RemoteConfiguration.Port.class) int listeningPort,
066      final @Parameter(RemoteConfiguration.MessageCodec.class) Codec<T> codec,
067      final @Parameter(RemoteConfiguration.ErrorHandler.class) EventHandler<Throwable> errorHandler,
068      final @Parameter(RemoteConfiguration.OrderingGuarantee.class) boolean orderingGuarantee,
069      final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries,
070      final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout) {
071
072    this.name = name;
073    this.handlerContainer = new HandlerContainer<>(name, codec);
074
075    this.reRecvStage = orderingGuarantee ?
076        new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
077        new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
078
079    if ("##UNKNOWN##".equals(hostAddress)) {
080      this.transport = new NettyMessagingTransport(
081          NetUtils.getLocalAddress(), listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout);
082    } else {
083      this.transport = new NettyMessagingTransport(
084          hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout);
085    }
086
087    this.handlerContainer.setTransport(this.transport);
088
089    this.myIdentifier = new SocketRemoteIdentifier(
090        (InetSocketAddress) this.transport.getLocalAddress());
091
092    this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);
093
094    StageManager.instance().register(this);
095
096    LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}",
097        new Object[]{this.name, this.myIdentifier, counter.incrementAndGet(),
098            this.transport.getLocalAddress().toString(),
099            this.transport.getListeningPort()}
100    );
101  }
102
103  /**
104   * Returns a proxy event handler for a remote identifier and a message type
105   */
106  @Override
107  public <T> EventHandler<T> getHandler(
108      final RemoteIdentifier destinationIdentifier, final Class<? extends T> messageType) {
109
110    if (LOG.isLoggable(Level.FINE)) {
111      LOG.log(Level.FINE, "RemoteManager: {0} destinationIdentifier: {1} messageType: {2}",
112          new Object[]{this.name, destinationIdentifier, messageType.getName()});
113    }
114
115    return new ProxyEventHandler<>(this.myIdentifier, destinationIdentifier,
116        "default", this.reSendStage.<T>getHandler(), this.seqGen);
117  }
118
119  /**
120   * Registers an event handler for a remote identifier and a message type and
121   * returns a subscription
122   */
123  @Override
124  public <T, U extends T> AutoCloseable registerHandler(
125      final RemoteIdentifier sourceIdentifier,
126      final Class<U> messageType, final EventHandler<T> theHandler) {
127    if (LOG.isLoggable(Level.FINE)) {
128      LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}",
129          new Object[]{this.name, sourceIdentifier, messageType.getName(),
130              theHandler.getClass().getName()});
131    }
132    return this.handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler);
133  }
134
135  /**
136   * Registers an event handler for a message type and returns a subscription
137   */
138  @Override
139  public <T, U extends T> AutoCloseable registerHandler(
140      final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
141    if (LOG.isLoggable(Level.FINE)) {
142      LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}",
143          new Object[]{this.name, messageType.getName(), theHandler.getClass().getName()});
144    }
145    return this.handlerContainer.registerHandler(messageType, theHandler);
146  }
147
148  /**
149   * Registers an exception handler and returns a subscription
150   */
151  @Override
152  public AutoCloseable registerErrorHandler(final EventHandler<Exception> theHandler) {
153    if (LOG.isLoggable(Level.FINE)) {
154      LOG.log(Level.FINE, "RemoteManager: {0} handler: {1}",
155          new Object[]{this.name, theHandler.getClass().getName()});
156    }
157    return this.handlerContainer.registerErrorHandler(theHandler);
158  }
159
160  /**
161   * Returns my identifier
162   */
163  @Override
164  public RemoteIdentifier getMyIdentifier() {
165    return this.myIdentifier;
166  }
167
168  @Override
169  public void close() {
170    if (closed.compareAndSet(false, true)) {
171
172      LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}",
173          new Object[]{this.name, this.myIdentifier});
174
175      final Runnable closeRunnable = new Runnable() {
176        @Override
177        public void run() {
178          try {
179            LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier);
180            reSendStage.close();
181            LOG.log(Level.FINE, "Closed the remote sender stage");
182          } catch (final Exception e) {
183            LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e);
184          }
185
186          try {
187            LOG.log(Level.FINE, "Closing transport {0}", myIdentifier);
188            transport.close();
189            LOG.log(Level.FINE, "Closed the transport");
190          } catch (final Exception e) {
191            LOG.log(Level.SEVERE, "Unable to close the transport.", e);
192          }
193
194          try {
195            LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier);
196            reRecvStage.close();
197            LOG.log(Level.FINE, "Closed the remote receiver stage");
198          } catch (final Exception e) {
199            LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e);
200          }
201        }
202
203      };
204
205      final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
206      closeExecutor.submit(closeRunnable);
207      closeExecutor.shutdown();
208      if (!closeExecutor.isShutdown()) {
209        LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
210      }
211
212      final long endTime = System.currentTimeMillis() + CLOSE_EXECUTOR_TIMEOUT;
213      while (!closeExecutor.isTerminated()) {
214        try {
215          final long waitTime = endTime - System.currentTimeMillis();
216          closeExecutor.awaitTermination(waitTime, TimeUnit.MILLISECONDS);
217        } catch (final InterruptedException e) {
218          LOG.log(Level.FINE, "Interrupted", e);
219        }
220      }
221
222      if (closeExecutor.isTerminated()) {
223        LOG.log(Level.FINE, "Close executor terminated properly.");
224      } else {
225        LOG.log(Level.SEVERE, "Close executor did not terminate properly.");
226      }
227    }
228  }
229}