This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.parameters.DriverIdentifier;
024import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
025import org.apache.reef.tang.annotations.Parameter;
026import org.apache.reef.util.Optional;
027import org.apache.reef.tang.util.MonotonicSet;
028import org.apache.reef.util.SingletonAsserter;
029
030import javax.inject.Inject;
031import java.util.*;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Manages all Evaluators.
037 * See EvaluatorManager for the Driver side representation of a single Evaluator.
038 */
039@DriverSide
040@Private
041public final class Evaluators implements AutoCloseable {
042
043  private static final Logger LOG = Logger.getLogger(Evaluators.class.getName());
044
045  /**
046   * A map between evaluatorId and the EvaluatorManager that handles this evaluator.
047   */
048  private final Map<String, EvaluatorManager> evaluators = new HashMap<>();
049
050  /**
051   * A set of evaluatorIds for "closed" (failed and returned) evaluators.
052   */
053  private final MonotonicSet<String> closedEvaluatorIds = new MonotonicSet<>();
054
055  @Inject
056  private Evaluators(@Parameter(DriverIdentifier.class) final String driverId) {
057    LOG.log(Level.FINE, "Instantiated 'Evaluators' for driver {0}", driverId);
058    // There can be several instances of the class for multiple REEFEnvironments.
059    // It is still a singleton when REEF Driver owns the entire JVM.
060    assert SingletonAsserter.assertSingleton(driverId, Evaluators.class);
061  }
062
063  /**
064   * Closes all EvaluatorManager instances managed.
065   */
066  @Override
067  public void close() {
068
069    LOG.log(Level.FINER, "Closing the evaluators - begin");
070
071    final List<EvaluatorManager> evaluatorsCopy;
072    synchronized (this) {
073      evaluatorsCopy = new ArrayList<>(this.evaluators.values());
074    }
075
076    for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
077      if (!evaluatorManager.isClosedOrClosing()) {
078        LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId());
079        evaluatorManager.close();
080      }
081    }
082
083    LOG.log(Level.FINER, "Closing the evaluators - end");
084  }
085
086  /**
087   * Return true if <em>all</em> evaluators are in closed state
088   * (and their processing queues are empty).
089   */
090  public synchronized boolean allEvaluatorsAreClosed() {
091    synchronized (this.evaluators) {
092      for (final EvaluatorManager eval : this.evaluators.values()) {
093        if (!eval.isClosed()) {
094          return false;
095        }
096      }
097    }
098    return true;
099  }
100
101  /**
102   * @param evaluatorId
103   * @return the EvaluatorManager for the given id, if one exists.
104   */
105  public synchronized Optional<EvaluatorManager> get(final String evaluatorId) {
106    return Optional.ofNullable(this.evaluators.get(evaluatorId));
107  }
108
109  /**
110   * @param evaluatorId
111   * @return true if evaluator with this id has already been closed.
112   */
113  public synchronized boolean wasClosed(final String evaluatorId) {
114    return this.closedEvaluatorIds.contains(evaluatorId);
115  }
116
117  /**
118   * Create new EvaluatorManager and add it to the collection.
119   * <p>
120   * FIXME: This method is a temporary fix for the race condition
121   * described in issues #828 and #839.
122   *
123   * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects.
124   * @param evaluatorMsg            Resource allocation message that contains data on the new evaluator.
125   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
126   */
127  public synchronized void put(
128      final EvaluatorManagerFactory evaluatorManagerFactory,
129      final ResourceAllocationEvent evaluatorMsg) {
130    this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg));
131  }
132
133  /**
134   * Adds an EvaluatorManager.
135   *
136   * @param evaluatorManager
137   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
138   */
139  public synchronized void put(final EvaluatorManager evaluatorManager) {
140    final String evaluatorId = evaluatorManager.getId();
141    if (this.wasClosed(evaluatorId)) {
142      throw new IllegalArgumentException(
143        "Trying to re-add an Evaluator that has already been closed: " + evaluatorId);
144    }
145    final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager);
146    LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev});
147    if (prev != null) {
148      throw new IllegalArgumentException(
149        "Trying to re-add an Evaluator that is already known: " + evaluatorId);
150    }
151  }
152
153  /**
154   * Moves evaluator from map of active evaluators to set of closed evaluators.
155   */
156  public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) {
157
158    final String evaluatorId = evaluatorManager.getId();
159    LOG.log(Level.FINE, "Removing closed evaluator: {0}", evaluatorId);
160
161    if (!evaluatorManager.isClosed()) {
162      throw new IllegalArgumentException("Removing evaluator that has not been closed yet: " + evaluatorId);
163    }
164
165    if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) {
166      throw new IllegalArgumentException("Removing unknown evaluator: " + evaluatorId);
167    }
168
169    if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) {
170      LOG.log(Level.FINE, "Removing closed evaluator which has already been removed: {0}", evaluatorId);
171      return;
172    }
173
174    evaluatorManager.shutdown();
175    this.evaluators.remove(evaluatorId);
176    this.closedEvaluatorIds.add(evaluatorId);
177
178    LOG.log(Level.FINEST, "Closed evaluator removed: {0}", evaluatorId);
179  }
180}