001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.proto.EvaluatorRuntimeProtocol;
024import org.apache.reef.runtime.common.utils.RemoteManager;
025import org.apache.reef.tang.annotations.Parameter;
026import org.apache.reef.util.Optional;
027import org.apache.reef.wake.EventHandler;
028
029import javax.inject.Inject;
030import java.util.logging.Level;
031import java.util.logging.Logger;
032
033/**
034 * This class handles the sending of Evaluator control messages to the Evaluator.
035 */
036@DriverSide
037@Private
038public final class EvaluatorControlHandler {
039
040  private static final Logger LOG = Logger.getLogger(EvaluatorControlHandler.class.getName());
041  private final EvaluatorStatusManager stateManager;
042  private final RemoteManager remoteManager;
043  private final String evaluatorId;
044  private Optional<EventHandler<EvaluatorRuntimeProtocol.EvaluatorControlProto>> wrapped = Optional.empty();
045
046  /**
047   * @param stateManager  used to check whether the Evaluator is running before sending a message.
048   * @param remoteManager used to establish the communications link as soon as the remote ID has been set.
049   */
050  @Inject
051  EvaluatorControlHandler(final EvaluatorStatusManager stateManager,
052                          final RemoteManager remoteManager,
053                          @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorId) {
054    this.stateManager = stateManager;
055    this.remoteManager = remoteManager;
056    this.evaluatorId = evaluatorId;
057    LOG.log(Level.FINE, "Instantiated 'EvaluatorControlHandler'");
058  }
059
060  /**
061   * Send the evaluatorControlProto to the Evaluator.
062   *
063   * @param evaluatorControlProto
064   * @throws java.lang.IllegalStateException if the remote ID hasn't been set via setRemoteID() prior to this call
065   * @throws java.lang.IllegalStateException if the Evaluator isn't running.
066   */
067  public synchronized void send(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
068    if (!this.wrapped.isPresent()) {
069      throw new IllegalStateException("Trying to send an EvaluatorControlProto before the Evaluator ID is set.");
070    }
071    if (!this.stateManager.isRunning()) {
072      LOG.log(Level.WARNING, "Trying to send an EvaluatorControlProto to Evaluator [{0}] that is in state [{1}], " +
073              "not [RUNNING]. The control message was: {2}",
074              new Object[]{this.evaluatorId, this.stateManager, evaluatorControlProto});
075      return;
076    }
077    this.wrapped.get().onNext(evaluatorControlProto);
078  }
079
080  /**
081   * Set the remote ID used to communicate with this Evaluator.
082   *
083   * @param evaluatorRID
084   * @throws java.lang.IllegalStateException if the remote ID has been set before.
085   */
086  synchronized void setRemoteID(final String evaluatorRID) {
087    if (this.wrapped.isPresent()) {
088      throw new IllegalStateException("Trying to reset the evaluator ID. This isn't supported.");
089    } else {
090      LOG.log(Level.FINE, "Registering remoteId [{0}] for Evaluator [{1}]", new Object[]{evaluatorRID, evaluatorId});
091      this.wrapped = Optional.of(remoteManager.getHandler(evaluatorRID,
092          EvaluatorRuntimeProtocol.EvaluatorControlProto.class));
093    }
094  }
095
096}