This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
024import org.apache.reef.util.Optional;
025import org.apache.reef.util.SingletonAsserter;
026import org.apache.reef.tang.util.MonotonicSet;
027
028import javax.inject.Inject;
029import java.util.*;
030import java.util.logging.Level;
031import java.util.logging.Logger;
032
033/**
034 * Manages all Evaluators.
035 * See EvaluatorManager for the Driver side representation of a single Evaluator.
036 */
037@DriverSide
038@Private
039public final class Evaluators implements AutoCloseable {
040
041  private static final Logger LOG = Logger.getLogger(Evaluators.class.getName());
042
043  /**
044   * A map between evaluatorId and the EvaluatorManager that handles this evaluator.
045   */
046  private final Map<String, EvaluatorManager> evaluators = new HashMap<>();
047
048  /**
049   * A set of evaluatorIds for "closed" (failed and returned) evaluators.
050   */
051  private final MonotonicSet<String> closedEvaluatorIds = new MonotonicSet<>();
052
053  @Inject
054  Evaluators() {
055    LOG.log(Level.FINE, "Instantiated 'Evaluators'");
056    assert SingletonAsserter.assertSingleton(Evaluators.class);
057  }
058
059  /**
060   * Closes all EvaluatorManager instances managed.
061   */
062  @Override
063  public void close() {
064    final List<EvaluatorManager> evaluatorsCopy;
065    synchronized (this) {
066      evaluatorsCopy = new ArrayList<>(this.evaluators.values());
067    }
068    for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
069      LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId());
070      if (!evaluatorManager.isClosed()) {
071        evaluatorManager.close();
072      }
073    }
074  }
075
076  /**
077   * Return true if <em>all</em> evaluators are in closed state
078   * (and their processing queues are empty).
079   */
080  public synchronized boolean allEvaluatorsAreClosed() {
081    synchronized (this.evaluators) {
082      for (final EvaluatorManager eval : this.evaluators.values()) {
083        if (!eval.isClosed()) {
084          return false;
085        }
086      }
087    }
088    return true;
089  }
090
091  /**
092   * @param evaluatorId
093   * @return the EvaluatorManager for the given id, if one exists.
094   */
095  public synchronized Optional<EvaluatorManager> get(final String evaluatorId) {
096    return Optional.ofNullable(this.evaluators.get(evaluatorId));
097  }
098
099  /**
100   * @param evaluatorId
101   * @return true if evaluator with this id has already been closed.
102   */
103  public synchronized boolean wasClosed(final String evaluatorId) {
104    return this.closedEvaluatorIds.contains(evaluatorId);
105  }
106
107  /**
108   * Create new EvaluatorManager and add it to the collection.
109   * <p>
110   * FIXME: This method is a temporary fix for the race condition
111   * described in issues #828 and #839.
112   *
113   * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects.
114   * @param evaluatorMsg            Resource allocation message that contains data on the new evaluator.
115   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
116   */
117  public synchronized void put(
118      final EvaluatorManagerFactory evaluatorManagerFactory,
119      final ResourceAllocationEvent evaluatorMsg) {
120    this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg));
121  }
122
123  /**
124   * Adds an EvaluatorManager.
125   *
126   * @param evaluatorManager
127   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
128   */
129  public synchronized void put(final EvaluatorManager evaluatorManager) {
130    final String evaluatorId = evaluatorManager.getId();
131    if (this.wasClosed(evaluatorId)) {
132      throw new IllegalArgumentException(
133        "Trying to re-add an Evaluator that has already been closed: " + evaluatorId);
134    }
135    final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager);
136    LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev});
137    if (prev != null) {
138      throw new IllegalArgumentException(
139        "Trying to re-add an Evaluator that is already known: " + evaluatorId);
140    }
141  }
142
143  /**
144   * Moves evaluator from map of active evaluators to set of closed evaluators.
145   */
146  public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) {
147    final String evaluatorId = evaluatorManager.getId();
148    if (!evaluatorManager.isClosed()) {
149      throw new IllegalArgumentException("Trying to remove evaluator " + evaluatorId + " which is not closed yet.");
150    }
151    if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) {
152      throw new IllegalArgumentException("Trying to remove unknown evaluator " + evaluatorId + ".");
153    }
154    if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) {
155      LOG.log(Level.FINE, "Trying to remove closed evaluator " + evaluatorId + " which has already been removed.");
156    } else {
157      LOG.log(Level.FINE, "Removing closed evaluator " + evaluatorId + ".");
158      this.evaluators.remove(evaluatorId);
159      this.closedEvaluatorIds.add(evaluatorId);
160    }
161  }
162}