001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.evaluator; 020 021import org.apache.commons.lang3.Validate; 022import org.apache.reef.annotations.audience.DriverSide; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.driver.catalog.NodeDescriptor; 025import org.apache.reef.driver.catalog.ResourceCatalog; 026import org.apache.reef.driver.evaluator.EvaluatorProcessFactory; 027import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl; 028import org.apache.reef.runtime.common.driver.resourcemanager.*; 029import org.apache.reef.tang.Injector; 030import org.apache.reef.tang.exceptions.BindException; 031import org.apache.reef.tang.exceptions.InjectionException; 032 033import javax.inject.Inject; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037/** 038 * Helper class that creates new EvaluatorManager instances from allocations. 039 */ 040@Private 041@DriverSide 042public final class EvaluatorManagerFactory { 043 private static final Logger LOG = Logger.getLogger(EvaluatorManagerFactory.class.getName()); 044 045 private final Injector injector; 046 private final ResourceCatalog resourceCatalog; 047 private final EvaluatorProcessFactory processFactory; 048 049 @Inject 050 EvaluatorManagerFactory(final Injector injector, 051 final ResourceCatalog resourceCatalog, 052 final EvaluatorProcessFactory processFactory) { 053 this.injector = injector; 054 this.resourceCatalog = resourceCatalog; 055 this.processFactory = processFactory; 056 } 057 058 private EvaluatorManager getNewEvaluatorManagerInstanceForResource( 059 final ResourceEvent resourceEvent) { 060 NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceEvent.getNodeId()); 061 062 if (nodeDescriptor == null) { 063 final String nodeId = resourceEvent.getNodeId(); 064 LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", nodeId); 065 final String[] hostNameAndPort = nodeId.split(":"); 066 Validate.isTrue(hostNameAndPort.length == 2); 067 final NodeDescriptorEvent nodeDescriptorEvent = NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId) 068 .setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1])) 069 .setMemorySize(resourceEvent.getResourceMemory()) 070 .setRackName(resourceEvent.getRackName().get()).build(); 071 // downcasting not to change the API 072 ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent); 073 nodeDescriptor = this.resourceCatalog.getNode(nodeId); 074 } 075 final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, 076 resourceEvent.getResourceMemory(), resourceEvent.getVirtualCores().get(), 077 processFactory.newEvaluatorProcess(), resourceEvent.getRuntimeName()); 078 079 LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceEvent.getIdentifier()); 080 final EvaluatorManager evaluatorManager = 081 getNewEvaluatorManagerInstance(resourceEvent.getIdentifier(), evaluatorDescriptor); 082 083 return evaluatorManager; 084 } 085 086 /** 087 * Helper method to create a new EvaluatorManager instance. 088 * 089 * @param id identifier of the Evaluator 090 * @param desc NodeDescriptor on which the Evaluator executes. 091 * @return a new EvaluatorManager instance. 092 */ 093 private EvaluatorManager getNewEvaluatorManagerInstance(final String id, final EvaluatorDescriptorImpl desc) { 094 LOG.log(Level.FINEST, "Creating Evaluator Manager for Evaluator ID {0}", id); 095 final Injector child = this.injector.forkInjector(); 096 097 try { 098 child.bindVolatileParameter(EvaluatorManager.EvaluatorIdentifier.class, id); 099 child.bindVolatileParameter(EvaluatorManager.EvaluatorDescriptorName.class, desc); 100 } catch (final BindException e) { 101 throw new RuntimeException("Unable to bind evaluator identifier and name.", e); 102 } 103 104 final EvaluatorManager result; 105 try { 106 result = child.getInstance(EvaluatorManager.class); 107 } catch (final InjectionException e) { 108 throw new RuntimeException("Unable to instantiate a new EvaluatorManager for Evaluator ID: " + id, e); 109 } 110 return result; 111 } 112 113 /** 114 * Instantiates a new EvaluatorManager based on a resource allocation. 115 * Fires the EvaluatorAllocatedEvent. 116 * 117 * @param resourceAllocationEvent 118 * @return an EvaluatorManager for the newly allocated Evaluator. 119 */ 120 public EvaluatorManager getNewEvaluatorManagerForNewEvaluator( 121 final ResourceAllocationEvent resourceAllocationEvent) { 122 final EvaluatorManager evaluatorManager = getNewEvaluatorManagerInstanceForResource(resourceAllocationEvent); 123 evaluatorManager.fireEvaluatorAllocatedEvent(); 124 125 return evaluatorManager; 126 } 127 128 /** 129 * Instantiates a new EvaluatorManager for a failed evaluator during driver restart. 130 * Does not fire an EvaluatorAllocatedEvent. 131 * @param resourceStatusEvent 132 * @return an EvaluatorManager for the user to call fail on. 133 */ 134 public EvaluatorManager getNewEvaluatorManagerForEvaluatorFailedDuringDriverRestart( 135 final ResourceStatusEvent resourceStatusEvent) { 136 return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(), 137 new EvaluatorDescriptorImpl(null, 128, 1, processFactory.newEvaluatorProcess(), 138 resourceStatusEvent.getRuntimeName())); 139 } 140 141 /** 142 * Instantiates a new EvaluatorManager based on a resource allocation from a recovered evaluator. 143 * 144 * @param resourceRecoverEvent 145 * @return an EvaluatorManager for the newly allocated Evaluator. 146 */ 147 public EvaluatorManager getNewEvaluatorManagerForRecoveredEvaluator( 148 final ResourceRecoverEvent resourceRecoverEvent) { 149 return getNewEvaluatorManagerInstanceForResource(resourceRecoverEvent); 150 } 151}