001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator; 020 021import org.apache.reef.proto.EvaluatorRuntimeProtocol; 022import org.apache.reef.proto.ReefServiceProtos; 023import org.apache.reef.runtime.common.evaluator.context.ContextManager; 024import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier; 025import org.apache.reef.runtime.common.evaluator.parameters.HeartbeatPeriod; 026import org.apache.reef.runtime.common.utils.RemoteManager; 027import org.apache.reef.tang.InjectionFuture; 028import org.apache.reef.tang.annotations.Parameter; 029import org.apache.reef.tang.annotations.Unit; 030import org.apache.reef.util.Optional; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.time.Clock; 033import org.apache.reef.wake.time.event.Alarm; 034 035import javax.inject.Inject; 036import java.util.ArrayList; 037import java.util.Collection; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * Heartbeat manager. 043 */ 044@Unit 045public final class HeartBeatManager { 046 047 private static final Logger LOG = Logger.getLogger(HeartBeatManager.class.getName()); 048 049 private final Clock clock; 050 private final int heartbeatPeriod; 051 private final EventHandler<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatHandler; 052 private final InjectionFuture<EvaluatorRuntime> evaluatorRuntime; 053 private final InjectionFuture<ContextManager> contextManager; 054 055 @Inject 056 private HeartBeatManager( 057 final InjectionFuture<EvaluatorRuntime> evaluatorRuntime, 058 final InjectionFuture<ContextManager> contextManager, 059 final Clock clock, 060 final RemoteManager remoteManager, 061 @Parameter(HeartbeatPeriod.class) final int heartbeatPeriod, 062 @Parameter(DriverRemoteIdentifier.class) final String driverRID) { 063 064 this.evaluatorRuntime = evaluatorRuntime; 065 this.contextManager = contextManager; 066 this.clock = clock; 067 this.heartbeatPeriod = heartbeatPeriod; 068 this.evaluatorHeartbeatHandler = remoteManager.getHandler( 069 driverRID, EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class); 070 } 071 072 /** 073 * Assemble a complete new heartbeat and send it out. 074 */ 075 public synchronized void sendHeartbeat() { 076 this.sendHeartBeat(this.getEvaluatorHeartbeatProto()); 077 } 078 079 /** 080 * Called with a specific TaskStatus that must be delivered to the driver. 081 */ 082 public synchronized void sendTaskStatus(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 083 this.sendHeartBeat(this.getEvaluatorHeartbeatProto( 084 this.evaluatorRuntime.get().getEvaluatorStatus(), 085 this.contextManager.get().getContextStatusCollection(), 086 Optional.of(taskStatusProto))); 087 } 088 089 /** 090 * Called with a specific ContextStatus that must be delivered to the driver. 091 */ 092 public synchronized void sendContextStatus( 093 final ReefServiceProtos.ContextStatusProto contextStatusProto) { 094 095 // TODO[JIRA REEF-833]: Write a test that verifies correct order of heartbeats. 096 final Collection<ReefServiceProtos.ContextStatusProto> contextStatusList = new ArrayList<>(); 097 contextStatusList.add(contextStatusProto); 098 contextStatusList.addAll(this.contextManager.get().getContextStatusCollection()); 099 100 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeatProto = 101 this.getEvaluatorHeartbeatProto( 102 this.evaluatorRuntime.get().getEvaluatorStatus(), 103 contextStatusList, Optional.<ReefServiceProtos.TaskStatusProto>empty()); 104 105 this.sendHeartBeat(heartbeatProto); 106 } 107 108 /** 109 * Called with a specific EvaluatorStatus that must be delivered to the driver. 110 */ 111 public synchronized void sendEvaluatorStatus( 112 final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) { 113 this.sendHeartBeat(EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.newBuilder() 114 .setTimestamp(System.currentTimeMillis()) 115 .setEvaluatorStatus(evaluatorStatusProto) 116 .build()); 117 } 118 119 /** 120 * Sends the actual heartbeat out and logs it, if so desired. 121 * 122 * @param heartbeatProto 123 */ 124 private synchronized void sendHeartBeat( 125 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeatProto) { 126 if (LOG.isLoggable(Level.FINEST)) { 127 LOG.log(Level.FINEST, "Heartbeat message:\n" + heartbeatProto, new Exception("Stack trace")); 128 } 129 this.evaluatorHeartbeatHandler.onNext(heartbeatProto); 130 } 131 132 133 private EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto getEvaluatorHeartbeatProto() { 134 return this.getEvaluatorHeartbeatProto( 135 this.evaluatorRuntime.get().getEvaluatorStatus(), 136 this.contextManager.get().getContextStatusCollection(), 137 this.contextManager.get().getTaskStatus()); 138 } 139 140 private EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto getEvaluatorHeartbeatProto( 141 final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto, 142 final Iterable<ReefServiceProtos.ContextStatusProto> contextStatusProtos, 143 final Optional<ReefServiceProtos.TaskStatusProto> taskStatusProto) { 144 145 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.Builder builder = 146 EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.newBuilder() 147 .setTimestamp(System.currentTimeMillis()) 148 .setEvaluatorStatus(evaluatorStatusProto); 149 150 for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) { 151 builder.addContextStatus(contextStatusProto); 152 } 153 154 if (taskStatusProto.isPresent()) { 155 builder.setTaskStatus(taskStatusProto.get()); 156 } 157 158 return builder.build(); 159 } 160 161 final class HeartbeatAlarmHandler implements EventHandler<Alarm> { 162 @Override 163 public void onNext(final Alarm alarm) { 164 synchronized (HeartBeatManager.this) { 165 if (evaluatorRuntime.get().isRunning()) { 166 HeartBeatManager.this.sendHeartbeat(); 167 HeartBeatManager.this.clock.scheduleAlarm(HeartBeatManager.this.heartbeatPeriod, this); 168 } else { 169 LOG.log(Level.FINEST, 170 "Not triggering a heartbeat, because state is: {0}", 171 evaluatorRuntime.get().getState()); 172 } 173 } 174 } 175 } 176}