001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.utils;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.tang.util.MonotonicHashMap;
024import org.apache.reef.util.ExceptionHandlingEventHandler;
025import org.apache.reef.wake.EventHandler;
026import org.apache.reef.wake.impl.ThreadPoolStage;
027
028import java.util.Collections;
029import java.util.Map;
030import java.util.Set;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Delayed event router that dispatches messages to the proper event handler by type.
036 * This class is used in EvaluatorManager to isolate user threads from REEF.
037 */
038@Private
039@DriverSide
040public final class DispatchingEStage implements AutoCloseable {
041
042  private static final Logger LOG = Logger.getLogger(DispatchingEStage.class.getName());
043
044  /**
045   * A map of event handlers, populated in the register() method.
046   */
047  private final Map<Class<?>, EventHandler<?>> handlers =
048      Collections.synchronizedMap(new MonotonicHashMap<Class<?>, EventHandler<?>>());
049  /**
050   * Exception handler, one for all event handlers.
051   */
052  private final EventHandler<Throwable> errorHandler;
053  /**
054   * Thread pool to process delayed event handler invocations.
055   */
056  private final ThreadPoolStage<DelayedOnNext> stage;
057
058  /**
059   * @param errorHandler used for exceptions thrown from the event handlers registered.
060   * @param numThreads   number of threads to allocate to dispatch events.
061   * @param stageName    the name to use for the underlying stage.
062   *                     It will be carried over to name the Thread(s) spawned.
063   */
064  public DispatchingEStage(final EventHandler<Throwable> errorHandler,
065                           final int numThreads,
066                           final String stageName) {
067    this.errorHandler = errorHandler;
068    this.stage = new ThreadPoolStage<>(stageName,
069        new EventHandler<DelayedOnNext>() {
070          @Override
071          public void onNext(final DelayedOnNext promise) {
072            promise.handler.onNext(promise.message);
073          }
074        }, numThreads
075    );
076
077  }
078
079  /**
080   * Constructs a DispatchingEStage that uses the Thread pool and ErrorHandler of another one.
081   *
082   * @param other
083   */
084  public DispatchingEStage(final DispatchingEStage other) {
085    this.errorHandler = other.errorHandler;
086    this.stage = other.stage;
087  }
088
089  /**
090   * Register a new event handler.
091   *
092   * @param type     Message type to process with this handler.
093   * @param handlers A set of handlers that process that type of message.
094   * @param <T>      Message type.
095   * @param <U>      Type of message that event handler supports. Must be a subclass of T.
096   */
097  @SuppressWarnings("checkstyle:hiddenfield")
098  public <T, U extends T> void register(final Class<T> type, final Set<EventHandler<U>> handlers) {
099    this.handlers.put(type, new ExceptionHandlingEventHandler<>(
100        new BroadCastEventHandler<>(handlers), this.errorHandler));
101  }
102
103  /**
104   * Dispatch a new message by type.
105   * If the stage is already closed, log a warning and ignore the message.
106   * @param type    Type of event handler - must match the register() call.
107   * @param message A message to process. Must be a subclass of T.
108   * @param <T>     Message type that event handler supports.
109   * @param <U>     input message type. Must be a subclass of T.
110   */
111  @SuppressWarnings("unchecked")
112  public <T, U extends T> void onNext(final Class<T> type, final U message) {
113    if (this.isClosed()) {
114      LOG.log(Level.WARNING, "Dispatcher {0} already closed: ignoring message {1}: {2}",
115          new Object[] {this.stage, type.getCanonicalName(), message});
116    } else {
117      final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type);
118      this.stage.onNext(new DelayedOnNext(handler, message));
119    }
120  }
121
122  /**
123   * Return true if there are no messages queued or in processing, false otherwise.
124   */
125  public boolean isEmpty() {
126    return this.stage.getQueueLength() + this.stage.getActiveCount() == 0;
127  }
128
129  /**
130   * Close the stage adn stop accepting new messages.
131   * Closes the internal thread pool.
132   */
133  @Override
134  public void close() {
135    this.stage.close();
136  }
137
138  /**
139   * Check if the stage can still accept messages.
140   * @return true if the stage can no longer accept messages, false otherwise.
141   */
142  public boolean isClosed() {
143    return this.stage.isClosed();
144  }
145
146  /**
147   * Delayed EventHandler.onNext() call.
148   * Contains a message object and EventHandler to process it.
149   */
150  private static final class DelayedOnNext {
151
152    private final EventHandler<Object> handler;
153    private final Object message;
154
155    @SuppressWarnings("unchecked")
156    <T, U extends T> DelayedOnNext(final EventHandler<T> handler, final U message) {
157      this.handler = (EventHandler<Object>) handler;
158      this.message = message;
159    }
160  }
161}