001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.commons.lang3.Validate;
022import org.apache.reef.annotations.audience.DriverSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.driver.catalog.NodeDescriptor;
025import org.apache.reef.driver.catalog.ResourceCatalog;
026import org.apache.reef.driver.evaluator.EvaluatorProcessFactory;
027import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
028import org.apache.reef.runtime.common.driver.resourcemanager.*;
029import org.apache.reef.tang.Injector;
030import org.apache.reef.tang.exceptions.BindException;
031import org.apache.reef.tang.exceptions.InjectionException;
032
033import javax.inject.Inject;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037/**
038 * Helper class that creates new EvaluatorManager instances from allocations.
039 */
040@Private
041@DriverSide
042public final class EvaluatorManagerFactory {
043  private static final Logger LOG = Logger.getLogger(EvaluatorManagerFactory.class.getName());
044
045  private final Injector injector;
046  private final ResourceCatalog resourceCatalog;
047  private final EvaluatorProcessFactory processFactory;
048
049  @Inject
050  EvaluatorManagerFactory(final Injector injector,
051                          final ResourceCatalog resourceCatalog,
052                          final EvaluatorProcessFactory processFactory) {
053    this.injector = injector;
054    this.resourceCatalog = resourceCatalog;
055    this.processFactory = processFactory;
056  }
057
058  private EvaluatorManager getNewEvaluatorManagerInstanceForResource(
059      final ResourceEvent resourceEvent) {
060    NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceEvent.getNodeId());
061
062    if (nodeDescriptor == null) {
063      final String nodeId = resourceEvent.getNodeId();
064      LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", nodeId);
065      final String[] hostNameAndPort = nodeId.split(":");
066      Validate.isTrue(hostNameAndPort.length == 2);
067      final NodeDescriptorEvent nodeDescriptorEvent = NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId)
068          .setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1]))
069          .setMemorySize(resourceEvent.getResourceMemory())
070          .setRackName(resourceEvent.getRackName().get()).build();
071      // downcasting not to change the API
072      ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent);
073      nodeDescriptor = this.resourceCatalog.getNode(nodeId);
074    }
075    final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor,
076        resourceEvent.getResourceMemory(), resourceEvent.getVirtualCores().get(),
077        processFactory.newEvaluatorProcess(), resourceEvent.getRuntimeName());
078
079    LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceEvent.getIdentifier());
080    final EvaluatorManager evaluatorManager =
081        getNewEvaluatorManagerInstance(resourceEvent.getIdentifier(), evaluatorDescriptor);
082
083    return evaluatorManager;
084  }
085
086  /**
087   * Helper method to create a new EvaluatorManager instance.
088   *
089   * @param id   identifier of the Evaluator
090   * @param desc NodeDescriptor on which the Evaluator executes.
091   * @return a new EvaluatorManager instance.
092   */
093  private EvaluatorManager getNewEvaluatorManagerInstance(final String id, final EvaluatorDescriptorImpl desc) {
094    LOG.log(Level.FINEST, "Creating Evaluator Manager for Evaluator ID {0}", id);
095    final Injector child = this.injector.forkInjector();
096
097    try {
098      child.bindVolatileParameter(EvaluatorManager.EvaluatorIdentifier.class, id);
099      child.bindVolatileParameter(EvaluatorManager.EvaluatorDescriptorName.class, desc);
100    } catch (final BindException e) {
101      throw new RuntimeException("Unable to bind evaluator identifier and name.", e);
102    }
103
104    final EvaluatorManager result;
105    try {
106      result = child.getInstance(EvaluatorManager.class);
107    } catch (final InjectionException e) {
108      throw new RuntimeException("Unable to instantiate a new EvaluatorManager for Evaluator ID: " + id, e);
109    }
110    return result;
111  }
112
113  /**
114   * Instantiates a new EvaluatorManager based on a resource allocation.
115   * Fires the EvaluatorAllocatedEvent.
116   *
117   * @param resourceAllocationEvent
118   * @return an EvaluatorManager for the newly allocated Evaluator.
119   */
120  public EvaluatorManager getNewEvaluatorManagerForNewEvaluator(
121      final ResourceAllocationEvent resourceAllocationEvent) {
122    final EvaluatorManager evaluatorManager = getNewEvaluatorManagerInstanceForResource(resourceAllocationEvent);
123    evaluatorManager.fireEvaluatorAllocatedEvent();
124
125    return evaluatorManager;
126  }
127
128  /**
129   * Instantiates a new EvaluatorManager for a failed evaluator during driver restart.
130   * Does not fire an EvaluatorAllocatedEvent.
131   * @param resourceStatusEvent
132   * @return an EvaluatorManager for the user to call fail on.
133   */
134  public EvaluatorManager getNewEvaluatorManagerForEvaluatorFailedDuringDriverRestart(
135      final ResourceStatusEvent resourceStatusEvent) {
136    return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(),
137        new EvaluatorDescriptorImpl(null, 128, 1, processFactory.newEvaluatorProcess(),
138                resourceStatusEvent.getRuntimeName()));
139  }
140
141  /**
142   * Instantiates a new EvaluatorManager based on a resource allocation from a recovered evaluator.
143   *
144   * @param resourceRecoverEvent
145   * @return an EvaluatorManager for the newly allocated Evaluator.
146   */
147  public EvaluatorManager getNewEvaluatorManagerForRecoveredEvaluator(
148      final ResourceRecoverEvent resourceRecoverEvent) {
149    return getNewEvaluatorManagerInstanceForResource(resourceRecoverEvent);
150  }
151}