/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.CompletedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
import org.apache.reef.runtime.common.utils.DispatchingEStage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
 * <p>
 * Four dispatchers are kept: one each for application and service handlers, plus one
 * each for the driver-restart variants. The service dispatcher is constructed first and
 * the other three are constructed from it.
 */
public final class EvaluatorMessageDispatcher implements AutoCloseable {

  private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());

  /** Identifier of the Evaluator this dispatcher belongs to; used in log messages. */
  private final String evaluatorIdentifier;

  /**
   * Dispatcher used for application provided event handlers.
   */
  private final DispatchingEStage applicationDispatcher;

  /**
   * Dispatcher used for service provided event handlers.
   */
  private final DispatchingEStage serviceDispatcher;

  /**
   * Dispatcher used for application provided driver-restart specific event handlers.
   */
  private final DispatchingEStage driverRestartApplicationDispatcher;

  /**
   * Dispatcher used for service provided driver-restart specific event handlers.
   */
  private final DispatchingEStage driverRestartServiceDispatcher;

  /**
   * Creates the four dispatchers and registers every injected handler set with the
   * dispatcher matching its origin (application / service) and kind (regular / driver restart).
   * <p>
   * Application-provided handlers for {@link CompletedEvaluator} and {@link FailedEvaluator}
   * (both the regular and the driver-restart flavor of the latter) are not registered directly:
   * each one is first passed through
   * {@code IdlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler} and the
   * returned handler is registered instead.
   */
  @Inject
  private EvaluatorMessageDispatcher(
      // Application-provided Context event handlers
      @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers,
      @Parameter(ContextClosedHandlers.class) final Set<EventHandler<ClosedContext>> contextClosedHandlers,
      @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers,
      @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers,
      // Service-provided Context event handlers
      @Parameter(ServiceContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> serviceContextActiveHandlers,
      @Parameter(ServiceContextClosedHandlers.class)
      final Set<EventHandler<ClosedContext>> serviceContextClosedHandlers,
      @Parameter(ServiceContextFailedHandlers.class)
      final Set<EventHandler<FailedContext>> serviceContextFailedHandlers,
      @Parameter(ServiceContextMessageHandlers.class)
      final Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
      // Application-provided Task event handlers
      @Parameter(TaskRunningHandlers.class) final Set<EventHandler<RunningTask>> taskRunningHandlers,
      @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
      @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
      @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
      @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
      // Service-provided Task event handlers
      @Parameter(ServiceTaskRunningHandlers.class) final Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
      @Parameter(ServiceTaskCompletedHandlers.class)
      final Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
      @Parameter(ServiceTaskSuspendedHandlers.class)
      final Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
      @Parameter(ServiceTaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
      @Parameter(ServiceTaskFailedHandlers.class) final Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
      // Application-provided Evaluator event handlers
      @Parameter(EvaluatorAllocatedHandlers.class)
      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
      @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
      @Parameter(EvaluatorCompletedHandlers.class)
      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
      // Service-provided Evaluator event handlers
      @Parameter(ServiceEvaluatorAllocatedHandlers.class)
      final Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
      @Parameter(ServiceEvaluatorFailedHandlers.class)
      final Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
      @Parameter(ServiceEvaluatorCompletedHandlers.class)
      final Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,

      // Application event handlers specific to a Driver restart
      @Parameter(DriverRestartTaskRunningHandlers.class)
      final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
      @Parameter(DriverRestartContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
      @Parameter(DriverRestartFailedEvaluatorHandlers.class)
      final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedHandlers,

      // Service-provided event handlers specific to a Driver restart
      @Parameter(ServiceDriverRestartTaskRunningHandlers.class)
      final Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers,
      @Parameter(ServiceDriverRestartContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
      @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class)
      final Set<EventHandler<FailedEvaluator>> serviceDriverRestartFailedEvaluatorHandlers,

      @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads,
      @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorIdentifier,
      final DriverExceptionHandler driverExceptionHandler,
      final IdlenessCallbackEventHandlerFactory idlenessCallbackEventHandlerFactory) {

    LOG.log(Level.FINER, "Creating message dispatcher for {0}", evaluatorIdentifier);

    this.evaluatorIdentifier = evaluatorIdentifier;

    // The service dispatcher is built from scratch; the remaining three are derived from it.
    final String stageName = "EvaluatorMessageDispatcher:" + evaluatorIdentifier;
    final DispatchingEStage rootDispatcher =
        new DispatchingEStage(driverExceptionHandler, numberOfThreads, stageName);

    this.serviceDispatcher = rootDispatcher;
    this.applicationDispatcher = new DispatchingEStage(rootDispatcher);
    this.driverRestartApplicationDispatcher = new DispatchingEStage(rootDispatcher);
    this.driverRestartServiceDispatcher = new DispatchingEStage(rootDispatcher);

    // Context events: application handlers
    applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
    applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
    applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
    applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);

    // Context events: service handlers
    serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers);
    serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers);
    serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers);
    serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers);

    // Task events: application handlers
    applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
    applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
    applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
    applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
    applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);

    // Task events: service handlers
    serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers);
    serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers);
    serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers);
    serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers);
    serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers);

    // Evaluator events: application handlers (completed / failed are registered further below)
    applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);

    // Evaluator events: service handlers
    serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers);
    serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers);
    serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers);

    // Driver restart: application handlers
    driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
    driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
    driverRestartApplicationDispatcher.register(
        FailedEvaluator.class,
        wrapFailedEvaluatorHandlers(idlenessCallbackEventHandlerFactory, driverRestartEvaluatorFailedHandlers));

    // Driver restart: service handlers
    driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
    driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
    driverRestartServiceDispatcher.register(FailedEvaluator.class, serviceDriverRestartFailedEvaluatorHandlers);

    // Evaluator completed / failed: application handlers, each wrapped by the idleness factory
    applicationDispatcher.register(
        CompletedEvaluator.class,
        wrapCompletedEvaluatorHandlers(idlenessCallbackEventHandlerFactory, evaluatorCompletedHandlers));
    applicationDispatcher.register(
        FailedEvaluator.class,
        wrapFailedEvaluatorHandlers(idlenessCallbackEventHandlerFactory, evaluatorFailedHandlers));

    LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'");
  }

  /**
   * Builds a new set holding, for every given {@link FailedEvaluator} handler, the handler
   * returned by the factory's {@code createIdlenessCallbackWrapperHandler}.
   *
   * @param wrapperFactory factory producing the wrapping handlers.
   * @param originals the handlers to wrap; this set is only read.
   * @return a fresh {@link HashSet} of the wrapped handlers.
   */
  private static Set<EventHandler<FailedEvaluator>> wrapFailedEvaluatorHandlers(
      final IdlenessCallbackEventHandlerFactory wrapperFactory,
      final Set<EventHandler<FailedEvaluator>> originals) {
    final Set<EventHandler<FailedEvaluator>> wrapped = new HashSet<>();
    for (final EventHandler<FailedEvaluator> original : originals) {
      wrapped.add(wrapperFactory.createIdlenessCallbackWrapperHandler(original));
    }
    return wrapped;
  }

  /**
   * Builds a new set holding, for every given {@link CompletedEvaluator} handler, the handler
   * returned by the factory's {@code createIdlenessCallbackWrapperHandler}.
   *
   * @param wrapperFactory factory producing the wrapping handlers.
   * @param originals the handlers to wrap; this set is only read.
   * @return a fresh {@link HashSet} of the wrapped handlers.
   */
  private static Set<EventHandler<CompletedEvaluator>> wrapCompletedEvaluatorHandlers(
      final IdlenessCallbackEventHandlerFactory wrapperFactory,
      final Set<EventHandler<CompletedEvaluator>> originals) {
    final Set<EventHandler<CompletedEvaluator>> wrapped = new HashSet<>();
    for (final EventHandler<CompletedEvaluator> original : originals) {
      wrapped.add(wrapperFactory.createIdlenessCallbackWrapperHandler(original));
    }
    return wrapped;
  }

  /**
   * Forwards an {@link AllocatedEvaluator} event to the service and application dispatchers.
   *
   * @param allocatedEvaluator the event to dispatch.
   */
  public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) {
    dispatch(AllocatedEvaluator.class, allocatedEvaluator);
  }

  /**
   * Forwards a {@link FailedEvaluator} event to the service and application dispatchers.
   *
   * @param failedEvaluator the event to dispatch.
   */
  public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) {
    dispatch(FailedEvaluator.class, failedEvaluator);
  }

  /**
   * Forwards a {@link CompletedEvaluator} event to the service and application dispatchers.
   *
   * @param completedEvaluator the event to dispatch.
   */
  public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) {
    dispatch(CompletedEvaluator.class, completedEvaluator);
  }

  /**
   * Forwards a {@link RunningTask} event to the service and application dispatchers.
   *
   * @param runningTask the event to dispatch.
   */
  public void onTaskRunning(final RunningTask runningTask) {
    dispatch(RunningTask.class, runningTask);
  }

  /**
   * Forwards a {@link CompletedTask} event to the service and application dispatchers.
   *
   * @param completedTask the event to dispatch.
   */
  public void onTaskCompleted(final CompletedTask completedTask) {
    dispatch(CompletedTask.class, completedTask);
  }

  /**
   * Forwards a {@link SuspendedTask} event to the service and application dispatchers.
   *
   * @param suspendedTask the event to dispatch.
   */
  public void onTaskSuspended(final SuspendedTask suspendedTask) {
    dispatch(SuspendedTask.class, suspendedTask);
  }

  /**
   * Forwards a {@link TaskMessage} event to the service and application dispatchers.
   *
   * @param taskMessage the event to dispatch.
   */
  public void onTaskMessage(final TaskMessage taskMessage) {
    dispatch(TaskMessage.class, taskMessage);
  }

  /**
   * Forwards a {@link FailedTask} event to the service and application dispatchers.
   *
   * @param failedTask the event to dispatch.
   */
  public void onTaskFailed(final FailedTask failedTask) {
    dispatch(FailedTask.class, failedTask);
  }

  /**
   * Forwards an {@link ActiveContext} event to the service and application dispatchers.
   *
   * @param activeContext the event to dispatch.
   */
  public void onContextActive(final ActiveContext activeContext) {
    dispatch(ActiveContext.class, activeContext);
  }

  /**
   * Forwards a {@link ClosedContext} event to the service and application dispatchers.
   *
   * @param closedContext the event to dispatch.
   */
  public void onContextClose(final ClosedContext closedContext) {
    dispatch(ClosedContext.class, closedContext);
  }

  /**
   * Forwards a {@link FailedContext} event to the service and application dispatchers.
   *
   * @param failedContext the event to dispatch.
   */
  public void onContextFailed(final FailedContext failedContext) {
    dispatch(FailedContext.class, failedContext);
  }

  /**
   * Forwards a {@link ContextMessage} event to the service and application dispatchers.
   *
   * @param contextMessage the event to dispatch.
   */
  public void onContextMessage(final ContextMessage contextMessage) {
    dispatch(ContextMessage.class, contextMessage);
  }

  /**
   * Forwards a {@link RunningTask} event to the driver-restart service and application dispatchers.
   *
   * @param runningTask the event to dispatch.
   */
  public void onDriverRestartTaskRunning(final RunningTask runningTask) {
    dispatchForRestartedDriver(RunningTask.class, runningTask);
  }

  /**
   * Forwards an {@link ActiveContext} event to the driver-restart service and application dispatchers.
   *
   * @param activeContext the event to dispatch.
   */
  public void onDriverRestartContextActive(final ActiveContext activeContext) {
    dispatchForRestartedDriver(ActiveContext.class, activeContext);
  }

  /**
   * Forwards a {@link FailedEvaluator} event to the driver-restart service and application dispatchers.
   *
   * @param failedEvaluator the event to dispatch.
   */
  public void onDriverRestartEvaluatorFailed(final FailedEvaluator failedEvaluator) {
    dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator);
  }

  /**
   * @return whatever the application dispatcher reports from its own {@code isEmpty()}.
   */
  boolean isEmpty() {
    return applicationDispatcher.isEmpty();
  }

  /**
   * Hands the message to the service dispatcher first and to the application dispatcher second.
   *
   * @param type the class under which handlers were registered.
   * @param message the event to hand over.
   */
  private <T, U extends T> void dispatch(final Class<T> type, final U message) {
    serviceDispatcher.onNext(type, message);
    applicationDispatcher.onNext(type, message);
  }

  /**
   * Hands the message to the driver-restart service dispatcher first and to the
   * driver-restart application dispatcher second.
   *
   * @param type the class under which handlers were registered.
   * @param message the event to hand over.
   */
  private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
    driverRestartServiceDispatcher.onNext(type, message);
    driverRestartApplicationDispatcher.onNext(type, message);
  }

  /**
   * Closes the service dispatcher and logs at SEVERE level if it does not report itself
   * closed afterwards.
   */
  @Override
  public void close() {
    LOG.log(Level.FINER, "Closing message dispatcher for {0}", evaluatorIdentifier);

    // This effectively closes all dispatchers as they share the same stage.
    serviceDispatcher.close();

    if (serviceDispatcher.isClosed()) {
      return;
    }

    LOG.log(Level.SEVERE,
        "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
        evaluatorIdentifier);
  }
}