001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.proto.EvaluatorRuntimeProtocol; 024import org.apache.reef.runtime.common.utils.RemoteManager; 025import org.apache.reef.tang.annotations.Parameter; 026import org.apache.reef.util.Optional; 027import org.apache.reef.wake.EventHandler; 028 029import javax.inject.Inject; 030import java.util.logging.Level; 031import java.util.logging.Logger; 032 033/** 034 * This class handles the sending of Evaluator control messages to the Evaluator. 035 */ 036@DriverSide 037@Private 038public final class EvaluatorControlHandler { 039 040 private static Logger LOG = Logger.getLogger(EvaluatorControlHandler.class.getName()); 041 private final EvaluatorStatusManager stateManager; 042 private final RemoteManager remoteManager; 043 private final String evaluatorId; 044 private Optional<EventHandler<EvaluatorRuntimeProtocol.EvaluatorControlProto>> wrapped = Optional.empty(); 045 046 /** 047 * @param stateManager used to check whether the Evaluator is running before sending a message. 048 * @param remoteManager used to establish the communications link as soon as the remote ID has been set. 049 */ 050 @Inject 051 EvaluatorControlHandler(final EvaluatorStatusManager stateManager, 052 final RemoteManager remoteManager, 053 final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorId) { 054 this.stateManager = stateManager; 055 this.remoteManager = remoteManager; 056 this.evaluatorId = evaluatorId; 057 LOG.log(Level.FINE, "Instantiated 'EvaluatorControlHandler'"); 058 } 059 060 /** 061 * Send the evaluatorControlProto to the Evaluator. 062 * 063 * @param evaluatorControlProto 064 * @throws java.lang.IllegalStateException if the remote ID hasn't been set via setRemoteID() prior to this call 065 * @throws java.lang.IllegalStateException if the Evaluator isn't running. 066 */ 067 public synchronized void send(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) { 068 if (!this.wrapped.isPresent()) { 069 throw new IllegalStateException("Trying to send an EvaluatorControlProto before the Evaluator ID is set."); 070 } 071 if (!this.stateManager.isRunning()) { 072 final String msg = new StringBuilder() 073 .append("Trying to send an EvaluatorControlProto to Evaluator [") 074 .append(this.evaluatorId) 075 .append("] that is in state [") 076 .append(this.stateManager.toString()) 077 .append("], not [RUNNING]. The control message was: ") 078 .append(evaluatorControlProto.toString()) 079 .toString(); 080 throw new IllegalStateException(msg); 081 } 082 this.wrapped.get().onNext(evaluatorControlProto); 083 } 084 085 /** 086 * Set the remote ID used to communicate with this Evaluator. 087 * 088 * @param evaluatorRID 089 * @throws java.lang.IllegalStateException if the remote ID has been set before. 090 */ 091 synchronized void setRemoteID(final String evaluatorRID) { 092 if (this.wrapped.isPresent()) { 093 throw new IllegalStateException("Trying to reset the evaluator ID. This isn't supported."); 094 } else { 095 LOG.log(Level.FINE, "Registering remoteId [{0}] for Evaluator [{1}]", new Object[]{evaluatorRID, evaluatorId}); 096 this.wrapped = Optional.of(remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class)); 097 } 098 } 099 100}