/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartState;
import org.apache.reef.exception.NonSerializableException;
import org.apache.reef.runtime.common.driver.api.*;
import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
import org.apache.reef.tang.ConfigurationProvider;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.exception.EvaluatorException;
import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;

import javax.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Manages a single Evaluator instance including all lifecycle instances:
 * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
 * <p>
 * A (periodic) heartbeat channel is established from EvaluatorRuntime to EvaluatorManager.
 * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this
 * heartbeat channel.
 * <p>
 * A (push-based) EventHandler channel is established from EvaluatorManager to EvaluatorRuntime.
 * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
 * control information (e.g., shutdown, suspend).
 * <p>
 * Locking: most public entry points synchronize on the {@code evaluatorDescriptor} field, while
 * {@link #fireEvaluatorAllocatedEvent()} and the private evaluator-status handlers are
 * {@code synchronized} on this instance.
 */
@Private
@DriverSide
public final class EvaluatorManager implements Identifiable, AutoCloseable {

  private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());

  private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
  private final Clock clock;
  private final ResourceReleaseHandler resourceReleaseHandler;
  private final ResourceLaunchHandler resourceLaunchHandler;
  private final String evaluatorId;
  private final EvaluatorDescriptorImpl evaluatorDescriptor;
  private final ContextRepresenters contextRepresenters;
  private final EvaluatorMessageDispatcher messageDispatcher;
  private final EvaluatorControlHandler evaluatorControlHandler;
  private final ContextControlHandler contextControlHandler;
  private final EvaluatorStatusManager stateManager;
  private final ExceptionCodec exceptionCodec;
  private final EventHandlerIdlenessSource idlenessSource;
  private final RemoteManager remoteManager;
  private final ConfigurationSerializer configurationSerializer;
  private final LoggingScopeFactory loggingScopeFactory;
  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
  private final DriverRestartManager driverRestartManager;
  private final EvaluatorIdlenessThreadPool idlenessThreadPool;

  // Mutable fields

  /** The task currently known to run on this evaluator, if any. */
  private Optional<TaskRepresenter> task = Optional.empty();

  /** Guards against releasing the underlying resource to the resource manager more than once. */
  private boolean resourceNotReleased = true;

  /** Guards against firing the AllocatedEvaluator event more than once. */
  private boolean allocationNotFired = true;

  @Inject
  private EvaluatorManager(
      @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
      @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
      @Parameter(EvaluatorConfigurationProviders.class)
      final Set<ConfigurationProvider> evaluatorConfigurationProviders,
      final Clock clock,
      final RemoteManager remoteManager,
      final ResourceReleaseHandler resourceReleaseHandler,
      final ResourceLaunchHandler resourceLaunchHandler,
      final ContextRepresenters contextRepresenters,
      final ConfigurationSerializer configurationSerializer,
      final EvaluatorMessageDispatcher messageDispatcher,
      final EvaluatorControlHandler evaluatorControlHandler,
      final ContextControlHandler contextControlHandler,
      final EvaluatorStatusManager stateManager,
      final ExceptionCodec exceptionCodec,
      final EventHandlerIdlenessSource idlenessSource,
      final LoggingScopeFactory loggingScopeFactory,
      final DriverRestartManager driverRestartManager,
      final EvaluatorIdlenessThreadPool idlenessThreadPool) {

    LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);

    this.evaluatorId = evaluatorId;
    this.evaluatorDescriptor = evaluatorDescriptor;
    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;

    this.clock = clock;
    this.contextRepresenters = contextRepresenters;
    this.idlenessSource = idlenessSource;
    this.resourceReleaseHandler = resourceReleaseHandler;
    this.resourceLaunchHandler = resourceLaunchHandler;

    this.messageDispatcher = messageDispatcher;
    this.evaluatorControlHandler = evaluatorControlHandler;
    this.contextControlHandler = contextControlHandler;
    this.stateManager = stateManager;
    this.exceptionCodec = exceptionCodec;

    this.remoteManager = remoteManager;
    this.configurationSerializer = configurationSerializer;
    this.loggingScopeFactory = loggingScopeFactory;
    this.driverRestartManager = driverRestartManager;
    this.idlenessThreadPool = idlenessThreadPool;

    LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
  }

  /**
   * Get the id of current job/application.
   * <p>
   * Walks up from the {@code user.dir} directory and returns the name of the first directory whose
   * name contains {@code application_} (case-insensitively); falls back to {@code REEF_LOCAL_RUNTIME}.
   *
   * @return the job identifier, or {@code "REEF_LOCAL_RUNTIME"} if none could be found.
   */
  public static String getJobIdentifier() {
    // TODO[JIRA REEF-818]: currently we obtain the job id directly by parsing execution (container) directory path
    // #845 is open to get the id from RM properly
    for (File directory = new File(System.getProperty("user.dir"));
         directory != null; directory = directory.getParentFile()) {
      final String currentDirectoryName = directory.getName();
      // Locale.ROOT keeps the match locale-independent: under e.g. a Turkish default locale,
      // 'I' would lower-case to a dotless 'ı' and "APPLICATION_..." would no longer match.
      if (currentDirectoryName.toLowerCase(Locale.ROOT).contains("application_")) {
        return currentDirectoryName;
      }
    }
    // cannot find a directory that contains application_, presumably we are on local runtime
    // again, this is a hack for now, we need #845 as a proper solution
    return "REEF_LOCAL_RUNTIME";
  }

  /**
   * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once.
   * <p>
   * The event is only fired while the evaluator is in the ALLOCATED state; otherwise a warning is
   * logged that distinguishes a repeated call from a call made in the wrong state.
   */
  public synchronized void fireEvaluatorAllocatedEvent() {

    if (!this.allocationNotFired) {

      LOG.log(Level.WARNING, "AllocatedEvaluator event fired more than once.");

    } else if (!this.stateManager.isAllocated()) {

      // Previously reported as "fired more than once", which was misleading in this case.
      LOG.log(Level.WARNING,
          "AllocatedEvaluator event for Evaluator [{0}] not fired: expected state ALLOCATED but was {1}.",
          new Object[] {this.evaluatorId, this.stateManager});

    } else {

      final AllocatedEvaluator allocatedEvaluator =
          new AllocatedEvaluatorImpl(this,
              this.remoteManager.getMyIdentifier(),
              this.configurationSerializer,
              getJobIdentifier(),
              this.loggingScopeFactory,
              this.evaluatorConfigurationProviders);

      LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", this.evaluatorId);

      this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
      this.allocationNotFired = false;
    }
  }

  @Override
  public String getId() {
    return this.evaluatorId;
  }

  /**
   * Records the evaluator process on the evaluator descriptor.
   *
   * @param process the process to set on the descriptor.
   */
  public void setProcess(final EvaluatorProcess process) {
    this.evaluatorDescriptor.setProcess(process);
  }

  /**
   * @return the descriptor of the managed evaluator.
   */
  public EvaluatorDescriptor getEvaluatorDescriptor() {
    return this.evaluatorDescriptor;
  }

  /**
   * Closes this manager.
   * <p>
   * If the evaluator is still available, a RUNNING evaluator is sent a kill message and moved to
   * CLOSING; otherwise (or if sending fails) it is marked KILLED. The underlying resource is then
   * released to the resource manager at most once, via a clock alarm scheduled with an offset of 200
   * (presumably milliseconds), or immediately if the clock rejects the alarm. Finally an
   * asynchronous idleness check is requested.
   */
  @Override
  public void close() {

    LOG.log(Level.FINER, "Close EvaluatorManager {0} - begin", this.evaluatorId);

    synchronized (this.evaluatorDescriptor) {

      if (this.stateManager.isAvailable()) {

        LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());

        try {

          if (this.stateManager.isRunning()) {

            // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
            this.sendEvaluatorControlMessage(
                EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
                    .setTimestamp(System.currentTimeMillis())
                    .setIdentifier(getId())
                    .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
                    .build());

            this.stateManager.setClosing();

          } else {
            this.stateManager.setKilled();
          }

        } catch (final Exception e) {
          LOG.log(Level.WARNING, "Exception occurred when manager sends killing message to task.", e);
          this.stateManager.setKilled();
        }
      }

      if (this.resourceNotReleased) {

        this.resourceNotReleased = false;

        final ResourceReleaseEvent releaseEvent = ResourceReleaseEventImpl.newBuilder()
            .setIdentifier(this.evaluatorId)
            .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName())
            .build();

        try {
          // We need to wait awhile before returning the container to the RM
          // in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit.
          this.clock.scheduleAlarm(200, new EventHandler<Alarm>() {
            @Override
            public void onNext(final Alarm alarm) {
              LOG.log(Level.FINER, "Close EvaluatorManager {0} - release to RM", evaluatorId);
              resourceReleaseHandler.onNext(releaseEvent);
              shutdown();
            }
          });
        } catch (final IllegalStateException e) {
          LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
          this.resourceReleaseHandler.onNext(releaseEvent);
          this.shutdown();
        }
      }
    }

    this.idlenessThreadPool.runCheckAsync(this);

    LOG.log(Level.FINER, "Close EvaluatorManager {0} - end", this.evaluatorId);
  }

  /**
   * Close message dispatcher for the evaluator.
   */
  public void shutdown() {
    LOG.log(Level.FINEST, "Shutdown EvaluatorManager: {0}", this.evaluatorId);
    this.messageDispatcher.close();
  }

  /**
   * Return true if the state is DONE, FAILED, or KILLED,
   * <em>and</em> there are no messages queued or in processing.
   */
  public boolean isClosed() {
    return this.messageDispatcher.isEmpty() && this.stateManager.isCompleted();
  }

  /**
   * Return true if the state is CLOSING.
   */
  public boolean isClosing() {
    return this.stateManager.isClosing();
  }

  /**
   * Return true if the state is DONE, FAILED, KILLED, or CLOSING.
   */
  public boolean isClosedOrClosing() {
    return isClosed() || isClosing();
  }

  /**
   * Triggers a call to check the idleness of the Evaluator.
   */
  void checkIdlenessSource() {
    this.idlenessSource.check();
  }

  /**
   * EvaluatorException will trigger a FailedEvaluator event and a state transition to FAILED.
   * <p>
   * Ignored if the evaluator is already completed. Otherwise builds a FailedEvaluator (including the
   * failed contexts and, if a task is known, a FailedTask), dispatches it either as a driver-restart
   * failure or a regular failure, then always marks the evaluator FAILED and closes this manager.
   *
   * @param exception on the EvaluatorRuntime
   */
  public void onEvaluatorException(final EvaluatorException exception) {
    synchronized (this.evaluatorDescriptor) {

      if (this.stateManager.isCompleted()) {
        LOG.log(Level.FINE,
            "Ignoring an exception received for Evaluator {0} which is already in state {1}.",
            new Object[] {this.getId(), this.stateManager});
        return;
      }

      LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception);

      try {

        final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();

        final Optional<FailedTask> failedTaskOptional;
        if (this.task.isPresent()) {

          final String taskId = this.task.get().getId();
          final Optional<ActiveContext> evaluatorContext = Optional.empty();
          final Optional<byte[]> bytes = Optional.empty();
          final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash"));
          final String message = "Evaluator crash";
          final Optional<String> description = Optional.empty();
          final FailedTask failedTask =
              new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);

          failedTaskOptional = Optional.of(failedTask);

        } else {
          failedTaskOptional = Optional.empty();
        }

        final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(
            exception, failedContextList, failedTaskOptional, this.evaluatorId);

        if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) {
          this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
        } else {
          this.messageDispatcher.onEvaluatorFailed(failedEvaluator);
        }
      } catch (final Exception e) {
        LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
      } finally {
        this.stateManager.setFailed();
        this.close();
      }
    }
  }

  /**
   * Process an evaluator heartbeat message.
   * <p>
   * Heartbeats for completed evaluators are ignored. On the first heartbeat of a SUBMITTED evaluator,
   * or of an evaluator whose restart state is REPORTED or EXPIRED, the remote ID is recorded. For
   * EXPIRED evaluators the method returns right after that; it does not close the evaluator itself
   * (presumably the caller does — confirm). Otherwise the evaluator, context and task statuses carried
   * by the heartbeat are processed in that order.
   *
   * @param evaluatorHeartbeatProtoRemoteMessage the heartbeat together with the sender's identifier.
   */
  public void onEvaluatorHeartbeatMessage(
      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {

    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
        evaluatorHeartbeatProtoRemoteMessage.getMessage();

    LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);

    synchronized (this.evaluatorDescriptor) {

      if (this.stateManager.isCompleted()) {

        LOG.log(Level.FINE,
            "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.",
            new Object[] {this.getId(), this.stateManager});

        return;

      } else if (this.stateManager.isAvailable()) {

        this.sanityChecker.check(this.evaluatorId, evaluatorHeartbeatProto.getTimestamp());
        final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();

        final EvaluatorRestartState evaluatorRestartState =
            this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId);

        /*
         * First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator
         * from a separate application attempt. In the case of a previous evaluator, if the restart period has not
         * yet expired, we should register it and trigger context active and task events. If the restart period has
         * expired, we should return immediately after setting its remote ID in order to close it.
         */
        if (this.stateManager.isSubmitted() ||
            evaluatorRestartState == EvaluatorRestartState.REPORTED ||
            evaluatorRestartState == EvaluatorRestartState.EXPIRED) {

          this.evaluatorControlHandler.setRemoteID(evaluatorRID);

          if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
            // Don't do anything if evaluator has expired. Close it immediately upon exit of this method.
            return;
          }

          this.stateManager.setRunning();
          LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);

          if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
            this.driverRestartManager.setEvaluatorReregistered(this.evaluatorId);
          }
        }
      }

      // All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806.
      final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp();

      // Process the Evaluator status message
      if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
        this.onEvaluatorStatusMessage(new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()));
      }

      // Process the Context status message(s)
      final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus();
      final List<ContextStatusPOJO> contextStatusList = new ArrayList<>();
      for (final ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) {
        contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber));
      }

      this.contextRepresenters.onContextStatusMessages(contextStatusList, informClientOfNewContexts);

      // Process the Task status message
      if (evaluatorHeartbeatProto.hasTaskStatus()) {
        this.onTaskStatusMessage(new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber));
      }

      LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
    }
  }

  /**
   * Process an evaluator status message by dispatching on its state.
   * DONE, FAILED and KILLED are handled; INIT, RUNNING and SUSPEND are ignored.
   *
   * @param message the evaluator status reported in a heartbeat.
   * @throws RuntimeException if the state is not one of the known values.
   */
  private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) {

    switch (message.getState()) {
    case DONE:
      this.onEvaluatorDone(message);
      break;
    case FAILED:
      this.onEvaluatorFailed(message);
      break;
    case KILLED:
      this.onEvaluatorKilled(message);
      break;
    case INIT:
    case RUNNING:
    case SUSPEND:
      break;
    default:
      throw new RuntimeException("Unknown state: " + message.getState());
    }
  }

  /**
   * Process an evaluator message that indicates that the evaluator shut down cleanly:
   * acknowledges it to the evaluator, marks it DONE, dispatches a CompletedEvaluator and closes.
   *
   * @param message evaluator status in state DONE.
   */
  private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) {

    assert message.getState() == State.DONE;

    LOG.log(Level.FINEST, "Evaluator {0} done.", getId());

    // Send an ACK to the Evaluator.
    sendEvaluatorControlMessage(
        EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
            .setTimestamp(System.currentTimeMillis())
            .setIdentifier(getId())
            .setDoneEvaluator(EvaluatorRuntimeProtocol.DoneEvaluatorProto.newBuilder().build())
            .build());

    this.stateManager.setDone();
    this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));

    this.close();
  }

  /**
   * Process an evaluator message that indicates a crash. The serialized error, if any, is decoded
   * (or wrapped in a NonSerializableException when it cannot be decoded) and forwarded to
   * {@link #onEvaluatorException(EvaluatorException)}.
   *
   * @param evaluatorStatus evaluator status in state FAILED.
   */
  private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) {

    assert evaluatorStatus.getState() == State.FAILED;

    final EvaluatorException evaluatorException;

    if (evaluatorStatus.hasError()) {

      final Optional<Throwable> exception =
          this.exceptionCodec.fromBytes(evaluatorStatus.getError());

      evaluatorException = new EvaluatorException(getId(), exception.isPresent() ? exception.get() :
          new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError()));

    } else {
      evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
    }

    this.onEvaluatorException(evaluatorException);
  }

  /**
   * Process an evaluator message that indicates that the evaluator completed the unclean shut down request.
   *
   * @param message evaluator status in state KILLED; the evaluator is expected to be CLOSING.
   */
  private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO message) {

    assert message.getState() == State.KILLED;
    assert this.stateManager.isClosing();

    LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId());

    this.stateManager.setKilled();
  }

  /**
   * Submits the evaluator for launch if it is ALLOCATED. A launch request for an evaluator that already
   * completed abnormally is logged and ignored.
   *
   * @param resourceLaunchEvent the launch request to forward to the resource launch handler.
   * @throws RuntimeException if the evaluator is in any other state.
   */
  public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) {
    synchronized (this.evaluatorDescriptor) {
      if (this.stateManager.isAllocated()) {
        this.stateManager.setSubmitted();
        this.resourceLaunchHandler.onNext(resourceLaunchEvent);
      } else if (this.stateManager.isCompletedAbnormally()) {
        LOG.log(Level.WARNING, "Evaluator manager expected {0} state but instead is in state {1}",
            new Object[] {EvaluatorState.ALLOCATED, this.stateManager});
      } else {
        throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
            " state but instead is in state " + this.stateManager);
      }
    }
  }

  /**
   * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime.
   *
   * @param contextControlProto message contains context control info.
   */
  public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
    synchronized (this.evaluatorDescriptor) {
      LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
      this.contextControlHandler.send(contextControlProto);
    }
  }

  /**
   * Forward the EvaluatorControlProto to the EvaluatorRuntime.
   *
   * @param evaluatorControlProto message contains evaluator control information.
   */
  void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
    synchronized (this.evaluatorDescriptor) {
      this.evaluatorControlHandler.send(evaluatorControlProto);
    }
  }

  /**
   * Handle task status messages.
   * <p>
   * A status for a task not yet known creates a new TaskRepresenter, provided the state is restartable
   * or the evaluator has re-registered after a driver restart. The representer is dropped once it
   * reports that the task is no longer running.
   *
   * @param taskStatus message contains the current task status.
   * @throws RuntimeException for a status of an unknown task in an unexpected state.
   */
  private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {

    if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) {

      final State state = taskStatus.getState();
      if (state.isRestartable() ||
          this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId).isReregistered()) {

        // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order
        // [REEF-289] is a related item which may fix the issue
        if (state.isRunning()) {
          LOG.log(Level.WARNING,
              "Received a message of state {0} for Task {1} before receiving its {2} state",
              new Object[] {State.RUNNING, taskStatus.getTaskId(), State.INIT});
        }

        // FAILED is a legal first state of a Task as it could have failed during construction.
        this.task = Optional.of(
            new TaskRepresenter(taskStatus.getTaskId(),
                this.contextRepresenters.getContext(taskStatus.getContextId()),
                this.messageDispatcher,
                this,
                this.exceptionCodec,
                this.driverRestartManager));
      } else {
        throw new RuntimeException("Received a message of state " + state +
            ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() +
            " which we haven't heard from before.");
      }
    }

    this.task.get().onTaskStatusMessage(taskStatus);

    if (this.task.get().isNotRunning()) {
      LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
      this.task = Optional.empty();
    }
  }

  /**
   * Resource status information from the (actual) resource manager.
   * <p>
   * Ignored if the evaluator is no longer available. If the resource manager reports a completed
   * state while the driver still considers the evaluator available, the evaluator is failed with an
   * explanatory message (an EvaluatorKilledByResourceManagerException when the reported state is KILLED).
   *
   * @param resourceStatusEvent the status reported by the resource manager.
   */
  public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) {

    synchronized (this.evaluatorDescriptor) {

      final State state = resourceStatusEvent.getState();
      LOG.log(Level.FINEST, "Resource manager state update: {0}", state);

      if (!this.stateManager.isAvailable()) {

        LOG.log(Level.FINE,
            "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
            new Object[] {this.getId(), this.stateManager});

      } else if (state.isCompleted()) {
        // Reaching this branch implies stateManager.isAvailable() (the branch above handled the opposite).

        // Something is wrong. The resource manager reports that the Evaluator is done or failed,
        // but the Driver assumes it to be alive.
        final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
            .append(this.evaluatorId)
            .append("] is assumed to be in state [")
            .append(this.stateManager.toString())
            .append("]. But the resource manager reports it to be in state [")
            .append(state)
            .append("].");

        if (this.stateManager.isSubmitted()) {
          messageBuilder
              .append(" This most likely means that the Evaluator suffered a failure before establishing " +
                  "a communications link to the driver.");
        } else if (this.stateManager.isAllocated()) {
          messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used.");
        } else if (this.stateManager.isRunning()) {
          messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " +
              "back to the driver.");
        }

        if (this.task.isPresent()) {
          messageBuilder.append(" Task [")
              .append(this.task.get().getId())
              .append("] was running when the Evaluator crashed.");
        }

        if (state == State.KILLED) {
          this.onEvaluatorException(
              new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString()));
        } else {
          this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
        }
      }
    }
  }

  @Override
  public String toString() {
    return "EvaluatorManager:"
        + " id=" + this.evaluatorId
        + " state=" + this.stateManager
        + " task=" + this.task;
  }

  // Dynamic Parameters

  /**
   * The Evaluator Identifier.
   */
  @NamedParameter(doc = "The Evaluator Identifier.")
  public static final class EvaluatorIdentifier implements Name<String> {
  }

  /**
   * The Evaluator Descriptor (of type EvaluatorDescriptorImpl) of the managed evaluator.
   */
  @NamedParameter(doc = "The Evaluator Host.")
  public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
  }
}