001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.parameters.DriverIdentifier; 024import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; 025import org.apache.reef.tang.annotations.Parameter; 026import org.apache.reef.util.Optional; 027import org.apache.reef.tang.util.MonotonicSet; 028import org.apache.reef.util.SingletonAsserter; 029 030import javax.inject.Inject; 031import java.util.*; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Manages all Evaluators. 037 * See EvaluatorManager for the Driver side representation of a single Evaluator. 038 */ 039@DriverSide 040@Private 041public final class Evaluators implements AutoCloseable { 042 043 private static final Logger LOG = Logger.getLogger(Evaluators.class.getName()); 044 045 /** 046 * A map between evaluatorId and the EvaluatorManager that handles this evaluator. 047 */ 048 private final Map<String, EvaluatorManager> evaluators = new HashMap<>(); 049 050 /** 051 * A set of evaluatorIds for "closed" (failed and returned) evaluators. 052 */ 053 private final MonotonicSet<String> closedEvaluatorIds = new MonotonicSet<>(); 054 055 @Inject 056 private Evaluators(@Parameter(DriverIdentifier.class) final String driverId) { 057 LOG.log(Level.FINE, "Instantiated 'Evaluators' for driver {0}", driverId); 058 // There can be several instances of the class for multiple REEFEnvironments. 059 // It is still a singleton when REEF Driver owns the entire JVM. 060 assert SingletonAsserter.assertSingleton(driverId, Evaluators.class); 061 } 062 063 /** 064 * Closes all EvaluatorManager instances managed. 065 */ 066 @Override 067 public void close() { 068 069 LOG.log(Level.FINER, "Closing the evaluators - begin"); 070 071 final List<EvaluatorManager> evaluatorsCopy; 072 synchronized (this) { 073 evaluatorsCopy = new ArrayList<>(this.evaluators.values()); 074 } 075 076 for (final EvaluatorManager evaluatorManager : evaluatorsCopy) { 077 if (!evaluatorManager.isClosedOrClosing()) { 078 LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId()); 079 evaluatorManager.close(); 080 } 081 } 082 083 LOG.log(Level.FINER, "Closing the evaluators - end"); 084 } 085 086 /** 087 * Return true if <em>all</em> evaluators are in closed state 088 * (and their processing queues are empty). 089 */ 090 public synchronized boolean allEvaluatorsAreClosed() { 091 synchronized (this.evaluators) { 092 for (final EvaluatorManager eval : this.evaluators.values()) { 093 if (!eval.isClosed()) { 094 return false; 095 } 096 } 097 } 098 return true; 099 } 100 101 /** 102 * @param evaluatorId 103 * @return the EvaluatorManager for the given id, if one exists. 104 */ 105 public synchronized Optional<EvaluatorManager> get(final String evaluatorId) { 106 return Optional.ofNullable(this.evaluators.get(evaluatorId)); 107 } 108 109 /** 110 * @param evaluatorId 111 * @return true if evaluator with this id has already been closed. 112 */ 113 public synchronized boolean wasClosed(final String evaluatorId) { 114 return this.closedEvaluatorIds.contains(evaluatorId); 115 } 116 117 /** 118 * Create new EvaluatorManager and add it to the collection. 119 * <p> 120 * FIXME: This method is a temporary fix for the race condition 121 * described in issues #828 and #839. 122 * 123 * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects. 124 * @param evaluatorMsg Resource allocation message that contains data on the new evaluator. 125 * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known. 126 */ 127 public synchronized void put( 128 final EvaluatorManagerFactory evaluatorManagerFactory, 129 final ResourceAllocationEvent evaluatorMsg) { 130 this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg)); 131 } 132 133 /** 134 * Adds an EvaluatorManager. 135 * 136 * @param evaluatorManager 137 * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known. 138 */ 139 public synchronized void put(final EvaluatorManager evaluatorManager) { 140 final String evaluatorId = evaluatorManager.getId(); 141 if (this.wasClosed(evaluatorId)) { 142 throw new IllegalArgumentException( 143 "Trying to re-add an Evaluator that has already been closed: " + evaluatorId); 144 } 145 final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager); 146 LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev}); 147 if (prev != null) { 148 throw new IllegalArgumentException( 149 "Trying to re-add an Evaluator that is already known: " + evaluatorId); 150 } 151 } 152 153 /** 154 * Moves evaluator from map of active evaluators to set of closed evaluators. 155 */ 156 public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) { 157 158 final String evaluatorId = evaluatorManager.getId(); 159 LOG.log(Level.FINE, "Removing closed evaluator: {0}", evaluatorId); 160 161 if (!evaluatorManager.isClosed()) { 162 throw new IllegalArgumentException("Removing evaluator that has not been closed yet: " + evaluatorId); 163 } 164 165 if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) { 166 throw new IllegalArgumentException("Removing unknown evaluator: " + evaluatorId); 167 } 168 169 if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) { 170 LOG.log(Level.FINE, "Removing closed evaluator which has already been removed: {0}", evaluatorId); 171 return; 172 } 173 174 evaluatorManager.shutdown(); 175 this.evaluators.remove(evaluatorId); 176 this.closedEvaluatorIds.add(evaluatorId); 177 178 LOG.log(Level.FINEST, "Closed evaluator removed: {0}", evaluatorId); 179 } 180}