001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ClosedContext;
023import org.apache.reef.driver.context.ContextMessage;
024import org.apache.reef.driver.context.FailedContext;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.evaluator.CompletedEvaluator;
027import org.apache.reef.driver.evaluator.FailedEvaluator;
028import org.apache.reef.driver.parameters.*;
029import org.apache.reef.driver.task.*;
030import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
031import org.apache.reef.runtime.common.utils.DispatchingEStage;
032import org.apache.reef.tang.annotations.Parameter;
033import org.apache.reef.wake.EventHandler;
034
035import javax.inject.Inject;
036import java.util.HashSet;
037import java.util.Set;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
043 */
044public final class EvaluatorMessageDispatcher implements AutoCloseable {
045
046  private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());
047
048  private final String evaluatorIdentifier;
049
050  /**
051   * Dispatcher used for application provided event handlers.
052   */
053  private final DispatchingEStage applicationDispatcher;
054
055  /**
056   * Dispatcher used for service provided event handlers.
057   */
058  private final DispatchingEStage serviceDispatcher;
059
060  /**
061   * Dispatcher used for application provided driver-restart specific event handlers.
062   */
063  private final DispatchingEStage driverRestartApplicationDispatcher;
064
065  /**
066   * Dispatcher used for service provided driver-restart specific event handlers.
067   */
068  private final DispatchingEStage driverRestartServiceDispatcher;
069
070  @Inject
071  private EvaluatorMessageDispatcher(
072      // Application-provided Context event handlers
073      @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers,
074      @Parameter(ContextClosedHandlers.class) final Set<EventHandler<ClosedContext>> contextClosedHandlers,
075      @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers,
076      @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers,
077      // Service-provided Context event handlers
078      @Parameter(ServiceContextActiveHandlers.class)
079      final Set<EventHandler<ActiveContext>> serviceContextActiveHandlers,
080      @Parameter(ServiceContextClosedHandlers.class)
081      final Set<EventHandler<ClosedContext>> serviceContextClosedHandlers,
082      @Parameter(ServiceContextFailedHandlers.class)
083      final Set<EventHandler<FailedContext>> serviceContextFailedHandlers,
084      @Parameter(ServiceContextMessageHandlers.class)
085      final Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
086      // Application-provided Task event handlers
087      @Parameter(TaskRunningHandlers.class) final Set<EventHandler<RunningTask>> taskRunningHandlers,
088      @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
089      @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
090      @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
091      @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
092      // Service-provided Task event handlers
093      @Parameter(ServiceTaskRunningHandlers.class) final Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
094      @Parameter(ServiceTaskCompletedHandlers.class)
095      final Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
096      @Parameter(ServiceTaskSuspendedHandlers.class)
097      final Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
098      @Parameter(ServiceTaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
099      @Parameter(ServiceTaskFailedHandlers.class) final Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
100      // Application-provided Evaluator event handlers
101      @Parameter(EvaluatorAllocatedHandlers.class)
102      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
103      @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
104      @Parameter(EvaluatorCompletedHandlers.class)
105      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
106      // Service-provided Evaluator event handlers
107      @Parameter(ServiceEvaluatorAllocatedHandlers.class)
108      final Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
109      @Parameter(ServiceEvaluatorFailedHandlers.class)
110      final Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
111      @Parameter(ServiceEvaluatorCompletedHandlers.class)
112      final Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,
113
114      // Application event handlers specific to a Driver restart
115      @Parameter(DriverRestartTaskRunningHandlers.class)
116      final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
117      @Parameter(DriverRestartContextActiveHandlers.class)
118      final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
119      @Parameter(DriverRestartFailedEvaluatorHandlers.class)
120      final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedHandlers,
121
122      // Service-provided event handlers specific to a Driver restart
123      @Parameter(ServiceDriverRestartTaskRunningHandlers.class)
124      final Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers,
125      @Parameter(ServiceDriverRestartContextActiveHandlers.class)
126      final Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
127      @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class)
128      final Set<EventHandler<FailedEvaluator>> serviceDriverRestartFailedEvaluatorHandlers,
129
130      @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads,
131      @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorIdentifier,
132      final DriverExceptionHandler driverExceptionHandler,
133      final IdlenessCallbackEventHandlerFactory idlenessCallbackEventHandlerFactory) {
134
135    LOG.log(Level.FINER, "Creating message dispatcher for {0}", evaluatorIdentifier);
136
137    this.evaluatorIdentifier = evaluatorIdentifier;
138    this.serviceDispatcher = new DispatchingEStage(
139        driverExceptionHandler, numberOfThreads, "EvaluatorMessageDispatcher:" + evaluatorIdentifier);
140
141    this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
142    this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
143    this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher);
144
145    // Application Context event handlers
146    this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
147    this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
148    this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
149    this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
150
151    // Service Context event handlers
152    this.serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers);
153    this.serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers);
154    this.serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers);
155    this.serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers);
156
157    // Application Task event handlers.
158    this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
159    this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
160    this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
161    this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
162    this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
163
164    // Service Task event handlers
165    this.serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers);
166    this.serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers);
167    this.serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers);
168    this.serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers);
169    this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers);
170
171    // Application Evaluator event handlers
172    this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
173
174    // Service Evaluator event handlers
175    this.serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers);
176    this.serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers);
177    this.serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers);
178
179    // Application event handlers specific to a Driver restart
180    this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
181    this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
182
183    final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedCallbackHandlers = new HashSet<>();
184    for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : driverRestartEvaluatorFailedHandlers) {
185      driverRestartEvaluatorFailedCallbackHandlers.add(
186          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler));
187    }
188
189    this.driverRestartApplicationDispatcher.register(
190        FailedEvaluator.class, driverRestartEvaluatorFailedCallbackHandlers);
191
192    // Service event handlers specific to a Driver restart
193    this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
194    this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
195    this.driverRestartServiceDispatcher.register(FailedEvaluator.class, serviceDriverRestartFailedEvaluatorHandlers);
196
197    final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedCallbackHandlers = new HashSet<>();
198    for (final EventHandler<CompletedEvaluator> evaluatorCompletedHandler : evaluatorCompletedHandlers) {
199      evaluatorCompletedCallbackHandlers.add(
200          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorCompletedHandler));
201    }
202    this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedCallbackHandlers);
203
204    final Set<EventHandler<FailedEvaluator>> evaluatorFailedCallbackHandlers = new HashSet<>();
205    for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : evaluatorFailedHandlers) {
206      evaluatorFailedCallbackHandlers.add(
207          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler));
208    }
209    this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedCallbackHandlers);
210
211    LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'");
212  }
213
214  public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) {
215    this.dispatch(AllocatedEvaluator.class, allocatedEvaluator);
216  }
217
218  public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) {
219    this.dispatch(FailedEvaluator.class, failedEvaluator);
220  }
221
222  public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) {
223    this.dispatch(CompletedEvaluator.class, completedEvaluator);
224  }
225
226  public void onTaskRunning(final RunningTask runningTask) {
227    this.dispatch(RunningTask.class, runningTask);
228  }
229
230  public void onTaskCompleted(final CompletedTask completedTask) {
231    this.dispatch(CompletedTask.class, completedTask);
232  }
233
234  public void onTaskSuspended(final SuspendedTask suspendedTask) {
235    this.dispatch(SuspendedTask.class, suspendedTask);
236  }
237
238  public void onTaskMessage(final TaskMessage taskMessage) {
239    this.dispatch(TaskMessage.class, taskMessage);
240  }
241
242  public void onTaskFailed(final FailedTask failedTask) {
243    this.dispatch(FailedTask.class, failedTask);
244  }
245
246  public void onContextActive(final ActiveContext activeContext) {
247    this.dispatch(ActiveContext.class, activeContext);
248  }
249
250  public void onContextClose(final ClosedContext closedContext) {
251    this.dispatch(ClosedContext.class, closedContext);
252  }
253
254  public void onContextFailed(final FailedContext failedContext) {
255    this.dispatch(FailedContext.class, failedContext);
256  }
257
258  public void onContextMessage(final ContextMessage contextMessage) {
259    this.dispatch(ContextMessage.class, contextMessage);
260  }
261
262  public void onDriverRestartTaskRunning(final RunningTask runningTask) {
263    this.dispatchForRestartedDriver(RunningTask.class, runningTask);
264  }
265
266  public void onDriverRestartContextActive(final ActiveContext activeContext) {
267    this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
268  }
269
270  public void onDriverRestartEvaluatorFailed(final FailedEvaluator failedEvaluator) {
271    this.dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator);
272  }
273
274  boolean isEmpty() {
275    return this.applicationDispatcher.isEmpty();
276  }
277
278  private <T, U extends T> void dispatch(final Class<T> type, final U message) {
279    this.serviceDispatcher.onNext(type, message);
280    this.applicationDispatcher.onNext(type, message);
281  }
282
283  private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
284    this.driverRestartServiceDispatcher.onNext(type, message);
285    this.driverRestartApplicationDispatcher.onNext(type, message);
286  }
287
288  @Override
289  public void close() {
290    LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
291    // This effectively closes all dispatchers as they share the same stage.
292    this.serviceDispatcher.close();
293    if (!this.serviceDispatcher.isClosed()) {
294      LOG.log(Level.SEVERE,
295          "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
296          this.evaluatorIdentifier);
297    }
298  }
299}