This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.impl;
020
021import org.apache.reef.wake.EStage;
022import org.apache.reef.wake.EventHandler;
023import org.apache.reef.wake.WakeParameters;
024import org.apache.reef.wake.impl.DefaultThreadFactory;
025import org.apache.reef.wake.impl.ThreadPoolStage;
026import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
027
028import java.util.List;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.TimeUnit;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Receive incoming events and dispatch to correct handlers.
037 */
038public class RemoteReceiverStage implements EStage<TransportEvent> {
039
040  private static final Logger LOG = Logger.getLogger(RemoteReceiverStage.class.getName());
041
042  private final EventHandler<TransportEvent> handler;
043  private final ThreadPoolStage<TransportEvent> stage;
044  private final ExecutorService executor; // for decoupling
045
046  private final long shutdownTimeout = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT;
047
048  /**
049   * Constructs a remote receiver stage.
050   *
051   * @param handler      the handler of remote events
052   * @param errorHandler the exception handler
053   * @param numThreads   the number of threads
054   */
055  public RemoteReceiverStage(final EventHandler<RemoteEvent<byte[]>> handler,
056                             final EventHandler<Throwable> errorHandler, final int numThreads) {
057
058    this.handler = new RemoteReceiverEventHandler(handler);
059
060    this.executor = Executors.newFixedThreadPool(
061        numThreads, new DefaultThreadFactory(RemoteReceiverStage.class.getName()));
062
063    this.stage = new ThreadPoolStage<>(this.handler, this.executor, errorHandler);
064  }
065
066  /**
067   * Handles the received event.
068   *
069   * @param value the event
070   */
071  @Override
072  public void onNext(final TransportEvent value) {
073    LOG.log(Level.FINEST, "{0}", value);
074    stage.onNext(value);
075  }
076
077  /**
078   * Closes the stage.
079   */
080  @Override
081  public void close() throws Exception {
082    LOG.log(Level.FINE, "close");
083
084    if (this.executor != null) {
085      this.executor.shutdown();
086      try {
087        // wait for threads to finish for timeout
088        if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
089          LOG.log(Level.WARNING, "Executor did not terminate in {0} ms.", shutdownTimeout);
090          final List<Runnable> droppedRunnables = executor.shutdownNow();
091          LOG.log(Level.WARNING, "Executor dropped {0} tasks.", droppedRunnables.size());
092        }
093      } catch (final InterruptedException e) {
094        LOG.log(Level.WARNING, "Close interrupted", e);
095        throw new RemoteRuntimeException(e);
096      }
097    }
098  }
099}