001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.context.ActiveContext; 024import org.apache.reef.driver.context.FailedContext; 025import org.apache.reef.driver.evaluator.AllocatedEvaluator; 026import org.apache.reef.driver.evaluator.EvaluatorDescriptor; 027import org.apache.reef.driver.evaluator.EvaluatorType; 028import org.apache.reef.driver.task.FailedTask; 029import org.apache.reef.exception.EvaluatorException; 030import org.apache.reef.exception.EvaluatorKilledByResourceManagerException; 031import org.apache.reef.io.naming.Identifiable; 032import org.apache.reef.proto.DriverRuntimeProtocol; 033import org.apache.reef.proto.EvaluatorRuntimeProtocol; 034import org.apache.reef.proto.ReefServiceProtos; 035import org.apache.reef.runtime.common.DriverRestartCompleted; 036import org.apache.reef.runtime.common.driver.DriverStatusManager; 037import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; 038import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; 039import org.apache.reef.runtime.common.driver.context.ContextControlHandler; 040import org.apache.reef.runtime.common.driver.context.ContextRepresenters; 041import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource; 042import org.apache.reef.runtime.common.driver.task.TaskRepresenter; 043import org.apache.reef.runtime.common.utils.ExceptionCodec; 044import org.apache.reef.runtime.common.utils.RemoteManager; 045import org.apache.reef.tang.annotations.Name; 046import org.apache.reef.tang.annotations.NamedParameter; 047import org.apache.reef.tang.annotations.Parameter; 048import org.apache.reef.tang.formats.ConfigurationSerializer; 049import org.apache.reef.util.Optional; 050import org.apache.reef.util.logging.LoggingScopeFactory; 051import org.apache.reef.wake.EventHandler; 052import org.apache.reef.wake.remote.RemoteMessage; 053import org.apache.reef.wake.time.Clock; 054import org.apache.reef.wake.time.event.Alarm; 055 056import javax.inject.Inject; 057import java.io.File; 058import java.util.List; 059import java.util.logging.Level; 060import java.util.logging.Logger; 061 062/** 063 * Manages a single Evaluator instance including all lifecycle instances: 064 * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator). 065 * <p/> 066 * A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager. 067 * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this 068 * heartbeat channel. 069 * <p/> 070 * A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime. 071 * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate 072 * control information (e.g., shutdown, suspend). 073 */ 074@Private 075@DriverSide 076public final class EvaluatorManager implements Identifiable, AutoCloseable { 077 078 private final static Logger LOG = Logger.getLogger(EvaluatorManager.class.getName()); 079 080 private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker(); 081 private final Clock clock; 082 private final ResourceReleaseHandler resourceReleaseHandler; 083 private final ResourceLaunchHandler resourceLaunchHandler; 084 private final String evaluatorId; 085 private final EvaluatorDescriptorImpl evaluatorDescriptor; 086 private final ContextRepresenters contextRepresenters; 087 private final EvaluatorMessageDispatcher messageDispatcher; 088 private final EvaluatorControlHandler evaluatorControlHandler; 089 private final ContextControlHandler contextControlHandler; 090 private final EvaluatorStatusManager stateManager; 091 private final ExceptionCodec exceptionCodec; 092 private final DriverStatusManager driverStatusManager; 093 private final EventHandlerIdlenessSource idlenessSource; 094 private final LoggingScopeFactory loggingScopeFactory; 095 096 097 // Mutable fields 098 private Optional<TaskRepresenter> task = Optional.empty(); 099 private boolean isResourceReleased = false; 100 101 @Inject 102 private EvaluatorManager( 103 final Clock clock, 104 final RemoteManager remoteManager, 105 final ResourceReleaseHandler resourceReleaseHandler, 106 final ResourceLaunchHandler resourceLaunchHandler, 107 final @Parameter(EvaluatorIdentifier.class) String evaluatorId, 108 final @Parameter(EvaluatorDescriptorName.class) EvaluatorDescriptorImpl evaluatorDescriptor, 109 final ContextRepresenters contextRepresenters, 110 final ConfigurationSerializer configurationSerializer, 111 final EvaluatorMessageDispatcher messageDispatcher, 112 final EvaluatorControlHandler evaluatorControlHandler, 113 final ContextControlHandler contextControlHandler, 114 final EvaluatorStatusManager stateManager, 115 final DriverStatusManager driverStatusManager, 116 final ExceptionCodec exceptionCodec, 117 final EventHandlerIdlenessSource idlenessSource, 118 final LoggingScopeFactory loggingScopeFactory) { 119 this.contextRepresenters = contextRepresenters; 120 this.idlenessSource = idlenessSource; 121 LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId); 122 this.clock = clock; 123 this.resourceReleaseHandler = resourceReleaseHandler; 124 this.resourceLaunchHandler = resourceLaunchHandler; 125 this.evaluatorId = evaluatorId; 126 this.evaluatorDescriptor = evaluatorDescriptor; 127 128 this.messageDispatcher = messageDispatcher; 129 this.evaluatorControlHandler = evaluatorControlHandler; 130 this.contextControlHandler = contextControlHandler; 131 this.stateManager = stateManager; 132 this.driverStatusManager = driverStatusManager; 133 this.exceptionCodec = exceptionCodec; 134 this.loggingScopeFactory = loggingScopeFactory; 135 136 final AllocatedEvaluator allocatedEvaluator = 137 new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier(), configurationSerializer, getJobIdentifier(), loggingScopeFactory); 138 LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId); 139 this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator); 140 LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId()); 141 } 142 143 /** 144 * Get the id of current job/application 145 */ 146 public static String getJobIdentifier() { 147 // TODO: currently we obtain the job id directly by parsing execution (container) directory path 148 // #845 is open to get the id from RM properly 149 for (File directory = new File(System.getProperty("user.dir")); 150 directory != null; directory = directory.getParentFile()) { 151 final String currentDirectoryName = directory.getName(); 152 if (currentDirectoryName.toLowerCase().contains("application_")) { 153 return currentDirectoryName; 154 } 155 } 156 // cannot find a directory that contains application_, presumably we are on local runtime 157 // again, this is a hack for now, we need #845 as a proper solution 158 return "REEF_LOCAL_RUNTIME"; 159 } 160 161 private static boolean isDoneOrFailedOrKilled(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) { 162 return resourceStatusProto.getState() == ReefServiceProtos.State.DONE || 163 resourceStatusProto.getState() == ReefServiceProtos.State.FAILED || 164 resourceStatusProto.getState() == ReefServiceProtos.State.KILLED; 165 } 166 167 @Override 168 public String getId() { 169 return this.evaluatorId; 170 } 171 172 public void setType(final EvaluatorType type) { 173 this.evaluatorDescriptor.setType(type); 174 } 175 176 public EvaluatorDescriptor getEvaluatorDescriptor() { 177 return this.evaluatorDescriptor; 178 } 179 180 @Override 181 public void close() { 182 synchronized (this.evaluatorDescriptor) { 183 if (this.stateManager.isRunning()) { 184 LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId()); 185 try { 186 // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. 187 final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto = 188 EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() 189 .setTimestamp(System.currentTimeMillis()) 190 .setIdentifier(getId()) 191 .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build()) 192 .build(); 193 sendEvaluatorControlMessage(evaluatorControlProto); 194 } finally { 195 this.stateManager.setKilled(); 196 } 197 } 198 199 200 if (!this.isResourceReleased) { 201 this.isResourceReleased = true; 202 try { 203 /* We need to wait awhile before returning the container to the RM in order to 204 * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */ 205 this.clock.scheduleAlarm(100, new EventHandler<Alarm>() { 206 @Override 207 public void onNext(final Alarm alarm) { 208 EvaluatorManager.this.resourceReleaseHandler.onNext( 209 DriverRuntimeProtocol.ResourceReleaseProto.newBuilder() 210 .setIdentifier(EvaluatorManager.this.evaluatorId).build() 211 ); 212 } 213 }); 214 } catch (final IllegalStateException e) { 215 LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e); 216 EvaluatorManager.this.resourceReleaseHandler.onNext( 217 DriverRuntimeProtocol.ResourceReleaseProto.newBuilder() 218 .setIdentifier(EvaluatorManager.this.evaluatorId).build() 219 ); 220 } 221 } 222 } 223 this.idlenessSource.check(); 224 } 225 226 /** 227 * Return true if the state is DONE, FAILED, or KILLED, 228 * <em>and</em> there are no messages queued or in processing. 229 */ 230 public boolean isClosed() { 231 return this.messageDispatcher.isEmpty() && 232 (this.stateManager.isDoneOrFailedOrKilled()); 233 } 234 235 /** 236 * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED 237 * 238 * @param exception on the EvaluatorRuntime 239 */ 240 public void onEvaluatorException(final EvaluatorException exception) { 241 synchronized (this.evaluatorDescriptor) { 242 if (this.stateManager.isDoneOrFailedOrKilled()) { 243 LOG.log(Level.FINE, "Ignoring an exception receivedfor Evaluator {0} which is already in state {1}.", 244 new Object[]{this.getId(), this.stateManager}); 245 return; 246 } 247 248 LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception); 249 250 try { 251 252 final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure(); 253 254 final Optional<FailedTask> failedTaskOptional; 255 if (this.task.isPresent()) { 256 final String taskId = this.task.get().getId(); 257 final Optional<ActiveContext> evaluatorContext = Optional.empty(); 258 final Optional<byte[]> bytes = Optional.empty(); 259 final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash")); 260 final String message = "Evaluator crash"; 261 final Optional<String> description = Optional.empty(); 262 final FailedTask failedTask = new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext); 263 failedTaskOptional = Optional.of(failedTask); 264 } else { 265 failedTaskOptional = Optional.empty(); 266 } 267 268 269 this.messageDispatcher.onEvaluatorFailed(new FailedEvaluatorImpl(exception, failedContextList, failedTaskOptional, this.evaluatorId)); 270 271 } catch (final Exception e) { 272 LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e); 273 } finally { 274 this.stateManager.setFailed(); 275 close(); 276 } 277 } 278 } 279 280 public synchronized void onEvaluatorHeartbeatMessage( 281 final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) { 282 283 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto = 284 evaluatorHeartbeatProtoRemoteMessage.getMessage(); 285 LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto); 286 287 if (this.stateManager.isDoneOrFailedOrKilled()) { 288 LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.", 289 new Object[]{this.getId(), this.stateManager}); 290 return; 291 } 292 293 this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp()); 294 final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); 295 296 // first message from a running evaluator trying to re-establish communications 297 if (evaluatorHeartbeatProto.getRecovery()) { 298 this.evaluatorControlHandler.setRemoteID(evaluatorRID); 299 this.stateManager.setRunning(); 300 301 this.driverStatusManager.oneContainerRecovered(); 302 final int numRecoveredContainers = this.driverStatusManager.getNumRecoveredContainers(); 303 304 LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId); 305 final int expectedEvaluatorsNumber = this.driverStatusManager.getNumPreviousContainers(); 306 307 if (numRecoveredContainers > expectedEvaluatorsNumber) { 308 LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, but [{1}] evaluators have checked in.", 309 new Object[]{expectedEvaluatorsNumber, numRecoveredContainers}); 310 throw new RuntimeException("More then expected number of evaluators are checking in during recovery."); 311 } else if (numRecoveredContainers == expectedEvaluatorsNumber) { 312 LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. Recovery completed.", expectedEvaluatorsNumber); 313 this.driverStatusManager.setRestartCompleted(); 314 this.messageDispatcher.OnDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis())); 315 } else { 316 LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] evaluators have checked in.", 317 new Object[]{expectedEvaluatorsNumber, numRecoveredContainers}); 318 } 319 } 320 321 // If this is the first message from this Evaluator, register it. 322 if (this.stateManager.isSubmitted()) { 323 this.evaluatorControlHandler.setRemoteID(evaluatorRID); 324 this.stateManager.setRunning(); 325 LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId); 326 } 327 328 // Process the Evaluator status message 329 if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { 330 this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus()); 331 } 332 333 // Process the Context status message(s) 334 final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus(); 335 this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(), 336 informClientOfNewContexts); 337 338 // Process the Task status message 339 if (evaluatorHeartbeatProto.hasTaskStatus()) { 340 this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus()); 341 } 342 LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); 343 } 344 345 /** 346 * Process a evaluator status message. 347 * 348 * @param message 349 */ 350 private synchronized void onEvaluatorStatusMessage(final ReefServiceProtos.EvaluatorStatusProto message) { 351 352 switch (message.getState()) { 353 case DONE: 354 this.onEvaluatorDone(message); 355 break; 356 case FAILED: 357 this.onEvaluatorFailed(message); 358 break; 359 case INIT: 360 case KILLED: 361 case RUNNING: 362 case SUSPEND: 363 break; 364 } 365 } 366 367 /** 368 * Process an evaluator message that indicates that the evaluator shut down cleanly. 369 * 370 * @param message 371 */ 372 private synchronized void onEvaluatorDone(final ReefServiceProtos.EvaluatorStatusProto message) { 373 assert (message.getState() == ReefServiceProtos.State.DONE); 374 LOG.log(Level.FINEST, "Evaluator {0} done.", getId()); 375 this.stateManager.setDone(); 376 this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId)); 377 close(); 378 } 379 380 /** 381 * Process an evaluator message that indicates a crash. 382 * 383 * @param evaluatorStatusProto 384 */ 385 private synchronized void onEvaluatorFailed(final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) { 386 assert (evaluatorStatusProto.getState() == ReefServiceProtos.State.FAILED); 387 final EvaluatorException evaluatorException; 388 if (evaluatorStatusProto.hasError()) { 389 final Optional<Throwable> exception = this.exceptionCodec.fromBytes(evaluatorStatusProto.getError().toByteArray()); 390 if (exception.isPresent()) { 391 evaluatorException = new EvaluatorException(getId(), exception.get()); 392 } else { 393 evaluatorException = new EvaluatorException(getId(), new Exception("Exception sent, but can't be deserialized")); 394 } 395 } else { 396 evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent")); 397 } 398 onEvaluatorException(evaluatorException); 399 } 400 401 public void onResourceLaunch(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) { 402 synchronized (this.evaluatorDescriptor) { 403 if (this.stateManager.isAllocated()) { 404 this.stateManager.setSubmitted(); 405 this.resourceLaunchHandler.onNext(resourceLaunchProto); 406 } else { 407 throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED + 408 " state but instead is in state " + this.stateManager); 409 } 410 } 411 } 412 413 /** 414 * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime 415 * 416 * @param contextControlProto message contains context control info. 417 */ 418 public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) { 419 synchronized (this.evaluatorDescriptor) { 420 LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId); 421 this.contextControlHandler.send(contextControlProto); 422 } 423 } 424 425 /** 426 * Forward the EvaluatorControlProto to the EvaluatorRuntime 427 * 428 * @param evaluatorControlProto message contains evaluator control information. 429 */ 430 void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) { 431 synchronized (this.evaluatorDescriptor) { 432 this.evaluatorControlHandler.send(evaluatorControlProto); 433 } 434 } 435 436 /** 437 * Handle task status messages. 438 * 439 * @param taskStatusProto message contains the current task status. 440 */ 441 private void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 442 443 if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatusProto.getTaskId()))) { 444 if (taskStatusProto.getState() == ReefServiceProtos.State.INIT || 445 taskStatusProto.getState() == ReefServiceProtos.State.FAILED || 446 taskStatusProto.getRecovery() // for task from recovered evaluators 447 ) { 448 449 // FAILED is a legal first state of a Task as it could have failed during construction. 450 this.task = Optional.of( 451 new TaskRepresenter(taskStatusProto.getTaskId(), 452 this.contextRepresenters.getContext(taskStatusProto.getContextId()), 453 this.messageDispatcher, 454 this, 455 this.exceptionCodec)); 456 } else { 457 throw new RuntimeException("Received an message of state " + taskStatusProto.getState() + 458 ", not INIT or FAILED for Task " + taskStatusProto.getTaskId() + " which we haven't heard from before."); 459 } 460 } 461 this.task.get().onTaskStatusMessage(taskStatusProto); 462 463 if (this.task.get().isNotRunning()) { 464 LOG.log(Level.FINEST, "Task no longer running. De-registering it."); 465 this.task = Optional.empty(); 466 } 467 } 468 469 /** 470 * Resource status information from the (actual) resource manager. 471 */ 472 public void onResourceStatusMessage(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) { 473 synchronized (this.evaluatorDescriptor) { 474 LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusProto.getState()); 475 if (this.stateManager.isDoneOrFailedOrKilled()) { 476 LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.", 477 new Object[]{this.getId(), this.stateManager}); 478 } else if (isDoneOrFailedOrKilled(resourceStatusProto) && this.stateManager.isAllocatedOrSubmittedOrRunning()) { 479 // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes 480 // it to be alive. 481 final StringBuilder messageBuilder = new StringBuilder("Evaluator [") 482 .append(this.evaluatorId) 483 .append("] is assumed to be in state [") 484 .append(this.stateManager.toString()) 485 .append("]. But the resource manager reports it to be in state [") 486 .append(resourceStatusProto.getState()) 487 .append("]."); 488 489 if (this.stateManager.isSubmitted()) { 490 messageBuilder 491 .append(" This most likely means that the Evaluator suffered a failure before establishing a communications link to the driver."); 492 } else if (this.stateManager.isAllocated()) { 493 messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used."); 494 } else if (this.stateManager.isRunning()) { 495 messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message back to the driver."); 496 } 497 if (this.task.isPresent()) { 498 messageBuilder.append(" Task [") 499 .append(this.task.get().getId()) 500 .append("] was running when the Evaluator crashed."); 501 } 502 this.isResourceReleased = true; 503 504 if (resourceStatusProto.getState() == ReefServiceProtos.State.KILLED) { 505 this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString())); 506 } else { 507 this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString())); 508 } 509 } 510 } 511 } 512 513 @Override 514 public String toString() { 515 return "EvaluatorManager:" 516 + " id=" + this.evaluatorId 517 + " state=" + this.stateManager 518 + " task=" + this.task; 519 } 520 521 // Dynamic Parameters 522 @NamedParameter(doc = "The Evaluator Identifier.") 523 public final static class EvaluatorIdentifier implements Name<String> { 524 } 525 526 @NamedParameter(doc = "The Evaluator Host.") 527 public final static class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> { 528 } 529}