/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.CompletedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.DriverRestartCompleted;
import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
import org.apache.reef.runtime.common.utils.DispatchingEStage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
 * <p>
 * Every regular event is handed to two dispatchers: one holding the service-provided
 * handlers and one holding the application-provided handlers. Events that are specific
 * to a Driver restart are handed to a separate pair of dispatchers that hold only the
 * driver-restart handlers.
 */
public final class EvaluatorMessageDispatcher {

  private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());

  /**
   * Dispatcher used for application provided event handlers.
   */
  private final DispatchingEStage applicationDispatcher;

  /**
   * Dispatcher used for service provided event handlers.
   */
  private final DispatchingEStage serviceDispatcher;

  /**
   * Dispatcher used for application provided driver-restart specific event handlers.
   */
  private final DispatchingEStage driverRestartApplicationDispatcher;

  /**
   * Dispatcher used for service provided driver-restart specific event handlers.
   */
  private final DispatchingEStage driverRestartServiceDispatcher;

  /**
   * Creates the four dispatchers and registers every injected handler set with the
   * dispatcher that matches its origin (application or service) and purpose
   * (regular operation or Driver restart).
   * <p>
   * The handler-set parameters are grouped below by origin and event family; each set
   * is registered under the event class named in its element type.
   *
   * @param numberOfThreads        passed to the service dispatcher's constructor.
   * @param evaluatorIdentifier    passed to the service dispatcher's constructor.
   * @param driverExceptionHandler passed to the service dispatcher's constructor.
   */
  @Inject
  EvaluatorMessageDispatcher(
      // Application-provided Context event handlers
      final @Parameter(ContextActiveHandlers.class) Set<EventHandler<ActiveContext>> contextActiveHandlers,
      final @Parameter(ContextClosedHandlers.class) Set<EventHandler<ClosedContext>> contextClosedHandlers,
      final @Parameter(ContextFailedHandlers.class) Set<EventHandler<FailedContext>> contextFailedHandlers,
      final @Parameter(ContextMessageHandlers.class) Set<EventHandler<ContextMessage>> contextMessageHandlers,
      // Service-provided Context event handlers
      final @Parameter(ServiceContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceContextActiveHandlers,
      final @Parameter(ServiceContextClosedHandlers.class) Set<EventHandler<ClosedContext>> serviceContextClosedHandlers,
      final @Parameter(ServiceContextFailedHandlers.class) Set<EventHandler<FailedContext>> serviceContextFailedHandlers,
      final @Parameter(ServiceContextMessageHandlers.class) Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
      // Application-provided Task event handlers
      final @Parameter(TaskRunningHandlers.class) Set<EventHandler<RunningTask>> taskRunningHandlers,
      final @Parameter(TaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> taskCompletedHandlers,
      final @Parameter(TaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
      final @Parameter(TaskMessageHandlers.class) Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
      final @Parameter(TaskFailedHandlers.class) Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
      // Service-provided Task event handlers
      final @Parameter(ServiceTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
      final @Parameter(ServiceTaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
      final @Parameter(ServiceTaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
      final @Parameter(ServiceTaskMessageHandlers.class) Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
      final @Parameter(ServiceTaskFailedHandlers.class) Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
      // Application-provided Evaluator event handlers
      final @Parameter(EvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
      final @Parameter(EvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
      final @Parameter(EvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
      // Service-provided Evaluator event handlers
      final @Parameter(ServiceEvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
      final @Parameter(ServiceEvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
      final @Parameter(ServiceEvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,

      // Application event handlers specific to a Driver restart
      final @Parameter(DriverRestartTaskRunningHandlers.class) Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
      final @Parameter(DriverRestartContextActiveHandlers.class) Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
      final @Parameter(DriverRestartCompletedHandlers.class) Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,

      // Service-provided event handlers specific to a Driver restart
      final @Parameter(ServiceDriverRestartTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers,
      final @Parameter(ServiceDriverRestartContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
      final @Parameter(ServiceDriverRestartCompletedHandlers.class) Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers,

      final @Parameter(EvaluatorDispatcherThreads.class) int numberOfThreads,
      final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorIdentifier,
      final DriverExceptionHandler driverExceptionHandler) {

    // Only the service dispatcher is built from scratch; the other three are constructed
    // from it. NOTE(review): presumably this makes them share the service dispatcher's
    // underlying stage and error handler - confirm in DispatchingEStage.
    this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, numberOfThreads, evaluatorIdentifier);
    this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
    this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
    this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher);

    // Application Context event handlers
    this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
    this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
    this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
    this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);

    // Service Context event handlers
    this.serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers);
    this.serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers);
    this.serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers);
    this.serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers);

    // Application Task event handlers
    this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
    this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
    this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
    this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
    this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);

    // Service Task event handlers
    this.serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers);
    this.serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers);
    this.serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers);
    this.serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers);
    this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers);

    // Application Evaluator event handlers
    this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
    this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
    this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);

    // Service Evaluator event handlers
    this.serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers);
    this.serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers);
    this.serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers);

    // Application event handlers specific to a Driver restart
    this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
    this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
    this.driverRestartApplicationDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers);

    // Service event handlers specific to a Driver restart
    this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
    this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
    this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, serviceDriverRestartCompletedHandlers);

    LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'");
  }

  /**
   * Dispatches an {@link AllocatedEvaluator} event to the service and application dispatchers.
   *
   * @param allocatedEvaluator the event to dispatch.
   */
  public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) {
    this.dispatch(AllocatedEvaluator.class, allocatedEvaluator);
  }

  /**
   * Dispatches a {@link FailedEvaluator} event to the service and application dispatchers.
   *
   * @param failedEvaluator the event to dispatch.
   */
  public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) {
    this.dispatch(FailedEvaluator.class, failedEvaluator);
  }

  /**
   * Dispatches a {@link CompletedEvaluator} event to the service and application dispatchers.
   *
   * @param completedEvaluator the event to dispatch.
   */
  public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) {
    this.dispatch(CompletedEvaluator.class, completedEvaluator);
  }

  /**
   * Dispatches a {@link RunningTask} event to the service and application dispatchers.
   *
   * @param runningTask the event to dispatch.
   */
  public void onTaskRunning(final RunningTask runningTask) {
    this.dispatch(RunningTask.class, runningTask);
  }

  /**
   * Dispatches a {@link CompletedTask} event to the service and application dispatchers.
   *
   * @param completedTask the event to dispatch.
   */
  public void onTaskCompleted(final CompletedTask completedTask) {
    this.dispatch(CompletedTask.class, completedTask);
  }

  /**
   * Dispatches a {@link SuspendedTask} event to the service and application dispatchers.
   *
   * @param suspendedTask the event to dispatch.
   */
  public void onTaskSuspended(final SuspendedTask suspendedTask) {
    this.dispatch(SuspendedTask.class, suspendedTask);
  }

  /**
   * Dispatches a {@link TaskMessage} event to the service and application dispatchers.
   *
   * @param taskMessage the event to dispatch.
   */
  public void onTaskMessage(final TaskMessage taskMessage) {
    this.dispatch(TaskMessage.class, taskMessage);
  }

  /**
   * Dispatches a {@link FailedTask} event to the service and application dispatchers.
   *
   * @param failedTask the event to dispatch.
   */
  public void onTaskFailed(final FailedTask failedTask) {
    this.dispatch(FailedTask.class, failedTask);
  }

  /**
   * Dispatches an {@link ActiveContext} event to the service and application dispatchers.
   *
   * @param activeContext the event to dispatch.
   */
  public void onContextActive(final ActiveContext activeContext) {
    this.dispatch(ActiveContext.class, activeContext);
  }

  /**
   * Dispatches a {@link ClosedContext} event to the service and application dispatchers.
   *
   * @param closedContext the event to dispatch.
   */
  public void onContextClose(final ClosedContext closedContext) {
    this.dispatch(ClosedContext.class, closedContext);
  }

  /**
   * Dispatches a {@link FailedContext} event to the service and application dispatchers.
   *
   * @param failedContext the event to dispatch.
   */
  public void onContextFailed(final FailedContext failedContext) {
    this.dispatch(FailedContext.class, failedContext);
  }

  /**
   * Dispatches a {@link ContextMessage} event to the service and application dispatchers.
   *
   * @param contextMessage the event to dispatch.
   */
  public void onContextMessage(final ContextMessage contextMessage) {
    this.dispatch(ContextMessage.class, contextMessage);
  }

  /**
   * Dispatches a {@link RunningTask} event to the driver-restart dispatchers only.
   *
   * @param runningTask the event to dispatch.
   */
  public void onDriverRestartTaskRunning(final RunningTask runningTask) {
    this.dispatchForRestartedDriver(RunningTask.class, runningTask);
  }

  /**
   * Dispatches an {@link ActiveContext} event to the driver-restart dispatchers only.
   *
   * @param activeContext the event to dispatch.
   */
  public void onDriverRestartContextActive(final ActiveContext activeContext) {
    this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
  }

  /**
   * Dispatches a {@link DriverRestartCompleted} event to the driver-restart dispatchers only.
   *
   * @param restartCompleted the event to dispatch.
   */
  public void onDriverRestartCompleted(final DriverRestartCompleted restartCompleted) {
    this.dispatchForRestartedDriver(DriverRestartCompleted.class, restartCompleted);
  }

  /**
   * Original, mis-capitalized name of {@link #onDriverRestartContextActive(ActiveContext)};
   * retained so that existing callers keep compiling.
   *
   * @param activeContext the event to dispatch.
   * @deprecated use {@link #onDriverRestartContextActive(ActiveContext)} instead.
   */
  @Deprecated
  public void OnDriverRestartContextActive(final ActiveContext activeContext) {
    this.onDriverRestartContextActive(activeContext);
  }

  /**
   * Original, mis-capitalized name of {@link #onDriverRestartCompleted(DriverRestartCompleted)};
   * retained so that existing callers keep compiling.
   *
   * @param restartCompleted the event to dispatch.
   * @deprecated use {@link #onDriverRestartCompleted(DriverRestartCompleted)} instead.
   */
  @Deprecated
  public void OnDriverRestartCompleted(final DriverRestartCompleted restartCompleted) {
    this.onDriverRestartCompleted(restartCompleted);
  }

  /**
   * Reports whether the application dispatcher is empty.
   * <p>
   * NOTE(review): only the application dispatcher is consulted. Presumably that is
   * sufficient because all four dispatchers are constructed from the same service
   * dispatcher - confirm against DispatchingEStage.
   *
   * @return the result of {@code isEmpty()} on the application dispatcher.
   */
  boolean isEmpty() {
    return this.applicationDispatcher.isEmpty();
  }

  /**
   * Hands a regular event to the service dispatcher and then to the application dispatcher.
   *
   * @param type    the event class under which handlers were registered.
   * @param message the event instance.
   * @param <T>     the registered event type.
   * @param <U>     the concrete type of the event instance.
   */
  private <T, U extends T> void dispatch(final Class<T> type, final U message) {
    this.serviceDispatcher.onNext(type, message);
    this.applicationDispatcher.onNext(type, message);
  }

  /**
   * Hands a Driver-restart event to the driver-restart application dispatcher and then
   * to the driver-restart service dispatcher.
   * <p>
   * NOTE(review): the submission order here (application, then service) is the reverse
   * of {@link #dispatch(Class, Object)}; confirm whether that is intentional.
   *
   * @param type    the event class under which handlers were registered.
   * @param message the event instance.
   * @param <T>     the registered event type.
   * @param <U>     the concrete type of the event instance.
   */
  private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
    this.driverRestartApplicationDispatcher.onNext(type, message);
    this.driverRestartServiceDispatcher.onNext(type, message);
  }
}