This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.proto.EvaluatorRuntimeProtocol;
024import org.apache.reef.proto.ReefServiceProtos;
025import org.apache.reef.util.Optional;
026import org.apache.reef.wake.EventHandler;
027import org.apache.reef.wake.remote.RemoteMessage;
028
029import javax.inject.Inject;
030import java.util.logging.Level;
031import java.util.logging.Logger;
032
033/**
034 * Receives heartbeats from all Evaluators and dispatches them to the right EvaluatorManager instance.
035 */
036@Private
037@DriverSide
038public final class EvaluatorHeartbeatHandler implements EventHandler<RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto>> {
039  private static final Logger LOG = Logger.getLogger(EvaluatorHeartbeatHandler.class.getName());
040  private final Evaluators evaluators;
041  private final EvaluatorManagerFactory evaluatorManagerFactory;
042
043  @Inject
044  EvaluatorHeartbeatHandler(final Evaluators evaluators, final EvaluatorManagerFactory evaluatorManagerFactory) {
045    this.evaluators = evaluators;
046    this.evaluatorManagerFactory = evaluatorManagerFactory;
047  }
048
049  @Override
050  public void onNext(final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) {
051    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeat = evaluatorHeartbeatMessage.getMessage();
052    final ReefServiceProtos.EvaluatorStatusProto status = heartbeat.getEvaluatorStatus();
053    final String evaluatorId = status.getEvaluatorId();
054
055    LOG.log(Level.FINEST, "TIME: Begin Heartbeat {0}", evaluatorId);
056    LOG.log(Level.FINEST, "Heartbeat from Evaluator {0} with state {1} timestamp {2} from remoteId {3}",
057        new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(), evaluatorHeartbeatMessage.getIdentifier()});
058
059    final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId);
060    if (evaluatorManager.isPresent()) {
061      evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
062    } else {
063      final StringBuilder message = new StringBuilder("Contact from unknown Evaluator with identifier '");
064      message.append(evaluatorId);
065      if (heartbeat.hasEvaluatorStatus()) {
066        message.append("' with state '");
067        message.append(status.getState());
068      }
069      message.append('\'');
070      throw new RuntimeException(message.toString());
071    }
072    LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
073  }
074}