This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ClosedContext;
023import org.apache.reef.driver.context.ContextMessage;
024import org.apache.reef.driver.context.FailedContext;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.evaluator.CompletedEvaluator;
027import org.apache.reef.driver.evaluator.FailedEvaluator;
028import org.apache.reef.driver.parameters.*;
029import org.apache.reef.driver.task.*;
030import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
031import org.apache.reef.runtime.common.utils.DispatchingEStage;
032import org.apache.reef.tang.annotations.Parameter;
033import org.apache.reef.wake.EventHandler;
034
035import javax.inject.Inject;
036import java.util.HashSet;
037import java.util.Set;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
043 */
044public final class EvaluatorMessageDispatcher implements AutoCloseable {
045
046  private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());
047
048  /**
049   * Dispatcher used for application provided event handlers.
050   */
051  private final DispatchingEStage applicationDispatcher;
052
053  /**
054   * Dispatcher used for service provided event handlers.
055   */
056  private final DispatchingEStage serviceDispatcher;
057
058
059  /**
060   * Dispatcher used for application provided driver-restart specific event handlers.
061   */
062  private final DispatchingEStage driverRestartApplicationDispatcher;
063
064  /**
065   * Dispatcher used for service provided driver-restart specific event handlers.
066   */
067  private final DispatchingEStage driverRestartServiceDispatcher;
068
069  @Inject
070  EvaluatorMessageDispatcher(
071      // Application-provided Context event handlers
072      @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers,
073      @Parameter(ContextClosedHandlers.class) final Set<EventHandler<ClosedContext>> contextClosedHandlers,
074      @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers,
075      @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers,
076      // Service-provided Context event handlers
077      @Parameter(ServiceContextActiveHandlers.class)
078      final Set<EventHandler<ActiveContext>> serviceContextActiveHandlers,
079      @Parameter(ServiceContextClosedHandlers.class)
080      final Set<EventHandler<ClosedContext>> serviceContextClosedHandlers,
081      @Parameter(ServiceContextFailedHandlers.class)
082      final Set<EventHandler<FailedContext>> serviceContextFailedHandlers,
083      @Parameter(ServiceContextMessageHandlers.class)
084      final Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
085      // Application-provided Task event handlers
086      @Parameter(TaskRunningHandlers.class) final Set<EventHandler<RunningTask>> taskRunningHandlers,
087      @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
088      @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
089      @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
090      @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
091      // Service-provided Task event handlers
092      @Parameter(ServiceTaskRunningHandlers.class) final Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
093      @Parameter(ServiceTaskCompletedHandlers.class)
094      final Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
095      @Parameter(ServiceTaskSuspendedHandlers.class)
096      final Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
097      @Parameter(ServiceTaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
098      @Parameter(ServiceTaskFailedHandlers.class) final Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
099      // Application-provided Evaluator event handlers
100      @Parameter(EvaluatorAllocatedHandlers.class)
101      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
102      @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
103      @Parameter(EvaluatorCompletedHandlers.class)
104      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
105      // Service-provided Evaluator event handlers
106      @Parameter(ServiceEvaluatorAllocatedHandlers.class)
107      final Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
108      @Parameter(ServiceEvaluatorFailedHandlers.class)
109      final Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
110      @Parameter(ServiceEvaluatorCompletedHandlers.class)
111      final Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,
112
113      // Application event handlers specific to a Driver restart
114      @Parameter(DriverRestartTaskRunningHandlers.class)
115      final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
116      @Parameter(DriverRestartContextActiveHandlers.class)
117      final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
118      @Parameter(DriverRestartFailedEvaluatorHandlers.class)
119      final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedHandlers,
120
121      // Service-provided event handlers specific to a Driver restart
122      @Parameter(ServiceDriverRestartTaskRunningHandlers.class)
123      final Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers,
124      @Parameter(ServiceDriverRestartContextActiveHandlers.class)
125      final Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
126      @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class)
127      final Set<EventHandler<FailedEvaluator>> serviceDriverRestartFailedEvaluatorHandlers,
128
129      @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads,
130      @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorIdentifier,
131      final DriverExceptionHandler driverExceptionHandler,
132      final IdlenessCallbackEventHandlerFactory idlenessCallbackEventHandlerFactory
133  ) {
134
135    this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, numberOfThreads, evaluatorIdentifier);
136    this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
137    this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
138    this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher);
139
140    // Application Context event handlers
141    this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
142    this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
143    this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
144    this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
145
146    // Service Context event handlers
147    this.serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers);
148    this.serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers);
149    this.serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers);
150    this.serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers);
151
152    // Application Task event handlers.
153    this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
154    this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
155    this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
156    this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
157    this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
158
159    // Service Task event handlers
160    this.serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers);
161    this.serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers);
162    this.serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers);
163    this.serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers);
164    this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers);
165
166    // Application Evaluator event handlers
167    this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
168
169    // Service Evaluator event handlers
170    this.serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers);
171    this.serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers);
172    this.serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers);
173
174    // Application event handlers specific to a Driver restart
175    this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
176    this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
177
178    final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedCallbackHandlers = new HashSet<>();
179    for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : driverRestartEvaluatorFailedHandlers) {
180      driverRestartEvaluatorFailedCallbackHandlers.add(
181          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler));
182    }
183
184    this.driverRestartApplicationDispatcher.register(
185        FailedEvaluator.class, driverRestartEvaluatorFailedCallbackHandlers);
186
187    // Service event handlers specific to a Driver restart
188    this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
189    this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
190    this.driverRestartServiceDispatcher.register(FailedEvaluator.class, serviceDriverRestartFailedEvaluatorHandlers);
191
192    final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedCallbackHandlers = new HashSet<>();
193    for (final EventHandler<CompletedEvaluator> evaluatorCompletedHandler : evaluatorCompletedHandlers) {
194      evaluatorCompletedCallbackHandlers.add(
195          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorCompletedHandler));
196    }
197    this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedCallbackHandlers);
198
199    final Set<EventHandler<FailedEvaluator>> evaluatorFailedCallbackHandlers = new HashSet<>();
200    for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : evaluatorFailedHandlers) {
201      evaluatorFailedCallbackHandlers.add(
202          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler));
203    }
204    this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedCallbackHandlers);
205
206    LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'");
207  }
208
209  public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) {
210    this.dispatch(AllocatedEvaluator.class, allocatedEvaluator);
211  }
212
213  public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) {
214    this.dispatch(FailedEvaluator.class, failedEvaluator);
215  }
216
217  public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) {
218    this.dispatch(CompletedEvaluator.class, completedEvaluator);
219  }
220
221  public void onTaskRunning(final RunningTask runningTask) {
222    this.dispatch(RunningTask.class, runningTask);
223  }
224
225  public void onTaskCompleted(final CompletedTask completedTask) {
226    this.dispatch(CompletedTask.class, completedTask);
227  }
228
229  public void onTaskSuspended(final SuspendedTask suspendedTask) {
230    this.dispatch(SuspendedTask.class, suspendedTask);
231  }
232
233  public void onTaskMessage(final TaskMessage taskMessage) {
234    this.dispatch(TaskMessage.class, taskMessage);
235  }
236
237  public void onTaskFailed(final FailedTask failedTask) {
238    this.dispatch(FailedTask.class, failedTask);
239  }
240
241  public void onContextActive(final ActiveContext activeContext) {
242    this.dispatch(ActiveContext.class, activeContext);
243  }
244
245  public void onContextClose(final ClosedContext closedContext) {
246    this.dispatch(ClosedContext.class, closedContext);
247  }
248
249  public void onContextFailed(final FailedContext failedContext) {
250    this.dispatch(FailedContext.class, failedContext);
251  }
252
253  public void onContextMessage(final ContextMessage contextMessage) {
254    this.dispatch(ContextMessage.class, contextMessage);
255  }
256
257  public void onDriverRestartTaskRunning(final RunningTask runningTask) {
258    this.dispatchForRestartedDriver(RunningTask.class, runningTask);
259  }
260
261  public void onDriverRestartContextActive(final ActiveContext activeContext) {
262    this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
263  }
264
265  public void onDriverRestartEvaluatorFailed(final FailedEvaluator failedEvaluator) {
266    this.dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator);
267  }
268
269  boolean isEmpty() {
270    return this.applicationDispatcher.isEmpty();
271  }
272
273  private <T, U extends T> void dispatch(final Class<T> type, final U message) {
274    this.serviceDispatcher.onNext(type, message);
275    this.applicationDispatcher.onNext(type, message);
276  }
277
278  private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
279    this.driverRestartServiceDispatcher.onNext(type, message);
280    this.driverRestartApplicationDispatcher.onNext(type, message);
281  }
282
283  @Override
284  public void close() throws Exception {
285    /**
286     * This effectively closes all dispatchers as they share the same stage.
287     */
288    this.serviceDispatcher.close();
289  }
290}