This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.resourcemanager;
020
021import org.apache.reef.annotations.audience.Private;
022import org.apache.reef.proto.DriverRuntimeProtocol;
023import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
024import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory;
025import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
026import org.apache.reef.util.Optional;
027import org.apache.reef.wake.EventHandler;
028
029import javax.inject.Inject;
030
031/**
032 * A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks
033 * about the current state of a given resource. Ideally, we should think the same thing.
034 */
035@Private
036public final class ResourceStatusHandler implements EventHandler<DriverRuntimeProtocol.ResourceStatusProto> {
037
038  private final Evaluators evaluators;
039  private final EvaluatorManagerFactory evaluatorManagerFactory;
040
041  @Inject
042  ResourceStatusHandler(final Evaluators evaluators, final EvaluatorManagerFactory evaluatorManagerFactory) {
043    this.evaluators = evaluators;
044    this.evaluatorManagerFactory = evaluatorManagerFactory;
045  }
046
047  /**
048   * This resource status message comes from the ResourceManager layer; telling me what it thinks
049   * about the state of the resource executing an Evaluator; This method simply passes the message
050   * off to the referenced EvaluatorManager
051   *
052   * @param resourceStatusProto resource status message from the ResourceManager
053   */
054  @Override
055  public void onNext(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
056    final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusProto.getIdentifier());
057    if (evaluatorManager.isPresent()) {
058      evaluatorManager.get().onResourceStatusMessage(resourceStatusProto);
059    } else {
060      if (resourceStatusProto.getIsFromPreviousDriver()) {
061        EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusProto);
062        previousEvaluatorManager.onResourceStatusMessage(resourceStatusProto);
063      } else {
064        throw new RuntimeException(
065            "Unknown resource status from evaluator " + resourceStatusProto.getIdentifier() +
066                " with state " + resourceStatusProto.getState()
067        );
068      }
069    }
070  }
071}