001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.restart.DriverRestartManager; 024import org.apache.reef.driver.restart.EvaluatorRestartState; 025import org.apache.reef.proto.EvaluatorRuntimeProtocol; 026import org.apache.reef.proto.ReefServiceProtos; 027import org.apache.reef.util.Optional; 028import org.apache.reef.wake.EventHandler; 029import org.apache.reef.wake.remote.RemoteMessage; 030 031import javax.inject.Inject; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Receives heartbeats from all Evaluators and dispatches them to the right EvaluatorManager instance. 037 */ 038@Private 039@DriverSide 040public final class EvaluatorHeartbeatHandler 041 implements EventHandler<RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto>> { 042 private static final Logger LOG = Logger.getLogger(EvaluatorHeartbeatHandler.class.getName()); 043 private final Evaluators evaluators; 044 private final EvaluatorManagerFactory evaluatorManagerFactory; 045 private final DriverRestartManager driverRestartManager; 046 047 @Inject 048 EvaluatorHeartbeatHandler(final Evaluators evaluators, 049 final EvaluatorManagerFactory evaluatorManagerFactory, 050 final DriverRestartManager driverRestartManager) { 051 this.evaluators = evaluators; 052 this.evaluatorManagerFactory = evaluatorManagerFactory; 053 this.driverRestartManager = driverRestartManager; 054 } 055 056 @Override 057 public void onNext(final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) { 058 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeat = evaluatorHeartbeatMessage.getMessage(); 059 final ReefServiceProtos.EvaluatorStatusProto status = heartbeat.getEvaluatorStatus(); 060 final String evaluatorId = status.getEvaluatorId(); 061 062 LOG.log(Level.FINEST, "TIME: Begin Heartbeat {0}", evaluatorId); 063 LOG.log(Level.FINEST, "Heartbeat from Evaluator {0} with state {1} timestamp {2} from remoteId {3}", 064 new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(), 065 evaluatorHeartbeatMessage.getIdentifier()}); 066 067 try { 068 final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId); 069 if (evaluatorManager.isPresent()) { 070 evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage); 071 return; 072 } 073 074 if (this.evaluators.wasClosed(evaluatorId)) { 075 LOG.log(Level.WARNING, "Evaluator [" + evaluatorId + "] has reported back to the driver after it was closed."); 076 return; 077 } 078 079 if (driverRestartManager.isRestarting() && 080 driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPECTED) { 081 082 if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) { 083 LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported back to the driver after restart."); 084 085 evaluators.put(recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage)); 086 } else { 087 LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already been recovered."); 088 } 089 return; 090 } 091 092 if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPIRED) { 093 LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has reported back to the driver after restart."); 094 095 // Create the evaluator manager, analyze its heartbeat, but don't add it to the set of Evaluators. 096 // Immediately close it. 097 recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage).close(); 098 return; 099 } 100 101 final StringBuilder message = new StringBuilder("Contact from unknown Evaluator with identifier '"); 102 message.append(evaluatorId); 103 if (heartbeat.hasEvaluatorStatus()) { 104 message.append("' with state '"); 105 message.append(status.getState()); 106 } 107 message.append('\''); 108 throw new RuntimeException(message.toString()); 109 } finally { 110 LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId); 111 } 112 } 113 114 /** 115 * Creates an EvaluatorManager for recovered evaluator. 116 * {@link EvaluatorManager#onEvaluatorHeartbeatMessage(RemoteMessage)} should not 117 * do anything if driver restart period has expired. Expired evaluators should be immediately closed 118 * upon return of this function, while evaluators that have not yet expired should be recorded and added 119 * to the {@link Evaluators} object. 120 */ 121 private EvaluatorManager recoverEvaluatorManager( 122 final String evaluatorId, 123 final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) { 124 final EvaluatorManager recoveredEvaluatorManager = evaluatorManagerFactory 125 .getNewEvaluatorManagerForRecoveredEvaluator( 126 driverRestartManager.getResourceRecoverEvent(evaluatorId)); 127 128 recoveredEvaluatorManager.onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage); 129 return recoveredEvaluatorManager; 130 } 131}