/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.restart.DriverRestartManager;
024import org.apache.reef.driver.restart.EvaluatorRestartState;
025import org.apache.reef.proto.EvaluatorRuntimeProtocol;
026import org.apache.reef.proto.ReefServiceProtos;
027import org.apache.reef.util.Optional;
028import org.apache.reef.wake.EventHandler;
029import org.apache.reef.wake.remote.RemoteMessage;
030
031import javax.inject.Inject;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Receives heartbeats from all Evaluators and dispatches them to the right EvaluatorManager instance.
037 */
038@Private
039@DriverSide
040public final class EvaluatorHeartbeatHandler
041    implements EventHandler<RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto>> {
042  private static final Logger LOG = Logger.getLogger(EvaluatorHeartbeatHandler.class.getName());
043  private final Evaluators evaluators;
044  private final EvaluatorManagerFactory evaluatorManagerFactory;
045  private final DriverRestartManager driverRestartManager;
046
047  @Inject
048  EvaluatorHeartbeatHandler(final Evaluators evaluators,
049                            final EvaluatorManagerFactory evaluatorManagerFactory,
050                            final DriverRestartManager driverRestartManager) {
051    this.evaluators = evaluators;
052    this.evaluatorManagerFactory = evaluatorManagerFactory;
053    this.driverRestartManager = driverRestartManager;
054  }
055
056  @Override
057  public void onNext(final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) {
058    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeat = evaluatorHeartbeatMessage.getMessage();
059    final ReefServiceProtos.EvaluatorStatusProto status = heartbeat.getEvaluatorStatus();
060    final String evaluatorId = status.getEvaluatorId();
061
062    LOG.log(Level.FINEST, "TIME: Begin Heartbeat {0}", evaluatorId);
063    LOG.log(Level.FINEST, "Heartbeat from Evaluator {0} with state {1} timestamp {2} from remoteId {3}",
064        new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(),
065            evaluatorHeartbeatMessage.getIdentifier()});
066
067    try {
068      final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId);
069      if (evaluatorManager.isPresent()) {
070        evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
071        return;
072      }
073
074      if (this.evaluators.wasClosed(evaluatorId)) {
075        LOG.log(Level.WARNING, "Evaluator [" + evaluatorId + "] has reported back to the driver after it was closed.");
076        return;
077      }
078
079      if (driverRestartManager.isRestarting() &&
080          driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPECTED) {
081
082        if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) {
083          LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported back to the driver after restart.");
084
085          evaluators.put(recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage));
086        } else {
087          LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already been recovered.");
088        }
089        return;
090      }
091
092      if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPIRED) {
093        LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has reported back to the driver after restart.");
094
095        // Create the evaluator manager, analyze its heartbeat, but don't add it to the set of Evaluators.
096        // Immediately close it.
097        recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage).close();
098        return;
099      }
100
101      final StringBuilder message = new StringBuilder("Contact from unknown Evaluator with identifier '");
102      message.append(evaluatorId);
103      if (heartbeat.hasEvaluatorStatus()) {
104        message.append("' with state '");
105        message.append(status.getState());
106      }
107      message.append('\'');
108      throw new RuntimeException(message.toString());
109    } finally {
110      LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
111    }
112  }
113
114  /**
115   * Creates an EvaluatorManager for recovered evaluator.
116   * {@link EvaluatorManager#onEvaluatorHeartbeatMessage(RemoteMessage)} should not
117   * do anything if driver restart period has expired. Expired evaluators should be immediately closed
118   * upon return of this function, while evaluators that have not yet expired should be recorded and added
119   * to the {@link Evaluators} object.
120   */
121  private EvaluatorManager recoverEvaluatorManager(
122      final String evaluatorId,
123      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) {
124    final EvaluatorManager recoveredEvaluatorManager = evaluatorManagerFactory
125        .getNewEvaluatorManagerForRecoveredEvaluator(
126            driverRestartManager.getResourceRecoverEvent(evaluatorId));
127
128    recoveredEvaluatorManager.onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
129    return recoveredEvaluatorManager;
130  }
131}