This project has been retired. For details, please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ClosedContext;
023import org.apache.reef.driver.context.ContextMessage;
024import org.apache.reef.driver.context.FailedContext;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.evaluator.CompletedEvaluator;
027import org.apache.reef.driver.evaluator.FailedEvaluator;
028import org.apache.reef.driver.parameters.*;
029import org.apache.reef.driver.task.*;
030import org.apache.reef.runtime.common.DriverRestartCompleted;
031import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
032import org.apache.reef.runtime.common.utils.DispatchingEStage;
033import org.apache.reef.tang.annotations.Parameter;
034import org.apache.reef.wake.EventHandler;
035
036import javax.inject.Inject;
037import java.util.Set;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
043 */
044public final class EvaluatorMessageDispatcher {
045
046  private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());
047
048  /**
049   * Dispatcher used for application provided event handlers.
050   */
051  private final DispatchingEStage applicationDispatcher;
052
053  /**
054   * Dispatcher used for service provided event handlers.
055   */
056  private final DispatchingEStage serviceDispatcher;
057
058
059  /**
060   * Dispatcher used for application provided driver-restart specific event handlers.
061   */
062  private final DispatchingEStage driverRestartApplicationDispatcher;
063
064  /**
065   * Dispatcher used for service provided driver-restart specific event handlers.
066   */
067  private final DispatchingEStage driverRestartServiceDispatcher;
068
069  @Inject
070  EvaluatorMessageDispatcher(
071      // Application-provided Context event handlers
072      final @Parameter(ContextActiveHandlers.class) Set<EventHandler<ActiveContext>> contextActiveHandlers,
073      final @Parameter(ContextClosedHandlers.class) Set<EventHandler<ClosedContext>> contextClosedHandlers,
074      final @Parameter(ContextFailedHandlers.class) Set<EventHandler<FailedContext>> contextFailedHandlers,
075      final @Parameter(ContextMessageHandlers.class) Set<EventHandler<ContextMessage>> contextMessageHandlers,
076      // Service-provided Context event handlers
077      final @Parameter(ServiceContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceContextActiveHandlers,
078      final @Parameter(ServiceContextClosedHandlers.class) Set<EventHandler<ClosedContext>> serviceContextClosedHandlers,
079      final @Parameter(ServiceContextFailedHandlers.class) Set<EventHandler<FailedContext>> serviceContextFailedHandlers,
080      final @Parameter(ServiceContextMessageHandlers.class) Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
081      // Application-provided Task event handlers
082      final @Parameter(TaskRunningHandlers.class) Set<EventHandler<RunningTask>> taskRunningHandlers,
083      final @Parameter(TaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> taskCompletedHandlers,
084      final @Parameter(TaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
085      final @Parameter(TaskMessageHandlers.class) Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
086      final @Parameter(TaskFailedHandlers.class) Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
087      // Service-provided Task event handlers
088      final @Parameter(ServiceTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
089      final @Parameter(ServiceTaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
090      final @Parameter(ServiceTaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
091      final @Parameter(ServiceTaskMessageHandlers.class) Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
092      final @Parameter(ServiceTaskFailedHandlers.class) Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
093      // Application-provided Evaluator event handlers
094      final @Parameter(EvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
095      final @Parameter(EvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
096      final @Parameter(EvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
097      // Service-provided Evaluator event handlers
098      final @Parameter(ServiceEvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
099      final @Parameter(ServiceEvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
100      final @Parameter(ServiceEvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,
101
102      // Application event handlers specific to a Driver restart
103      final @Parameter(DriverRestartTaskRunningHandlers.class) Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
104      final @Parameter(DriverRestartContextActiveHandlers.class) Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
105      final @Parameter(DriverRestartCompletedHandlers.class) Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
106
107      // Service-provided event handlers specific to a Driver restart
108      final @Parameter(ServiceDriverRestartTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers,
109      final @Parameter(ServiceDriverRestartContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
110      final @Parameter(ServiceDriverRestartCompletedHandlers.class) Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers,
111
112      final @Parameter(EvaluatorDispatcherThreads.class) int numberOfThreads,
113      final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorIdentifier,
114      final DriverExceptionHandler driverExceptionHandler) {
115
116    this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, numberOfThreads, evaluatorIdentifier);
117    this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
118    this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
119    this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher);
120
121    { // Application Context event handlers
122      this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
123      this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
124      this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
125      this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
126    }
127    { // Service Context event handlers
128      this.serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers);
129      this.serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers);
130      this.serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers);
131      this.serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers);
132    }
133    { // Application Task event handlers.
134      this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
135      this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
136      this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
137      this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
138      this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
139    }
140    { // Service Task event handlers
141      this.serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers);
142      this.serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers);
143      this.serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers);
144      this.serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers);
145      this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers);
146    }
147    { // Application Evaluator event handlers
148      this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
149      this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
150      this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
151    }
152    { // Service Evaluator event handlers
153      this.serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers);
154      this.serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers);
155      this.serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers);
156    }
157
158    // Application event handlers specific to a Driver restart
159    {
160      this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
161      this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
162      this.driverRestartApplicationDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers);
163    }
164
165    // Service event handlers specific to a Driver restart
166    {
167      this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
168      this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
169      this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, serviceDriverRestartCompletedHandlers);
170    }
171    LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'");
172  }
173
174  public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) {
175    this.dispatch(AllocatedEvaluator.class, allocatedEvaluator);
176  }
177
178  public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) {
179    this.dispatch(FailedEvaluator.class, failedEvaluator);
180  }
181
182  public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) {
183    this.dispatch(CompletedEvaluator.class, completedEvaluator);
184  }
185
186  public void onTaskRunning(final RunningTask runningTask) {
187    this.dispatch(RunningTask.class, runningTask);
188  }
189
190  public void onTaskCompleted(final CompletedTask completedTask) {
191    this.dispatch(CompletedTask.class, completedTask);
192  }
193
194  public void onTaskSuspended(final SuspendedTask suspendedTask) {
195    this.dispatch(SuspendedTask.class, suspendedTask);
196  }
197
198  public void onTaskMessage(final TaskMessage taskMessage) {
199    this.dispatch(TaskMessage.class, taskMessage);
200  }
201
202  public void onTaskFailed(final FailedTask failedTask) {
203    this.dispatch(FailedTask.class, failedTask);
204  }
205
206  public void onContextActive(final ActiveContext activeContext) {
207    this.dispatch(ActiveContext.class, activeContext);
208  }
209
210  public void onContextClose(final ClosedContext closedContext) {
211    this.dispatch(ClosedContext.class, closedContext);
212  }
213
214  public void onContextFailed(final FailedContext failedContext) {
215    this.dispatch(FailedContext.class, failedContext);
216  }
217
218  public void onContextMessage(final ContextMessage contextMessage) {
219    this.dispatch(ContextMessage.class, contextMessage);
220  }
221
222  public void onDriverRestartTaskRunning(final RunningTask runningTask) {
223    this.dispatchForRestartedDriver(RunningTask.class, runningTask);
224  }
225
226  public void OnDriverRestartContextActive(final ActiveContext activeContext) {
227    this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
228  }
229
230  public void OnDriverRestartCompleted(final DriverRestartCompleted restartCompleted) {
231    this.dispatchForRestartedDriver(DriverRestartCompleted.class, restartCompleted);
232  }
233
234  boolean isEmpty() {
235    return this.applicationDispatcher.isEmpty();
236  }
237
238  private <T, U extends T> void dispatch(final Class<T> type, final U message) {
239    this.serviceDispatcher.onNext(type, message);
240    this.applicationDispatcher.onNext(type, message);
241  }
242
243  private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
244    this.driverRestartApplicationDispatcher.onNext(type, message);
245    this.driverRestartServiceDispatcher.onNext(type, message);
246  }
247}