001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.restart.DriverRestartManager; 024import org.apache.reef.driver.restart.EvaluatorRestartState; 025import org.apache.reef.proto.EvaluatorRuntimeProtocol; 026import org.apache.reef.proto.ReefServiceProtos; 027import org.apache.reef.util.Optional; 028import org.apache.reef.wake.EventHandler; 029import org.apache.reef.wake.remote.RemoteMessage; 030 031import javax.inject.Inject; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Receives heartbeats from all Evaluators and dispatches them to the right EvaluatorManager instance. 037 */ 038@Private 039@DriverSide 040public final class EvaluatorHeartbeatHandler 041 implements EventHandler<RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto>> { 042 private static final Logger LOG = Logger.getLogger(EvaluatorHeartbeatHandler.class.getName()); 043 private final Evaluators evaluators; 044 private final EvaluatorManagerFactory evaluatorManagerFactory; 045 private final DriverRestartManager driverRestartManager; 046 047 @Inject 048 EvaluatorHeartbeatHandler(final Evaluators evaluators, 049 final EvaluatorManagerFactory evaluatorManagerFactory, 050 final DriverRestartManager driverRestartManager) { 051 this.evaluators = evaluators; 052 this.evaluatorManagerFactory = evaluatorManagerFactory; 053 this.driverRestartManager = driverRestartManager; 054 } 055 056 @Override 057 public void onNext(final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) { 058 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeat = evaluatorHeartbeatMessage.getMessage(); 059 final ReefServiceProtos.EvaluatorStatusProto status = heartbeat.getEvaluatorStatus(); 060 final String evaluatorId = status.getEvaluatorId(); 061 062 LOG.log(Level.FINEST, "TIME: Begin Heartbeat {0}", evaluatorId); 063 LOG.log(Level.FINEST, "Heartbeat from Evaluator {0} with state {1} timestamp {2} from remoteId {3}", 064 new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(), 065 evaluatorHeartbeatMessage.getIdentifier()}); 066 067 try { 068 final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId); 069 if (evaluatorManager.isPresent()) { 070 evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage); 071 return; 072 } 073 074 if (driverRestartManager.isRestarting() && 075 driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPECTED) { 076 077 if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) { 078 LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported back to the driver after restart."); 079 080 evaluators.put(recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage)); 081 } else { 082 LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already been recovered."); 083 } 084 return; 085 } 086 087 if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPIRED) { 088 LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has reported back to the driver after restart."); 089 090 // Create the evaluator manager, analyze its heartbeat, but don't add it to the set of Evaluators. 091 // Immediately close it. 092 recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage).close(); 093 return; 094 } 095 096 final StringBuilder message = new StringBuilder("Contact from unknown Evaluator with identifier '"); 097 message.append(evaluatorId); 098 if (heartbeat.hasEvaluatorStatus()) { 099 message.append("' with state '"); 100 message.append(status.getState()); 101 } 102 message.append('\''); 103 throw new RuntimeException(message.toString()); 104 } finally { 105 LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId); 106 } 107 } 108 109 /** 110 * Creates an EvaluatorManager for recovered evaluator. 111 * {@link EvaluatorManager#onEvaluatorHeartbeatMessage(RemoteMessage)} should not 112 * do anything if driver restart period has expired. Expired evaluators should be immediately closed 113 * upon return of this function, while evaluators that have not yet expired should be recorded and added 114 * to the {@link Evaluators} object. 115 */ 116 private EvaluatorManager recoverEvaluatorManager( 117 final String evaluatorId, 118 final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) { 119 final EvaluatorManager recoveredEvaluatorManager = evaluatorManagerFactory 120 .getNewEvaluatorManagerForRecoveredEvaluator( 121 driverRestartManager.getResourceRecoverEvent(evaluatorId)); 122 123 recoveredEvaluatorManager.onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage); 124 return recoveredEvaluatorManager; 125 } 126}