This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
024import org.apache.reef.util.Optional;
025import org.apache.reef.util.SingletonAsserter;
026
027import javax.inject.Inject;
028import java.util.ArrayList;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Manages all Evaluators.
037 * See EvaluatorManager for the Driver side representation of a single Evaluator.
038 */
039@DriverSide
040@Private
041public final class Evaluators implements AutoCloseable {
042
043  private static final Logger LOG = Logger.getLogger(Evaluators.class.getName());
044
045  /**
046   * A map between evaluatorId and the EvaluatorManager that handles this evaluator.
047   */
048  private final Map<String, EvaluatorManager> evaluators = new HashMap<>();
049
050
051  @Inject
052  Evaluators() {
053    LOG.log(Level.FINE, "Instantiated 'Evaluators'");
054    assert SingletonAsserter.assertSingleton(Evaluators.class);
055  }
056
057  /**
058   * Closes all EvaluatorManager instances managed.
059   */
060  @Override
061  public void close() {
062    final List<EvaluatorManager> evaluatorsCopy;
063    synchronized (this) {
064      evaluatorsCopy = new ArrayList<>(this.evaluators.values());
065    }
066    for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
067      LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId());
068      if (!evaluatorManager.isClosed()) {
069        evaluatorManager.close();
070      }
071    }
072  }
073
074  /**
075   * Return true if <em>all</em> evaluators are in closed state
076   * (and their processing queues are empty).
077   */
078  public synchronized boolean allEvaluatorsAreClosed() {
079    synchronized (this.evaluators) {
080      for (final EvaluatorManager eval : this.evaluators.values()) {
081        if (!eval.isClosed()) {
082          return false;
083        }
084      }
085    }
086    return true;
087  }
088
089  /**
090   * @param evaluatorId
091   * @return the EvaluatorManager for the given id, if one exists.
092   */
093  public synchronized Optional<EvaluatorManager> get(final String evaluatorId) {
094    return Optional.ofNullable(this.evaluators.get(evaluatorId));
095  }
096
097  /**
098   * Create new EvaluatorManager and add it to the collection.
099   * <p>
100   * FIXME: This method is a temporary fix for the race condition
101   * described in issues #828 and #839.
102   *
103   * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects.
104   * @param evaluatorMsg            Resource allocation message that contains data on the new evaluator.
105   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
106   */
107  public synchronized void put(
108      final EvaluatorManagerFactory evaluatorManagerFactory,
109      final ResourceAllocationEvent evaluatorMsg) {
110    this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg));
111  }
112
113  /**
114   * Adds an EvaluatorManager.
115   *
116   * @param evaluatorManager
117   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
118   */
119  public synchronized void put(final EvaluatorManager evaluatorManager) {
120    final String evaluatorId = evaluatorManager.getId();
121    final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager);
122    LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev});
123    if (prev != null) {
124      throw new IllegalArgumentException(
125          "Trying to re-add an Evaluator that is already known: " + evaluatorId);
126    }
127  }
128}