This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.restart.DriverRestartManager;
024import org.apache.reef.driver.restart.EvaluatorRestartState;
025import org.apache.reef.proto.EvaluatorRuntimeProtocol;
026import org.apache.reef.proto.ReefServiceProtos;
027import org.apache.reef.util.Optional;
028import org.apache.reef.wake.EventHandler;
029import org.apache.reef.wake.remote.RemoteMessage;
030
031import javax.inject.Inject;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Receives heartbeats from all Evaluators and dispatches them to the right EvaluatorManager instance.
037 */
038@Private
039@DriverSide
040public final class EvaluatorHeartbeatHandler
041    implements EventHandler<RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto>> {
042  private static final Logger LOG = Logger.getLogger(EvaluatorHeartbeatHandler.class.getName());
043  private final Evaluators evaluators;
044  private final EvaluatorManagerFactory evaluatorManagerFactory;
045  private final DriverRestartManager driverRestartManager;
046
047  @Inject
048  EvaluatorHeartbeatHandler(final Evaluators evaluators,
049                            final EvaluatorManagerFactory evaluatorManagerFactory,
050                            final DriverRestartManager driverRestartManager) {
051    this.evaluators = evaluators;
052    this.evaluatorManagerFactory = evaluatorManagerFactory;
053    this.driverRestartManager = driverRestartManager;
054  }
055
056  @Override
057  public void onNext(final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) {
058    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeat = evaluatorHeartbeatMessage.getMessage();
059    final ReefServiceProtos.EvaluatorStatusProto status = heartbeat.getEvaluatorStatus();
060    final String evaluatorId = status.getEvaluatorId();
061
062    LOG.log(Level.FINEST, "TIME: Begin Heartbeat {0}", evaluatorId);
063    LOG.log(Level.FINEST, "Heartbeat from Evaluator {0} with state {1} timestamp {2} from remoteId {3}",
064        new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(),
065            evaluatorHeartbeatMessage.getIdentifier()});
066
067    try {
068      final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId);
069      if (evaluatorManager.isPresent()) {
070        evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
071        return;
072      }
073
074      if (driverRestartManager.isRestarting() &&
075          driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPECTED) {
076
077        if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) {
078          LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported back to the driver after restart.");
079
080          evaluators.put(recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage));
081        } else {
082          LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already been recovered.");
083        }
084        return;
085      }
086
087      if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPIRED) {
088        LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has reported back to the driver after restart.");
089
090        // Create the evaluator manager, analyze its heartbeat, but don't add it to the set of Evaluators.
091        // Immediately close it.
092        recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage).close();
093        return;
094      }
095
096      final StringBuilder message = new StringBuilder("Contact from unknown Evaluator with identifier '");
097      message.append(evaluatorId);
098      if (heartbeat.hasEvaluatorStatus()) {
099        message.append("' with state '");
100        message.append(status.getState());
101      }
102      message.append('\'');
103      throw new RuntimeException(message.toString());
104    } finally {
105      LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
106    }
107  }
108
109  /**
110   * Creates an EvaluatorManager for recovered evaluator.
111   * {@link EvaluatorManager#onEvaluatorHeartbeatMessage(RemoteMessage)} should not
112   * do anything if driver restart period has expired. Expired evaluators should be immediately closed
113   * upon return of this function, while evaluators that have not yet expired should be recorded and added
114   * to the {@link Evaluators} object.
115   */
116  private EvaluatorManager recoverEvaluatorManager(
117      final String evaluatorId,
118      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) {
119    final EvaluatorManager recoveredEvaluatorManager = evaluatorManagerFactory
120        .getNewEvaluatorManagerForRecoveredEvaluator(
121            driverRestartManager.getResourceRecoverEvent(evaluatorId));
122
123    recoveredEvaluatorManager.onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
124    return recoveredEvaluatorManager;
125  }
126}