/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
import org.apache.reef.util.Optional;
import org.apache.reef.util.SingletonAsserter;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Manages all Evaluators.
 * See EvaluatorManager for the Driver side representation of a single Evaluator.
 * <p>
 * All access to the internal map happens while holding the monitor of this instance
 * (every public method is either {@code synchronized} or takes a snapshot inside a
 * {@code synchronized (this)} block).
 */
@DriverSide
@Private
public final class Evaluators implements AutoCloseable {

  private static final Logger LOG = Logger.getLogger(Evaluators.class.getName());

  /**
   * A map between evaluatorId and the EvaluatorManager that handles this evaluator.
   * Guarded by the monitor of this {@code Evaluators} instance.
   */
  private final Map<String, EvaluatorManager> evaluators = new HashMap<>();

  /**
   * Creates the (single) instance. When assertions are enabled, the
   * SingletonAsserter check is evaluated for this class.
   */
  @Inject
  Evaluators() {
    LOG.log(Level.FINE, "Instantiated 'Evaluators'");
    assert SingletonAsserter.assertSingleton(Evaluators.class);
  }

  /**
   * Closes all EvaluatorManager instances managed that are not closed yet.
   * A warning is logged for each evaluator that still had to be closed here.
   */
  @Override
  public void close() {
    // Take a snapshot under the lock and iterate over the copy, so that
    // EvaluatorManager.close() is invoked without holding this monitor.
    final List<EvaluatorManager> evaluatorsCopy;
    synchronized (this) {
      evaluatorsCopy = new ArrayList<>(this.evaluators.values());
    }
    for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
      if (!evaluatorManager.isClosed()) {
        // Only evaluators that are still open constitute an unclean shutdown;
        // already closed ones must not trigger the warning.
        LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId());
        evaluatorManager.close();
      }
    }
  }

  /**
   * Checks whether every managed evaluator reports itself as closed.
   *
   * @return true if <em>all</em> evaluators are in closed state according to
   * EvaluatorManager.isClosed() (also true when no evaluator is registered),
   * false as soon as one open evaluator is found.
   */
  public synchronized boolean allEvaluatorsAreClosed() {
    // The method-level lock on 'this' is the one used by every other accessor
    // of the map; no additional lock on the map itself is needed.
    for (final EvaluatorManager eval : this.evaluators.values()) {
      if (!eval.isClosed()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Looks up the EvaluatorManager registered under the given id.
   *
   * @param evaluatorId the id of the evaluator to look up.
   * @return the EvaluatorManager for the given id, if one exists;
   * an empty Optional otherwise.
   */
  public synchronized Optional<EvaluatorManager> get(final String evaluatorId) {
    return Optional.ofNullable(this.evaluators.get(evaluatorId));
  }

  /**
   * Create new EvaluatorManager and add it to the collection.
   * Creation and registration both happen while holding the lock of this instance.
   * <p>
   * FIXME: This method is a temporary fix for the race condition
   * described in issues #828 and #839.
   *
   * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects.
   * @param evaluatorMsg Resource allocation message that contains data on the new evaluator.
   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
   */
  public synchronized void put(
      final EvaluatorManagerFactory evaluatorManagerFactory,
      final ResourceAllocationEvent evaluatorMsg) {
    this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg));
  }

  /**
   * Adds an EvaluatorManager, keyed by its id.
   * If an EvaluatorManager with the same id is already registered, the existing
   * entry is left untouched and an exception is thrown.
   *
   * @param evaluatorManager the EvaluatorManager to register.
   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
   */
  public synchronized void put(final EvaluatorManager evaluatorManager) {
    final String evaluatorId = evaluatorManager.getId();
    final EvaluatorManager prev = this.evaluators.get(evaluatorId);
    LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev});
    if (prev != null) {
      // Reject the duplicate BEFORE modifying the map: Map.put() would replace
      // the known manager, leaving the map changed even though the call fails.
      throw new IllegalArgumentException(
          "Trying to re-add an Evaluator that is already known: " + evaluatorId);
    }
    this.evaluators.put(evaluatorId, evaluatorManager);
  }
}