001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator; 020 021import org.apache.reef.proto.EvaluatorRuntimeProtocol; 022import org.apache.reef.proto.ReefServiceProtos; 023import org.apache.reef.runtime.common.evaluator.context.ContextManager; 024import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier; 025import org.apache.reef.runtime.common.evaluator.parameters.HeartbeatPeriod; 026import org.apache.reef.runtime.common.utils.RemoteManager; 027import org.apache.reef.tang.InjectionFuture; 028import org.apache.reef.tang.annotations.Parameter; 029import org.apache.reef.tang.annotations.Unit; 030import org.apache.reef.util.Optional; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.time.Clock; 033import org.apache.reef.wake.time.event.Alarm; 034 035import javax.inject.Inject; 036import java.util.ArrayList; 037import java.util.Collection; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041@Unit 042public class HeartBeatManager { 043 044 private static final Logger LOG = Logger.getLogger(HeartBeatManager.class.getName()); 045 046 private final Clock clock; 047 private final int heartbeatPeriod; 048 private final EventHandler<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatHandler; 049 private final InjectionFuture<EvaluatorRuntime> evaluatorRuntime; 050 private final InjectionFuture<ContextManager> contextManager; 051 052 @Inject 053 private HeartBeatManager( 054 final InjectionFuture<EvaluatorRuntime> evaluatorRuntime, 055 final InjectionFuture<ContextManager> contextManager, 056 final Clock clock, 057 final RemoteManager remoteManager, 058 final @Parameter(HeartbeatPeriod.class) int heartbeatPeriod, 059 final @Parameter(DriverRemoteIdentifier.class) String driverRID) { 060 061 this.evaluatorRuntime = evaluatorRuntime; 062 this.contextManager = contextManager; 063 this.clock = clock; 064 this.heartbeatPeriod = heartbeatPeriod; 065 this.evaluatorHeartbeatHandler = remoteManager.getHandler( 066 driverRID, EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class); 067 } 068 069 /** 070 * Assemble a complete new heartbeat and send it out. 071 */ 072 public synchronized void sendHeartbeat() { 073 this.sendHeartBeat(this.getEvaluatorHeartbeatProto()); 074 } 075 076 /** 077 * Called with a specific TaskStatus that must be delivered to the driver. 078 */ 079 public synchronized void sendTaskStatus(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 080 this.sendHeartBeat(this.getEvaluatorHeartbeatProto( 081 this.evaluatorRuntime.get().getEvaluatorStatus(), 082 this.contextManager.get().getContextStatusCollection(), 083 Optional.of(taskStatusProto))); 084 } 085 086 /** 087 * Called with a specific TaskStatus that must be delivered to the driver. 088 */ 089 public synchronized void sendContextStatus( 090 final ReefServiceProtos.ContextStatusProto contextStatusProto) { 091 092 // TODO: Write a test that checks for the order. 093 final Collection<ReefServiceProtos.ContextStatusProto> contextStatusList = new ArrayList<>(); 094 contextStatusList.add(contextStatusProto); 095 contextStatusList.addAll(this.contextManager.get().getContextStatusCollection()); 096 097 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeatProto = 098 this.getEvaluatorHeartbeatProto( 099 this.evaluatorRuntime.get().getEvaluatorStatus(), 100 contextStatusList, Optional.<ReefServiceProtos.TaskStatusProto>empty()); 101 102 this.sendHeartBeat(heartbeatProto); 103 } 104 105 /** 106 * Called with a specific EvaluatorStatus that must be delivered to the driver. 107 */ 108 public synchronized void sendEvaluatorStatus( 109 final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) { 110 this.sendHeartBeat(EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.newBuilder() 111 .setTimestamp(System.currentTimeMillis()) 112 .setEvaluatorStatus(evaluatorStatusProto) 113 .build()); 114 } 115 116 /** 117 * Sends the actual heartbeat out and logs it, so desired. 118 * 119 * @param heartbeatProto 120 */ 121 private synchronized void sendHeartBeat( 122 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeatProto) { 123 if (LOG.isLoggable(Level.FINEST)) { 124 LOG.log(Level.FINEST, "Heartbeat message:\n" + heartbeatProto, new Exception("Stack trace")); 125 } 126 this.evaluatorHeartbeatHandler.onNext(heartbeatProto); 127 } 128 129 130 private EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto getEvaluatorHeartbeatProto() { 131 return this.getEvaluatorHeartbeatProto( 132 this.evaluatorRuntime.get().getEvaluatorStatus(), 133 this.contextManager.get().getContextStatusCollection(), 134 this.contextManager.get().getTaskStatus()); 135 } 136 137 private final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto getEvaluatorHeartbeatProto( 138 final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto, 139 final Iterable<ReefServiceProtos.ContextStatusProto> contextStatusProtos, 140 final Optional<ReefServiceProtos.TaskStatusProto> taskStatusProto) { 141 142 final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.Builder builder = 143 EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.newBuilder() 144 .setTimestamp(System.currentTimeMillis()) 145 .setEvaluatorStatus(evaluatorStatusProto); 146 147 for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) { 148 builder.addContextStatus(contextStatusProto); 149 } 150 151 if (taskStatusProto.isPresent()) { 152 builder.setTaskStatus(taskStatusProto.get()); 153 } 154 155 return builder.build(); 156 } 157 158 final class HeartbeatAlarmHandler implements EventHandler<Alarm> { 159 @Override 160 public void onNext(final Alarm alarm) { 161 synchronized (HeartBeatManager.this) { 162 if (evaluatorRuntime.get().isRunning()) { 163 HeartBeatManager.this.sendHeartbeat(); 164 HeartBeatManager.this.clock.scheduleAlarm(HeartBeatManager.this.heartbeatPeriod, this); 165 } else { 166 LOG.log(Level.FINEST, 167 "Not triggering a heartbeat, because state is: {0}", 168 evaluatorRuntime.get().getState()); 169 } 170 } 171 } 172 } 173}