001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ClosedContext; 023import org.apache.reef.driver.context.ContextMessage; 024import org.apache.reef.driver.context.FailedContext; 025import org.apache.reef.driver.evaluator.AllocatedEvaluator; 026import org.apache.reef.driver.evaluator.CompletedEvaluator; 027import org.apache.reef.driver.evaluator.FailedEvaluator; 028import org.apache.reef.driver.parameters.*; 029import org.apache.reef.driver.task.*; 030import org.apache.reef.runtime.common.driver.DriverExceptionHandler; 031import org.apache.reef.runtime.common.utils.DispatchingEStage; 032import org.apache.reef.tang.annotations.Parameter; 033import org.apache.reef.wake.EventHandler; 034 035import javax.inject.Inject; 036import java.util.HashSet; 037import java.util.Set; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * Central dispatcher for all Evaluator related events. This exists once per Evaluator. 043 */ 044public final class EvaluatorMessageDispatcher implements AutoCloseable { 045 046 private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName()); 047 048 /** 049 * Dispatcher used for application provided event handlers. 050 */ 051 private final DispatchingEStage applicationDispatcher; 052 053 /** 054 * Dispatcher used for service provided event handlers. 055 */ 056 private final DispatchingEStage serviceDispatcher; 057 058 059 /** 060 * Dispatcher used for application provided driver-restart specific event handlers. 061 */ 062 private final DispatchingEStage driverRestartApplicationDispatcher; 063 064 /** 065 * Dispatcher used for service provided driver-restart specific event handlers. 066 */ 067 private final DispatchingEStage driverRestartServiceDispatcher; 068 069 @Inject 070 EvaluatorMessageDispatcher( 071 // Application-provided Context event handlers 072 @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers, 073 @Parameter(ContextClosedHandlers.class) final Set<EventHandler<ClosedContext>> contextClosedHandlers, 074 @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers, 075 @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers, 076 // Service-provided Context event handlers 077 @Parameter(ServiceContextActiveHandlers.class) 078 final Set<EventHandler<ActiveContext>> serviceContextActiveHandlers, 079 @Parameter(ServiceContextClosedHandlers.class) 080 final Set<EventHandler<ClosedContext>> serviceContextClosedHandlers, 081 @Parameter(ServiceContextFailedHandlers.class) 082 final Set<EventHandler<FailedContext>> serviceContextFailedHandlers, 083 @Parameter(ServiceContextMessageHandlers.class) 084 final Set<EventHandler<ContextMessage>> serviceContextMessageHandlers, 085 // Application-provided Task event handlers 086 @Parameter(TaskRunningHandlers.class) final Set<EventHandler<RunningTask>> taskRunningHandlers, 087 @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers, 088 @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, 089 @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageEventHandlers, 090 @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskExceptionEventHandlers, 091 // Service-provided Task event handlers 092 @Parameter(ServiceTaskRunningHandlers.class) final Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers, 093 @Parameter(ServiceTaskCompletedHandlers.class) 094 final Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers, 095 @Parameter(ServiceTaskSuspendedHandlers.class) 096 final Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers, 097 @Parameter(ServiceTaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers, 098 @Parameter(ServiceTaskFailedHandlers.class) final Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers, 099 // Application-provided Evaluator event handlers 100 @Parameter(EvaluatorAllocatedHandlers.class) 101 final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers, 102 @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers, 103 @Parameter(EvaluatorCompletedHandlers.class) 104 final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers, 105 // Service-provided Evaluator event handlers 106 @Parameter(ServiceEvaluatorAllocatedHandlers.class) 107 final Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers, 108 @Parameter(ServiceEvaluatorFailedHandlers.class) 109 final Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers, 110 @Parameter(ServiceEvaluatorCompletedHandlers.class) 111 final Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers, 112 113 // Application event handlers specific to a Driver restart 114 @Parameter(DriverRestartTaskRunningHandlers.class) 115 final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers, 116 @Parameter(DriverRestartContextActiveHandlers.class) 117 final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers, 118 @Parameter(DriverRestartFailedEvaluatorHandlers.class) 119 final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedHandlers, 120 121 // Service-provided event handlers specific to a Driver restart 122 @Parameter(ServiceDriverRestartTaskRunningHandlers.class) 123 final Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers, 124 @Parameter(ServiceDriverRestartContextActiveHandlers.class) 125 final Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers, 126 @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class) 127 final Set<EventHandler<FailedEvaluator>> serviceDriverRestartFailedEvaluatorHandlers, 128 129 @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads, 130 @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorIdentifier, 131 final DriverExceptionHandler driverExceptionHandler, 132 final IdlenessCallbackEventHandlerFactory idlenessCallbackEventHandlerFactory 133 ) { 134 135 this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, numberOfThreads, evaluatorIdentifier); 136 this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher); 137 this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher); 138 this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher); 139 140 // Application Context event handlers 141 this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers); 142 this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers); 143 this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers); 144 this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers); 145 146 // Service Context event handlers 147 this.serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers); 148 this.serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers); 149 this.serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers); 150 this.serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers); 151 152 // Application Task event handlers. 153 this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers); 154 this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers); 155 this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers); 156 this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers); 157 this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers); 158 159 // Service Task event handlers 160 this.serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers); 161 this.serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers); 162 this.serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers); 163 this.serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers); 164 this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers); 165 166 // Application Evaluator event handlers 167 this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers); 168 169 // Service Evaluator event handlers 170 this.serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers); 171 this.serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers); 172 this.serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers); 173 174 // Application event handlers specific to a Driver restart 175 this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers); 176 this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers); 177 178 final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedCallbackHandlers = new HashSet<>(); 179 for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : driverRestartEvaluatorFailedHandlers) { 180 driverRestartEvaluatorFailedCallbackHandlers.add( 181 idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler)); 182 } 183 184 this.driverRestartApplicationDispatcher.register( 185 FailedEvaluator.class, driverRestartEvaluatorFailedCallbackHandlers); 186 187 // Service event handlers specific to a Driver restart 188 this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers); 189 this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers); 190 this.driverRestartServiceDispatcher.register(FailedEvaluator.class, serviceDriverRestartFailedEvaluatorHandlers); 191 192 final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedCallbackHandlers = new HashSet<>(); 193 for (final EventHandler<CompletedEvaluator> evaluatorCompletedHandler : evaluatorCompletedHandlers) { 194 evaluatorCompletedCallbackHandlers.add( 195 idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorCompletedHandler)); 196 } 197 this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedCallbackHandlers); 198 199 final Set<EventHandler<FailedEvaluator>> evaluatorFailedCallbackHandlers = new HashSet<>(); 200 for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : evaluatorFailedHandlers) { 201 evaluatorFailedCallbackHandlers.add( 202 idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler)); 203 } 204 this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedCallbackHandlers); 205 206 LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'"); 207 } 208 209 public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) { 210 this.dispatch(AllocatedEvaluator.class, allocatedEvaluator); 211 } 212 213 public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) { 214 this.dispatch(FailedEvaluator.class, failedEvaluator); 215 } 216 217 public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) { 218 this.dispatch(CompletedEvaluator.class, completedEvaluator); 219 } 220 221 public void onTaskRunning(final RunningTask runningTask) { 222 this.dispatch(RunningTask.class, runningTask); 223 } 224 225 public void onTaskCompleted(final CompletedTask completedTask) { 226 this.dispatch(CompletedTask.class, completedTask); 227 } 228 229 public void onTaskSuspended(final SuspendedTask suspendedTask) { 230 this.dispatch(SuspendedTask.class, suspendedTask); 231 } 232 233 public void onTaskMessage(final TaskMessage taskMessage) { 234 this.dispatch(TaskMessage.class, taskMessage); 235 } 236 237 public void onTaskFailed(final FailedTask failedTask) { 238 this.dispatch(FailedTask.class, failedTask); 239 } 240 241 public void onContextActive(final ActiveContext activeContext) { 242 this.dispatch(ActiveContext.class, activeContext); 243 } 244 245 public void onContextClose(final ClosedContext closedContext) { 246 this.dispatch(ClosedContext.class, closedContext); 247 } 248 249 public void onContextFailed(final FailedContext failedContext) { 250 this.dispatch(FailedContext.class, failedContext); 251 } 252 253 public void onContextMessage(final ContextMessage contextMessage) { 254 this.dispatch(ContextMessage.class, contextMessage); 255 } 256 257 public void onDriverRestartTaskRunning(final RunningTask runningTask) { 258 this.dispatchForRestartedDriver(RunningTask.class, runningTask); 259 } 260 261 public void onDriverRestartContextActive(final ActiveContext activeContext) { 262 this.dispatchForRestartedDriver(ActiveContext.class, activeContext); 263 } 264 265 public void onDriverRestartEvaluatorFailed(final FailedEvaluator failedEvaluator) { 266 this.dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator); 267 } 268 269 boolean isEmpty() { 270 return this.applicationDispatcher.isEmpty(); 271 } 272 273 private <T, U extends T> void dispatch(final Class<T> type, final U message) { 274 this.serviceDispatcher.onNext(type, message); 275 this.applicationDispatcher.onNext(type, message); 276 } 277 278 private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) { 279 this.driverRestartServiceDispatcher.onNext(type, message); 280 this.driverRestartApplicationDispatcher.onNext(type, message); 281 } 282 283 @Override 284 public void close() throws Exception { 285 /** 286 * This effectively closes all dispatchers as they share the same stage. 287 */ 288 this.serviceDispatcher.close(); 289 } 290}