001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; 024import org.apache.reef.util.Optional; 025import org.apache.reef.util.SingletonAsserter; 026import org.apache.reef.tang.util.MonotonicSet; 027 028import javax.inject.Inject; 029import java.util.*; 030import java.util.logging.Level; 031import java.util.logging.Logger; 032 033/** 034 * Manages all Evaluators. 035 * See EvaluatorManager for the Driver side representation of a single Evaluator. 036 */ 037@DriverSide 038@Private 039public final class Evaluators implements AutoCloseable { 040 041 private static final Logger LOG = Logger.getLogger(Evaluators.class.getName()); 042 043 /** 044 * A map between evaluatorId and the EvaluatorManager that handles this evaluator. 045 */ 046 private final Map<String, EvaluatorManager> evaluators = new HashMap<>(); 047 048 /** 049 * A set of evaluatorIds for "closed" (failed and returned) evaluators. 050 */ 051 private final MonotonicSet<String> closedEvaluatorIds = new MonotonicSet<>(); 052 053 @Inject 054 Evaluators() { 055 LOG.log(Level.FINE, "Instantiated 'Evaluators'"); 056 assert SingletonAsserter.assertSingleton(Evaluators.class); 057 } 058 059 /** 060 * Closes all EvaluatorManager instances managed. 061 */ 062 @Override 063 public void close() { 064 final List<EvaluatorManager> evaluatorsCopy; 065 synchronized (this) { 066 evaluatorsCopy = new ArrayList<>(this.evaluators.values()); 067 } 068 for (final EvaluatorManager evaluatorManager : evaluatorsCopy) { 069 LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId()); 070 if (!evaluatorManager.isClosed()) { 071 evaluatorManager.close(); 072 } 073 } 074 } 075 076 /** 077 * Return true if <em>all</em> evaluators are in closed state 078 * (and their processing queues are empty). 079 */ 080 public synchronized boolean allEvaluatorsAreClosed() { 081 synchronized (this.evaluators) { 082 for (final EvaluatorManager eval : this.evaluators.values()) { 083 if (!eval.isClosed()) { 084 return false; 085 } 086 } 087 } 088 return true; 089 } 090 091 /** 092 * @param evaluatorId 093 * @return the EvaluatorManager for the given id, if one exists. 094 */ 095 public synchronized Optional<EvaluatorManager> get(final String evaluatorId) { 096 return Optional.ofNullable(this.evaluators.get(evaluatorId)); 097 } 098 099 /** 100 * @param evaluatorId 101 * @return true if evaluator with this id has already been closed. 102 */ 103 public synchronized boolean wasClosed(final String evaluatorId) { 104 return this.closedEvaluatorIds.contains(evaluatorId); 105 } 106 107 /** 108 * Create new EvaluatorManager and add it to the collection. 109 * <p> 110 * FIXME: This method is a temporary fix for the race condition 111 * described in issues #828 and #839. 112 * 113 * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects. 114 * @param evaluatorMsg Resource allocation message that contains data on the new evaluator. 115 * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known. 116 */ 117 public synchronized void put( 118 final EvaluatorManagerFactory evaluatorManagerFactory, 119 final ResourceAllocationEvent evaluatorMsg) { 120 this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg)); 121 } 122 123 /** 124 * Adds an EvaluatorManager. 125 * 126 * @param evaluatorManager 127 * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known. 128 */ 129 public synchronized void put(final EvaluatorManager evaluatorManager) { 130 final String evaluatorId = evaluatorManager.getId(); 131 if (this.wasClosed(evaluatorId)) { 132 throw new IllegalArgumentException( 133 "Trying to re-add an Evaluator that has already been closed: " + evaluatorId); 134 } 135 final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager); 136 LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev}); 137 if (prev != null) { 138 throw new IllegalArgumentException( 139 "Trying to re-add an Evaluator that is already known: " + evaluatorId); 140 } 141 } 142 143 /** 144 * Moves evaluator from map of active evaluators to set of closed evaluators. 145 */ 146 public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) { 147 final String evaluatorId = evaluatorManager.getId(); 148 if (!evaluatorManager.isClosed()) { 149 throw new IllegalArgumentException("Trying to remove evaluator " + evaluatorId + " which is not closed yet."); 150 } 151 if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) { 152 throw new IllegalArgumentException("Trying to remove unknown evaluator " + evaluatorId + "."); 153 } 154 if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) { 155 LOG.log(Level.FINE, "Trying to remove closed evaluator " + evaluatorId + " which has already been removed."); 156 } else { 157 LOG.log(Level.FINE, "Removing closed evaluator " + evaluatorId + "."); 158 this.evaluators.remove(evaluatorId); 159 this.closedEvaluatorIds.add(evaluatorId); 160 } 161 } 162}