001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.evaluator.FailedEvaluator; 024import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders; 025import org.apache.reef.driver.restart.DriverRestartManager; 026import org.apache.reef.driver.restart.EvaluatorRestartState; 027import org.apache.reef.exception.NonSerializableException; 028import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; 029import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO; 030import org.apache.reef.runtime.common.driver.evaluator.pojos.State; 031import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO; 032import org.apache.reef.tang.ConfigurationProvider; 033import org.apache.reef.driver.context.ActiveContext; 034import org.apache.reef.driver.context.FailedContext; 035import org.apache.reef.driver.evaluator.AllocatedEvaluator; 036import org.apache.reef.driver.evaluator.EvaluatorDescriptor; 037import org.apache.reef.driver.task.FailedTask; 038import org.apache.reef.exception.EvaluatorException; 039import org.apache.reef.exception.EvaluatorKilledByResourceManagerException; 040import org.apache.reef.io.naming.Identifiable; 041import org.apache.reef.proto.EvaluatorRuntimeProtocol; 042import org.apache.reef.proto.ReefServiceProtos; 043import org.apache.reef.driver.evaluator.EvaluatorProcess; 044import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; 045import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl; 046import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; 047import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; 048import org.apache.reef.runtime.common.driver.context.ContextControlHandler; 049import org.apache.reef.runtime.common.driver.context.ContextRepresenters; 050import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource; 051import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; 052import org.apache.reef.runtime.common.driver.task.TaskRepresenter; 053import org.apache.reef.runtime.common.utils.ExceptionCodec; 054import org.apache.reef.runtime.common.utils.RemoteManager; 055import org.apache.reef.tang.annotations.Name; 056import org.apache.reef.tang.annotations.NamedParameter; 057import org.apache.reef.tang.annotations.Parameter; 058import org.apache.reef.tang.formats.ConfigurationSerializer; 059import org.apache.reef.util.Optional; 060import org.apache.reef.util.logging.LoggingScopeFactory; 061import org.apache.reef.wake.EventHandler; 062import org.apache.reef.wake.remote.RemoteMessage; 063import org.apache.reef.wake.time.Clock; 064import org.apache.reef.wake.time.event.Alarm; 065 066import javax.inject.Inject; 067import java.io.File; 068import java.util.ArrayList; 069import java.util.List; 070import java.util.Set; 071import java.util.logging.Level; 072import java.util.logging.Logger; 073 074/** 075 * Manages a single Evaluator instance including all lifecycle instances: 076 * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator). 077 * <p> 078 * A (periodic) heartbeat channel is established from EvaluatorRuntime to EvaluatorManager. 079 * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this 080 * heartbeat channel. 081 * <p> 082 * A (push-based) EventHandler channel is established from EvaluatorManager to EvaluatorRuntime. 083 * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate 084 * control information (e.g., shutdown, suspend). 085 */ 086@Private 087@DriverSide 088public final class EvaluatorManager implements Identifiable, AutoCloseable { 089 090 private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName()); 091 092 private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker(); 093 private final Clock clock; 094 private final ResourceReleaseHandler resourceReleaseHandler; 095 private final ResourceLaunchHandler resourceLaunchHandler; 096 private final String evaluatorId; 097 private final EvaluatorDescriptorImpl evaluatorDescriptor; 098 private final ContextRepresenters contextRepresenters; 099 private final EvaluatorMessageDispatcher messageDispatcher; 100 private final EvaluatorControlHandler evaluatorControlHandler; 101 private final ContextControlHandler contextControlHandler; 102 private final EvaluatorStatusManager stateManager; 103 private final ExceptionCodec exceptionCodec; 104 private final EventHandlerIdlenessSource idlenessSource; 105 private final RemoteManager remoteManager; 106 private final ConfigurationSerializer configurationSerializer; 107 private final LoggingScopeFactory loggingScopeFactory; 108 private final Set<ConfigurationProvider> evaluatorConfigurationProviders; 109 private final DriverRestartManager driverRestartManager; 110 private final EvaluatorIdlenessThreadPool idlenessThreadPool; 111 112 // Mutable fields 113 private Optional<TaskRepresenter> task = Optional.empty(); 114 private boolean isResourceReleased = false; 115 private boolean allocationFired = false; 116 117 @Inject 118 private EvaluatorManager( 119 final Clock clock, 120 final RemoteManager remoteManager, 121 final ResourceReleaseHandler resourceReleaseHandler, 122 final ResourceLaunchHandler resourceLaunchHandler, 123 @Parameter(EvaluatorIdentifier.class) final String evaluatorId, 124 @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor, 125 final ContextRepresenters contextRepresenters, 126 final ConfigurationSerializer configurationSerializer, 127 final EvaluatorMessageDispatcher messageDispatcher, 128 final EvaluatorControlHandler evaluatorControlHandler, 129 final ContextControlHandler contextControlHandler, 130 final EvaluatorStatusManager stateManager, 131 final ExceptionCodec exceptionCodec, 132 final EventHandlerIdlenessSource idlenessSource, 133 final LoggingScopeFactory loggingScopeFactory, 134 @Parameter(EvaluatorConfigurationProviders.class) 135 final Set<ConfigurationProvider> evaluatorConfigurationProviders, 136 final DriverRestartManager driverRestartManager, 137 final EvaluatorIdlenessThreadPool idlenessThreadPool) { 138 this.contextRepresenters = contextRepresenters; 139 this.idlenessSource = idlenessSource; 140 LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId); 141 this.clock = clock; 142 this.resourceReleaseHandler = resourceReleaseHandler; 143 this.resourceLaunchHandler = resourceLaunchHandler; 144 this.evaluatorId = evaluatorId; 145 this.evaluatorDescriptor = evaluatorDescriptor; 146 147 this.messageDispatcher = messageDispatcher; 148 this.evaluatorControlHandler = evaluatorControlHandler; 149 this.contextControlHandler = contextControlHandler; 150 this.stateManager = stateManager; 151 this.exceptionCodec = exceptionCodec; 152 153 this.remoteManager = remoteManager; 154 this.configurationSerializer = configurationSerializer; 155 this.loggingScopeFactory = loggingScopeFactory; 156 this.evaluatorConfigurationProviders = evaluatorConfigurationProviders; 157 this.driverRestartManager = driverRestartManager; 158 this.idlenessThreadPool = idlenessThreadPool; 159 160 LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId()); 161 } 162 163 /** 164 * Get the id of current job/application. 165 */ 166 public static String getJobIdentifier() { 167 // TODO[JIRA REEF-818]: currently we obtain the job id directly by parsing execution (container) directory path 168 // #845 is open to get the id from RM properly 169 for (File directory = new File(System.getProperty("user.dir")); 170 directory != null; directory = directory.getParentFile()) { 171 final String currentDirectoryName = directory.getName(); 172 if (currentDirectoryName.toLowerCase().contains("application_")) { 173 return currentDirectoryName; 174 } 175 } 176 // cannot find a directory that contains application_, presumably we are on local runtime 177 // again, this is a hack for now, we need #845 as a proper solution 178 return "REEF_LOCAL_RUNTIME"; 179 } 180 181 /** 182 * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once. 183 */ 184 public synchronized void fireEvaluatorAllocatedEvent() { 185 if (!allocationFired && stateManager.isAllocated()) { 186 final AllocatedEvaluator allocatedEvaluator = 187 new AllocatedEvaluatorImpl(this, 188 remoteManager.getMyIdentifier(), 189 configurationSerializer, 190 getJobIdentifier(), 191 loggingScopeFactory, 192 evaluatorConfigurationProviders); 193 LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId); 194 messageDispatcher.onEvaluatorAllocated(allocatedEvaluator); 195 allocationFired = true; 196 } else { 197 LOG.log(Level.WARNING, "Evaluator allocated event fired more than once."); 198 } 199 } 200 201 private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) { 202 return resourceStatusEvent.getState() == State.DONE || 203 resourceStatusEvent.getState() == State.FAILED || 204 resourceStatusEvent.getState() == State.KILLED; 205 } 206 207 @Override 208 public String getId() { 209 return this.evaluatorId; 210 } 211 212 public void setProcess(final EvaluatorProcess process) { 213 this.evaluatorDescriptor.setProcess(process); 214 } 215 216 public EvaluatorDescriptor getEvaluatorDescriptor() { 217 return this.evaluatorDescriptor; 218 } 219 220 @Override 221 public void close() { 222 synchronized (this.evaluatorDescriptor) { 223 if (this.stateManager.isAllocatedOrSubmittedOrRunning()) { 224 LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId()); 225 try { 226 if (this.stateManager.isRunning()){ 227 // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. 228 final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto = 229 EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() 230 .setTimestamp(System.currentTimeMillis()) 231 .setIdentifier(getId()) 232 .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build()) 233 .build(); 234 sendEvaluatorControlMessage(evaluatorControlProto); 235 } 236 } finally { 237 this.stateManager.setKilled(); 238 } 239 } 240 241 242 if (!this.isResourceReleased) { 243 this.isResourceReleased = true; 244 try { 245 /* We need to wait awhile before returning the container to the RM in order to 246 * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */ 247 this.clock.scheduleAlarm(100, new EventHandler<Alarm>() { 248 @Override 249 public void onNext(final Alarm alarm) { 250 EvaluatorManager.this.resourceReleaseHandler.onNext( 251 ResourceReleaseEventImpl.newBuilder() 252 .setIdentifier(EvaluatorManager.this.evaluatorId) 253 .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName()) 254 .build() 255 ); 256 } 257 }); 258 } catch (final IllegalStateException e) { 259 LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e); 260 EvaluatorManager.this.resourceReleaseHandler.onNext( 261 ResourceReleaseEventImpl.newBuilder() 262 .setIdentifier(EvaluatorManager.this.evaluatorId) 263 .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName()) 264 .build() 265 ); 266 } 267 } 268 } 269 270 idlenessThreadPool.runCheckAsync(this); 271 } 272 273 /** 274 * Return true if the state is DONE, FAILED, or KILLED, 275 * <em>and</em> there are no messages queued or in processing. 276 */ 277 public boolean isClosed() { 278 return this.messageDispatcher.isEmpty() && 279 this.stateManager.isDoneOrFailedOrKilled(); 280 } 281 282 /** 283 * Triggers a call to check the idleness of the Evaluator. 284 */ 285 void checkIdlenessSource() { 286 this.idlenessSource.check(); 287 } 288 289 /** 290 * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED. 291 * 292 * @param exception on the EvaluatorRuntime 293 */ 294 public void onEvaluatorException(final EvaluatorException exception) { 295 synchronized (this.evaluatorDescriptor) { 296 if (this.stateManager.isDoneOrFailedOrKilled()) { 297 LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} which is already in state {1}.", 298 new Object[]{this.getId(), this.stateManager}); 299 return; 300 } 301 302 LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception); 303 304 try { 305 306 final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure(); 307 308 final Optional<FailedTask> failedTaskOptional; 309 if (this.task.isPresent()) { 310 final String taskId = this.task.get().getId(); 311 final Optional<ActiveContext> evaluatorContext = Optional.empty(); 312 final Optional<byte[]> bytes = Optional.empty(); 313 final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash")); 314 final String message = "Evaluator crash"; 315 final Optional<String> description = Optional.empty(); 316 final FailedTask failedTask = 317 new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext); 318 failedTaskOptional = Optional.of(failedTask); 319 } else { 320 failedTaskOptional = Optional.empty(); 321 } 322 323 final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(exception, failedContextList, 324 failedTaskOptional, this.evaluatorId); 325 326 if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) { 327 this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator); 328 } else { 329 this.messageDispatcher.onEvaluatorFailed(failedEvaluator); 330 } 331 } catch (final Exception e) { 332 LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e); 333 } finally { 334 this.stateManager.setFailed(); 335 close(); 336 } 337 } 338 } 339 340 /** 341 * Process an evaluator heartbeat message. 342 */ 343 public void onEvaluatorHeartbeatMessage( 344 final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) { 345 346 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto = 347 evaluatorHeartbeatProtoRemoteMessage.getMessage(); 348 LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto); 349 350 synchronized (this.evaluatorDescriptor) { 351 if (this.stateManager.isDoneOrFailedOrKilled()) { 352 LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.", 353 new Object[]{this.getId(), this.stateManager}); 354 return; 355 } 356 357 this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp()); 358 final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); 359 360 final EvaluatorRestartState evaluatorRestartState = driverRestartManager.getEvaluatorRestartState(evaluatorId); 361 362 /* 363 * First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator 364 * from a separate application attempt. In the case of a previous evaluator, if the restart period has not 365 * yet expired, we should register it and trigger context active and task events. If the restart period has 366 * expired, we should return immediately after setting its remote ID in order to close it. 367 */ 368 if (this.stateManager.isSubmitted() || 369 evaluatorRestartState == EvaluatorRestartState.REPORTED || 370 evaluatorRestartState == EvaluatorRestartState.EXPIRED) { 371 372 this.evaluatorControlHandler.setRemoteID(evaluatorRID); 373 374 if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) { 375 // Don't do anything if evaluator has expired. Close it immediately upon exit of this method. 376 return; 377 } 378 379 this.stateManager.setRunning(); 380 LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId); 381 382 if (evaluatorRestartState == EvaluatorRestartState.REPORTED) { 383 driverRestartManager.setEvaluatorReregistered(evaluatorId); 384 } 385 } 386 387 // All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806. 388 final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp(); 389 390 // Process the Evaluator status message 391 if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { 392 EvaluatorStatusPOJO evaluatorStatus = new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()); 393 this.onEvaluatorStatusMessage(evaluatorStatus); 394 } 395 396 // Process the Context status message(s) 397 final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus(); 398 final List<ContextStatusPOJO> contextStatusList = new ArrayList<>(); 399 for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) { 400 contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber)); 401 } 402 403 this.contextRepresenters.onContextStatusMessages(contextStatusList, 404 informClientOfNewContexts); 405 406 // Process the Task status message 407 if (evaluatorHeartbeatProto.hasTaskStatus()) { 408 TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber); 409 this.onTaskStatusMessage(taskStatus); 410 } 411 LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); 412 } 413 } 414 415 /** 416 * Process a evaluator status message. 417 * 418 * @param message 419 */ 420 private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) { 421 422 switch (message.getState()) { 423 case DONE: 424 this.onEvaluatorDone(message); 425 break; 426 case FAILED: 427 this.onEvaluatorFailed(message); 428 break; 429 case INIT: 430 case KILLED: 431 case RUNNING: 432 case SUSPEND: 433 break; 434 default: 435 throw new RuntimeException("Unknown state: " + message.getState()); 436 } 437 } 438 439 /** 440 * Process an evaluator message that indicates that the evaluator shut down cleanly. 441 * 442 * @param message 443 */ 444 private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) { 445 assert message.getState() == State.DONE; 446 LOG.log(Level.FINEST, "Evaluator {0} done.", getId()); 447 448 // Send an ACK to the Evaluator. 449 sendEvaluatorControlMessage( 450 EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() 451 .setTimestamp(System.currentTimeMillis()) 452 .setIdentifier(getId()) 453 .setDoneEvaluator(EvaluatorRuntimeProtocol.DoneEvaluatorProto.newBuilder().build()) 454 .build()); 455 456 this.stateManager.setDone(); 457 this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId)); 458 close(); 459 } 460 461 /** 462 * Process an evaluator message that indicates a crash. 463 * 464 * @param evaluatorStatus 465 */ 466 private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) { 467 assert evaluatorStatus.getState() 468 == State.FAILED; 469 final EvaluatorException evaluatorException; 470 if (evaluatorStatus.hasError()) { 471 final Optional<Throwable> exception = 472 this.exceptionCodec.fromBytes(evaluatorStatus.getError()); 473 if (exception.isPresent()) { 474 evaluatorException = new EvaluatorException(getId(), exception.get()); 475 } else { 476 evaluatorException = new EvaluatorException(getId(), 477 new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError())); 478 } 479 } else { 480 evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent")); 481 } 482 onEvaluatorException(evaluatorException); 483 } 484 485 public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) { 486 synchronized (this.evaluatorDescriptor) { 487 if (this.stateManager.isAllocated()) { 488 this.stateManager.setSubmitted(); 489 this.resourceLaunchHandler.onNext(resourceLaunchEvent); 490 } else { 491 throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED + 492 " state but instead is in state " + this.stateManager); 493 } 494 } 495 } 496 497 /** 498 * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime. 499 * 500 * @param contextControlProto message contains context control info. 501 */ 502 public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) { 503 synchronized (this.evaluatorDescriptor) { 504 LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId); 505 this.contextControlHandler.send(contextControlProto); 506 } 507 } 508 509 /** 510 * Forward the EvaluatorControlProto to the EvaluatorRuntime. 511 * 512 * @param evaluatorControlProto message contains evaluator control information. 513 */ 514 void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) { 515 synchronized (this.evaluatorDescriptor) { 516 this.evaluatorControlHandler.send(evaluatorControlProto); 517 } 518 } 519 520 /** 521 * Handle task status messages. 522 * 523 * @param taskStatus message contains the current task status. 524 */ 525 private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) { 526 527 if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) { 528 if (taskStatus.getState() == State.INIT || 529 taskStatus.getState() == State.FAILED || 530 taskStatus.getState() == State.RUNNING || 531 driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.REREGISTERED) { 532 533 // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order 534 // [REEF-289] is a related item which may fix the issue 535 if (taskStatus.getState() == State.RUNNING) { 536 LOG.log(Level.WARNING, 537 "Received a message of state " + ReefServiceProtos.State.RUNNING + 538 " for Task " + taskStatus.getTaskId() + 539 " before receiving its " + ReefServiceProtos.State.INIT + " state"); 540 } 541 542 // FAILED is a legal first state of a Task as it could have failed during construction. 543 this.task = Optional.of( 544 new TaskRepresenter(taskStatus.getTaskId(), 545 this.contextRepresenters.getContext(taskStatus.getContextId()), 546 this.messageDispatcher, 547 this, 548 this.exceptionCodec, 549 this.driverRestartManager)); 550 } else { 551 throw new RuntimeException("Received a message of state " + taskStatus.getState() + 552 ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() + 553 " which we haven't heard from before."); 554 } 555 } 556 this.task.get().onTaskStatusMessage(taskStatus); 557 558 if (this.task.get().isNotRunning()) { 559 LOG.log(Level.FINEST, "Task no longer running. De-registering it."); 560 this.task = Optional.empty(); 561 } 562 } 563 564 /** 565 * Resource status information from the (actual) resource manager. 566 */ 567 public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) { 568 synchronized (this.evaluatorDescriptor) { 569 LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusEvent.getState()); 570 if (this.stateManager.isDoneOrFailedOrKilled()) { 571 LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.", 572 new Object[]{this.getId(), this.stateManager}); 573 } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && this.stateManager.isAllocatedOrSubmittedOrRunning()) { 574 // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes 575 // it to be alive. 576 final StringBuilder messageBuilder = new StringBuilder("Evaluator [") 577 .append(this.evaluatorId) 578 .append("] is assumed to be in state [") 579 .append(this.stateManager.toString()) 580 .append("]. But the resource manager reports it to be in state [") 581 .append(resourceStatusEvent.getState()) 582 .append("]."); 583 584 if (this.stateManager.isSubmitted()) { 585 messageBuilder 586 .append(" This most likely means that the Evaluator suffered a failure before establishing " + 587 "a communications link to the driver."); 588 } else if (this.stateManager.isAllocated()) { 589 messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used."); 590 } else if (this.stateManager.isRunning()) { 591 messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " + 592 "back to the driver."); 593 } 594 if (this.task.isPresent()) { 595 messageBuilder.append(" Task [") 596 .append(this.task.get().getId()) 597 .append("] was running when the Evaluator crashed."); 598 } 599 600 if (resourceStatusEvent.getState() == State.KILLED) { 601 this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, 602 messageBuilder.toString())); 603 } else { 604 this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString())); 605 } 606 } 607 } 608 } 609 610 @Override 611 public String toString() { 612 return "EvaluatorManager:" 613 + " id=" + this.evaluatorId 614 + " state=" + this.stateManager 615 + " task=" + this.task; 616 } 617 618 // Dynamic Parameters 619 620 /** 621 * The Evaluator Identifier. 622 */ 623 @NamedParameter(doc = "The Evaluator Identifier.") 624 public static final class EvaluatorIdentifier implements Name<String> { 625 } 626 627 /** 628 * The Evaluator Host. 629 */ 630 @NamedParameter(doc = "The Evaluator Host.") 631 public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> { 632 } 633}