001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.evaluator.FailedEvaluator; 024import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders; 025import org.apache.reef.driver.restart.DriverRestartManager; 026import org.apache.reef.driver.restart.EvaluatorRestartState; 027import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; 028import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO; 029import org.apache.reef.runtime.common.driver.evaluator.pojos.State; 030import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO; 031import org.apache.reef.tang.ConfigurationProvider; 032import org.apache.reef.driver.context.ActiveContext; 033import org.apache.reef.driver.context.FailedContext; 034import org.apache.reef.driver.evaluator.AllocatedEvaluator; 035import org.apache.reef.driver.evaluator.EvaluatorDescriptor; 036import org.apache.reef.driver.task.FailedTask; 037import org.apache.reef.exception.EvaluatorException; 038import org.apache.reef.exception.EvaluatorKilledByResourceManagerException; 039import org.apache.reef.io.naming.Identifiable; 040import org.apache.reef.proto.EvaluatorRuntimeProtocol; 041import org.apache.reef.proto.ReefServiceProtos; 042import org.apache.reef.driver.evaluator.EvaluatorProcess; 043import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; 044import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl; 045import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; 046import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; 047import org.apache.reef.runtime.common.driver.context.ContextControlHandler; 048import org.apache.reef.runtime.common.driver.context.ContextRepresenters; 049import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource; 050import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; 051import org.apache.reef.runtime.common.driver.task.TaskRepresenter; 052import org.apache.reef.runtime.common.utils.ExceptionCodec; 053import org.apache.reef.runtime.common.utils.RemoteManager; 054import org.apache.reef.tang.annotations.Name; 055import org.apache.reef.tang.annotations.NamedParameter; 056import org.apache.reef.tang.annotations.Parameter; 057import org.apache.reef.tang.formats.ConfigurationSerializer; 058import org.apache.reef.util.Optional; 059import org.apache.reef.util.logging.LoggingScopeFactory; 060import org.apache.reef.wake.EventHandler; 061import org.apache.reef.wake.remote.RemoteMessage; 062import org.apache.reef.wake.time.Clock; 063import org.apache.reef.wake.time.event.Alarm; 064 065import javax.inject.Inject; 066import java.io.File; 067import java.util.ArrayList; 068import java.util.List; 069import java.util.Set; 070import java.util.logging.Level; 071import java.util.logging.Logger; 072 073/** 074 * Manages a single Evaluator instance including all lifecycle instances: 075 * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator). 076 * <p> 077 * A (periodic) heartbeat channel is established from EvaluatorRuntime to EvaluatorManager. 078 * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this 079 * heartbeat channel. 080 * <p> 081 * A (push-based) EventHandler channel is established from EvaluatorManager to EvaluatorRuntime. 082 * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate 083 * control information (e.g., shutdown, suspend). 084 */ 085@Private 086@DriverSide 087public final class EvaluatorManager implements Identifiable, AutoCloseable { 088 089 private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName()); 090 091 private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker(); 092 private final Clock clock; 093 private final ResourceReleaseHandler resourceReleaseHandler; 094 private final ResourceLaunchHandler resourceLaunchHandler; 095 private final String evaluatorId; 096 private final EvaluatorDescriptorImpl evaluatorDescriptor; 097 private final ContextRepresenters contextRepresenters; 098 private final EvaluatorMessageDispatcher messageDispatcher; 099 private final EvaluatorControlHandler evaluatorControlHandler; 100 private final ContextControlHandler contextControlHandler; 101 private final EvaluatorStatusManager stateManager; 102 private final ExceptionCodec exceptionCodec; 103 private final EventHandlerIdlenessSource idlenessSource; 104 private final RemoteManager remoteManager; 105 private final ConfigurationSerializer configurationSerializer; 106 private final LoggingScopeFactory loggingScopeFactory; 107 private final Set<ConfigurationProvider> evaluatorConfigurationProviders; 108 private final DriverRestartManager driverRestartManager; 109 110 // Mutable fields 111 private Optional<TaskRepresenter> task = Optional.empty(); 112 private boolean isResourceReleased = false; 113 private boolean allocationFired = false; 114 115 @Inject 116 private EvaluatorManager( 117 final Clock clock, 118 final RemoteManager remoteManager, 119 final ResourceReleaseHandler resourceReleaseHandler, 120 final ResourceLaunchHandler resourceLaunchHandler, 121 @Parameter(EvaluatorIdentifier.class) final String evaluatorId, 122 @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor, 123 final ContextRepresenters contextRepresenters, 124 final ConfigurationSerializer configurationSerializer, 125 final EvaluatorMessageDispatcher messageDispatcher, 126 final EvaluatorControlHandler evaluatorControlHandler, 127 final ContextControlHandler contextControlHandler, 128 final EvaluatorStatusManager stateManager, 129 final ExceptionCodec exceptionCodec, 130 final EventHandlerIdlenessSource idlenessSource, 131 final LoggingScopeFactory loggingScopeFactory, 132 @Parameter(EvaluatorConfigurationProviders.class) 133 final Set<ConfigurationProvider> evaluatorConfigurationProviders, 134 final DriverRestartManager driverRestartManager) { 135 this.contextRepresenters = contextRepresenters; 136 this.idlenessSource = idlenessSource; 137 LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId); 138 this.clock = clock; 139 this.resourceReleaseHandler = resourceReleaseHandler; 140 this.resourceLaunchHandler = resourceLaunchHandler; 141 this.evaluatorId = evaluatorId; 142 this.evaluatorDescriptor = evaluatorDescriptor; 143 144 this.messageDispatcher = messageDispatcher; 145 this.evaluatorControlHandler = evaluatorControlHandler; 146 this.contextControlHandler = contextControlHandler; 147 this.stateManager = stateManager; 148 this.exceptionCodec = exceptionCodec; 149 150 this.remoteManager = remoteManager; 151 this.configurationSerializer = configurationSerializer; 152 this.loggingScopeFactory = loggingScopeFactory; 153 this.evaluatorConfigurationProviders = evaluatorConfigurationProviders; 154 this.driverRestartManager = driverRestartManager; 155 156 LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId()); 157 } 158 159 /** 160 * Get the id of current job/application. 161 */ 162 public static String getJobIdentifier() { 163 // TODO[JIRA REEF-818]: currently we obtain the job id directly by parsing execution (container) directory path 164 // #845 is open to get the id from RM properly 165 for (File directory = new File(System.getProperty("user.dir")); 166 directory != null; directory = directory.getParentFile()) { 167 final String currentDirectoryName = directory.getName(); 168 if (currentDirectoryName.toLowerCase().contains("application_")) { 169 return currentDirectoryName; 170 } 171 } 172 // cannot find a directory that contains application_, presumably we are on local runtime 173 // again, this is a hack for now, we need #845 as a proper solution 174 return "REEF_LOCAL_RUNTIME"; 175 } 176 177 /** 178 * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once. 179 */ 180 public synchronized void fireEvaluatorAllocatedEvent() { 181 if (!allocationFired && stateManager.isAllocated()) { 182 final AllocatedEvaluator allocatedEvaluator = 183 new AllocatedEvaluatorImpl(this, 184 remoteManager.getMyIdentifier(), 185 configurationSerializer, 186 getJobIdentifier(), 187 loggingScopeFactory, 188 evaluatorConfigurationProviders); 189 LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId); 190 messageDispatcher.onEvaluatorAllocated(allocatedEvaluator); 191 allocationFired = true; 192 } else { 193 LOG.log(Level.WARNING, "Evaluator allocated event fired more than once."); 194 } 195 } 196 197 private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) { 198 return resourceStatusEvent.getState() == State.DONE || 199 resourceStatusEvent.getState() == State.FAILED || 200 resourceStatusEvent.getState() == State.KILLED; 201 } 202 203 @Override 204 public String getId() { 205 return this.evaluatorId; 206 } 207 208 public void setProcess(final EvaluatorProcess process) { 209 this.evaluatorDescriptor.setProcess(process); 210 } 211 212 public EvaluatorDescriptor getEvaluatorDescriptor() { 213 return this.evaluatorDescriptor; 214 } 215 216 @Override 217 public void close() { 218 synchronized (this.evaluatorDescriptor) { 219 if (this.stateManager.isAllocatedOrSubmittedOrRunning()) { 220 LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId()); 221 try { 222 if (this.stateManager.isRunning()){ 223 // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. 224 final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto = 225 EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() 226 .setTimestamp(System.currentTimeMillis()) 227 .setIdentifier(getId()) 228 .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build()) 229 .build(); 230 sendEvaluatorControlMessage(evaluatorControlProto); 231 } 232 } finally { 233 this.stateManager.setKilled(); 234 } 235 } 236 237 238 if (!this.isResourceReleased) { 239 this.isResourceReleased = true; 240 try { 241 /* We need to wait awhile before returning the container to the RM in order to 242 * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */ 243 this.clock.scheduleAlarm(100, new EventHandler<Alarm>() { 244 @Override 245 public void onNext(final Alarm alarm) { 246 EvaluatorManager.this.resourceReleaseHandler.onNext( 247 ResourceReleaseEventImpl.newBuilder() 248 .setIdentifier(EvaluatorManager.this.evaluatorId) 249 .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName()) 250 .build() 251 ); 252 } 253 }); 254 } catch (final IllegalStateException e) { 255 LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e); 256 EvaluatorManager.this.resourceReleaseHandler.onNext( 257 ResourceReleaseEventImpl.newBuilder() 258 .setIdentifier(EvaluatorManager.this.evaluatorId) 259 .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName()) 260 .build() 261 ); 262 } 263 } 264 } 265 266 try { 267 this.messageDispatcher.close(); 268 } catch (Exception e) { 269 LOG.log(Level.SEVERE, "Exception while closing EvaluatorManager", e); 270 } 271 this.idlenessSource.check(); 272 } 273 274 /** 275 * Return true if the state is DONE, FAILED, or KILLED, 276 * <em>and</em> there are no messages queued or in processing. 277 */ 278 public boolean isClosed() { 279 return this.messageDispatcher.isEmpty() && 280 this.stateManager.isDoneOrFailedOrKilled(); 281 } 282 283 /** 284 * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED. 285 * 286 * @param exception on the EvaluatorRuntime 287 */ 288 public void onEvaluatorException(final EvaluatorException exception) { 289 synchronized (this.evaluatorDescriptor) { 290 if (this.stateManager.isDoneOrFailedOrKilled()) { 291 LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} which is already in state {1}.", 292 new Object[]{this.getId(), this.stateManager}); 293 return; 294 } 295 296 LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception); 297 298 try { 299 300 final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure(); 301 302 final Optional<FailedTask> failedTaskOptional; 303 if (this.task.isPresent()) { 304 final String taskId = this.task.get().getId(); 305 final Optional<ActiveContext> evaluatorContext = Optional.empty(); 306 final Optional<byte[]> bytes = Optional.empty(); 307 final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash")); 308 final String message = "Evaluator crash"; 309 final Optional<String> description = Optional.empty(); 310 final FailedTask failedTask = 311 new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext); 312 failedTaskOptional = Optional.of(failedTask); 313 } else { 314 failedTaskOptional = Optional.empty(); 315 } 316 317 final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(exception, failedContextList, 318 failedTaskOptional, this.evaluatorId); 319 320 if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) { 321 this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator); 322 } else { 323 this.messageDispatcher.onEvaluatorFailed(failedEvaluator); 324 } 325 } catch (final Exception e) { 326 LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e); 327 } finally { 328 this.stateManager.setFailed(); 329 close(); 330 } 331 } 332 } 333 334 /** 335 * Process an evaluator heartbeat message. 336 */ 337 public void onEvaluatorHeartbeatMessage( 338 final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) { 339 340 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto = 341 evaluatorHeartbeatProtoRemoteMessage.getMessage(); 342 LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto); 343 344 synchronized (this.evaluatorDescriptor) { 345 if (this.stateManager.isDoneOrFailedOrKilled()) { 346 LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.", 347 new Object[]{this.getId(), this.stateManager}); 348 return; 349 } 350 351 this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp()); 352 final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); 353 354 final EvaluatorRestartState evaluatorRestartState = driverRestartManager.getEvaluatorRestartState(evaluatorId); 355 356 /* 357 * First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator 358 * from a separate application attempt. In the case of a previous evaluator, if the restart period has not 359 * yet expired, we should register it and trigger context active and task events. If the restart period has 360 * expired, we should return immediately after setting its remote ID in order to close it. 361 */ 362 if (this.stateManager.isSubmitted() || 363 evaluatorRestartState == EvaluatorRestartState.REPORTED || 364 evaluatorRestartState == EvaluatorRestartState.EXPIRED) { 365 366 this.evaluatorControlHandler.setRemoteID(evaluatorRID); 367 368 if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) { 369 // Don't do anything if evaluator has expired. Close it immediately upon exit of this method. 370 return; 371 } 372 373 this.stateManager.setRunning(); 374 LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId); 375 376 if (evaluatorRestartState == EvaluatorRestartState.REPORTED) { 377 driverRestartManager.setEvaluatorReregistered(evaluatorId); 378 } 379 } 380 381 // All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806. 382 final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp(); 383 384 // Process the Evaluator status message 385 if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { 386 EvaluatorStatusPOJO evaluatorStatus = new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()); 387 this.onEvaluatorStatusMessage(evaluatorStatus); 388 } 389 390 // Process the Context status message(s) 391 final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus(); 392 final List<ContextStatusPOJO> contextStatusList = new ArrayList<>(); 393 for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) { 394 contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber)); 395 } 396 397 this.contextRepresenters.onContextStatusMessages(contextStatusList, 398 informClientOfNewContexts); 399 400 // Process the Task status message 401 if (evaluatorHeartbeatProto.hasTaskStatus()) { 402 TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber); 403 this.onTaskStatusMessage(taskStatus); 404 } 405 LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); 406 } 407 } 408 409 /** 410 * Process a evaluator status message. 411 * 412 * @param message 413 */ 414 private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) { 415 416 switch (message.getState()) { 417 case DONE: 418 this.onEvaluatorDone(message); 419 break; 420 case FAILED: 421 this.onEvaluatorFailed(message); 422 break; 423 case INIT: 424 case KILLED: 425 case RUNNING: 426 case SUSPEND: 427 break; 428 default: 429 throw new RuntimeException("Unknown state: " + message.getState()); 430 } 431 } 432 433 /** 434 * Process an evaluator message that indicates that the evaluator shut down cleanly. 435 * 436 * @param message 437 */ 438 private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) { 439 assert message.getState() == State.DONE; 440 LOG.log(Level.FINEST, "Evaluator {0} done.", getId()); 441 this.stateManager.setDone(); 442 this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId)); 443 close(); 444 } 445 446 /** 447 * Process an evaluator message that indicates a crash. 448 * 449 * @param evaluatorStatus 450 */ 451 private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) { 452 assert evaluatorStatus.getState() 453 == State.FAILED; 454 final EvaluatorException evaluatorException; 455 if (evaluatorStatus.hasError()) { 456 final Optional<Throwable> exception = 457 this.exceptionCodec.fromBytes(evaluatorStatus.getError()); 458 if (exception.isPresent()) { 459 evaluatorException = new EvaluatorException(getId(), exception.get()); 460 } else { 461 evaluatorException = new EvaluatorException(getId(), 462 new Exception("Exception sent, but can't be deserialized")); 463 } 464 } else { 465 evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent")); 466 } 467 onEvaluatorException(evaluatorException); 468 } 469 470 public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) { 471 synchronized (this.evaluatorDescriptor) { 472 if (this.stateManager.isAllocated()) { 473 this.stateManager.setSubmitted(); 474 this.resourceLaunchHandler.onNext(resourceLaunchEvent); 475 } else { 476 throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED + 477 " state but instead is in state " + this.stateManager); 478 } 479 } 480 } 481 482 /** 483 * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime. 484 * 485 * @param contextControlProto message contains context control info. 486 */ 487 public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) { 488 synchronized (this.evaluatorDescriptor) { 489 LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId); 490 this.contextControlHandler.send(contextControlProto); 491 } 492 } 493 494 /** 495 * Forward the EvaluatorControlProto to the EvaluatorRuntime. 496 * 497 * @param evaluatorControlProto message contains evaluator control information. 498 */ 499 void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) { 500 synchronized (this.evaluatorDescriptor) { 501 this.evaluatorControlHandler.send(evaluatorControlProto); 502 } 503 } 504 505 /** 506 * Handle task status messages. 507 * 508 * @param taskStatus message contains the current task status. 509 */ 510 private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) { 511 512 if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) { 513 if (taskStatus.getState() == State.INIT || 514 taskStatus.getState() == State.FAILED || 515 taskStatus.getState() == State.RUNNING || 516 driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.REREGISTERED) { 517 518 // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order 519 // [REEF-289] is a related item which may fix the issue 520 if (taskStatus.getState() == State.RUNNING) { 521 LOG.log(Level.WARNING, 522 "Received a message of state " + ReefServiceProtos.State.RUNNING + 523 " for Task " + taskStatus.getTaskId() + 524 " before receiving its " + ReefServiceProtos.State.INIT + " state"); 525 } 526 527 // FAILED is a legal first state of a Task as it could have failed during construction. 528 this.task = Optional.of( 529 new TaskRepresenter(taskStatus.getTaskId(), 530 this.contextRepresenters.getContext(taskStatus.getContextId()), 531 this.messageDispatcher, 532 this, 533 this.exceptionCodec, 534 this.driverRestartManager)); 535 } else { 536 throw new RuntimeException("Received a message of state " + taskStatus.getState() + 537 ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() + 538 " which we haven't heard from before."); 539 } 540 } 541 this.task.get().onTaskStatusMessage(taskStatus); 542 543 if (this.task.get().isNotRunning()) { 544 LOG.log(Level.FINEST, "Task no longer running. De-registering it."); 545 this.task = Optional.empty(); 546 } 547 } 548 549 /** 550 * Resource status information from the (actual) resource manager. 551 */ 552 public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) { 553 synchronized (this.evaluatorDescriptor) { 554 LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusEvent.getState()); 555 if (this.stateManager.isDoneOrFailedOrKilled()) { 556 LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.", 557 new Object[]{this.getId(), this.stateManager}); 558 } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && this.stateManager.isAllocatedOrSubmittedOrRunning()) { 559 // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes 560 // it to be alive. 561 final StringBuilder messageBuilder = new StringBuilder("Evaluator [") 562 .append(this.evaluatorId) 563 .append("] is assumed to be in state [") 564 .append(this.stateManager.toString()) 565 .append("]. But the resource manager reports it to be in state [") 566 .append(resourceStatusEvent.getState()) 567 .append("]."); 568 569 if (this.stateManager.isSubmitted()) { 570 messageBuilder 571 .append(" This most likely means that the Evaluator suffered a failure before establishing " + 572 "a communications link to the driver."); 573 } else if (this.stateManager.isAllocated()) { 574 messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used."); 575 } else if (this.stateManager.isRunning()) { 576 messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " + 577 "back to the driver."); 578 } 579 if (this.task.isPresent()) { 580 messageBuilder.append(" Task [") 581 .append(this.task.get().getId()) 582 .append("] was running when the Evaluator crashed."); 583 } 584 585 if (resourceStatusEvent.getState() == State.KILLED) { 586 this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, 587 messageBuilder.toString())); 588 } else { 589 this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString())); 590 } 591 } 592 } 593 } 594 595 @Override 596 public String toString() { 597 return "EvaluatorManager:" 598 + " id=" + this.evaluatorId 599 + " state=" + this.stateManager 600 + " task=" + this.task; 601 } 602 603 // Dynamic Parameters 604 605 /** 606 * The Evaluator Identifier. 607 */ 608 @NamedParameter(doc = "The Evaluator Identifier.") 609 public static final class EvaluatorIdentifier implements Name<String> { 610 } 611 612 /** 613 * The Evaluator Host. 614 */ 615 @NamedParameter(doc = "The Evaluator Host.") 616 public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> { 617 } 618}