001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.proto.EvaluatorRuntimeProtocol; 024import org.apache.reef.runtime.common.utils.RemoteManager; 025import org.apache.reef.tang.annotations.Parameter; 026import org.apache.reef.util.Optional; 027import org.apache.reef.wake.EventHandler; 028 029import javax.inject.Inject; 030import java.util.logging.Level; 031import java.util.logging.Logger; 032 033/** 034 * This class handles the sending of Evaluator control messages to the Evaluator. 035 */ 036@DriverSide 037@Private 038public final class EvaluatorControlHandler { 039 040 private static final Logger LOG = Logger.getLogger(EvaluatorControlHandler.class.getName()); 041 private final EvaluatorStatusManager stateManager; 042 private final RemoteManager remoteManager; 043 private final String evaluatorId; 044 private Optional<EventHandler<EvaluatorRuntimeProtocol.EvaluatorControlProto>> wrapped = Optional.empty(); 045 046 /** 047 * @param stateManager used to check whether the Evaluator is running before sending a message. 048 * @param remoteManager used to establish the communications link as soon as the remote ID has been set. 049 */ 050 @Inject 051 EvaluatorControlHandler(final EvaluatorStatusManager stateManager, 052 final RemoteManager remoteManager, 053 @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorId) { 054 this.stateManager = stateManager; 055 this.remoteManager = remoteManager; 056 this.evaluatorId = evaluatorId; 057 LOG.log(Level.FINE, "Instantiated 'EvaluatorControlHandler'"); 058 } 059 060 /** 061 * Send the evaluatorControlProto to the Evaluator. 062 * 063 * @param evaluatorControlProto 064 * @throws java.lang.IllegalStateException if the remote ID hasn't been set via setRemoteID() prior to this call 065 * @throws java.lang.IllegalStateException if the Evaluator isn't running. 066 */ 067 public synchronized void send(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) { 068 if (!this.wrapped.isPresent()) { 069 throw new IllegalStateException("Trying to send an EvaluatorControlProto before the Evaluator ID is set."); 070 } 071 if (!this.stateManager.isRunning()) { 072 LOG.log(Level.WARNING, "Trying to send an EvaluatorControlProto to Evaluator [{0}] that is in state [{1}], " + 073 "not [RUNNING]. The control message was: {2}", 074 new Object[]{this.evaluatorId, this.stateManager, evaluatorControlProto}); 075 return; 076 } 077 this.wrapped.get().onNext(evaluatorControlProto); 078 } 079 080 /** 081 * Set the remote ID used to communicate with this Evaluator. 082 * 083 * @param evaluatorRID 084 * @throws java.lang.IllegalStateException if the remote ID has been set before. 085 */ 086 synchronized void setRemoteID(final String evaluatorRID) { 087 if (this.wrapped.isPresent()) { 088 throw new IllegalStateException("Trying to reset the evaluator ID. This isn't supported."); 089 } else { 090 LOG.log(Level.FINE, "Registering remoteId [{0}] for Evaluator [{1}]", new Object[]{evaluatorRID, evaluatorId}); 091 this.wrapped = Optional.of(remoteManager.getHandler(evaluatorRID, 092 EvaluatorRuntimeProtocol.EvaluatorControlProto.class)); 093 } 094 } 095 096}