This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.driver;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.evaluator.EvaluatorProcess;
024import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
025import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
026import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
027import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
028import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
029import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
030import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
031import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
032import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
033import org.apache.reef.runtime.common.files.FileResource;
034import org.apache.reef.runtime.common.files.REEFFileNames;
035import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
036import org.apache.reef.runtime.common.utils.RemoteManager;
037import org.apache.reef.tang.annotations.Parameter;
038import org.apache.reef.tang.exceptions.BindException;
039import org.apache.reef.tang.formats.ConfigurationSerializer;
040import org.apache.reef.util.Optional;
041import org.apache.reef.util.logging.LoggingScope;
042import org.apache.reef.util.logging.LoggingScopeFactory;
043import org.apache.reef.wake.EventHandler;
044
045import java.io.File;
046import java.io.IOException;
047import java.util.ArrayList;
048import java.util.List;
049import java.util.logging.Level;
050import java.util.logging.Logger;
051
052import javax.inject.Inject;
053
054/**
055 * A resource manager that uses threads to execute containers.
056 */
057@Private
058@DriverSide
059public final class ResourceManager {
060
061  private static final Logger LOG = Logger.getLogger(ResourceManager.class.getName());
062
063  private final ResourceRequestQueue requestQueue = new ResourceRequestQueue();
064
065  private final EventHandler<ResourceAllocationEvent> allocationHandler;
066  private final ContainerManager theContainers;
067  private final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler;
068  private final ConfigurationSerializer configurationSerializer;
069  private final RemoteManager remoteManager;
070  private final REEFFileNames fileNames;
071  private final double jvmHeapFactor;
072  private final LoggingScopeFactory loggingScopeFactory;
073
074  @Inject
075  ResourceManager(
076      final ContainerManager containerManager,
077      @Parameter(RuntimeParameters.ResourceAllocationHandler.class)
078      final EventHandler<ResourceAllocationEvent> allocationHandler,
079      @Parameter(RuntimeParameters.RuntimeStatusHandler.class)
080      final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler,
081      @Parameter(JVMHeapSlack.class) final double jvmHeapSlack,
082      final ConfigurationSerializer configurationSerializer,
083      final RemoteManager remoteManager,
084      final REEFFileNames fileNames,
085      final LoggingScopeFactory loggingScopeFactory) {
086
087    this.theContainers = containerManager;
088    this.allocationHandler = allocationHandler;
089    this.runtimeStatusHandlerEventHandler = runtimeStatusHandlerEventHandler;
090    this.configurationSerializer = configurationSerializer;
091    this.remoteManager = remoteManager;
092    this.fileNames = fileNames;
093    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
094    this.loggingScopeFactory = loggingScopeFactory;
095
096    LOG.log(Level.FINE, "Instantiated 'ResourceManager'");
097  }
098
099  /**
100   * Extracts the files out of the launchRequest.
101   *
102   * @param launchRequest the ResourceLaunchProto to parse
103   * @return a list of files set in the given ResourceLaunchProto
104   */
105  private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) {
106    final List<File> files = new ArrayList<>();  // Libraries local to this evaluator
107    for (final FileResource frp : launchRequest.getFileSet()) {
108      files.add(new File(frp.getPath()).getAbsoluteFile());
109    }
110    return files;
111  }
112
113  /**
114   * Receives a resource request.
115   * <p>
116   * If the request can be met, it will also be satisfied immediately.
117   *
118   * @param resourceRequest the resource request to be handled.
119   */
120  void onResourceRequest(final ResourceRequestEvent resourceRequest) {
121    synchronized (this.theContainers) {
122      this.requestQueue.add(new ResourceRequest(resourceRequest));
123      this.checkRequestQueue();
124    }
125  }
126
127  /**
128   * Receives and processes a resource release request.
129   *
130   * @param releaseRequest the release request to be processed
131   */
132  void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) {
133    synchronized (this.theContainers) {
134      LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier());
135      this.theContainers.release(releaseRequest.getIdentifier());
136      this.checkRequestQueue();
137    }
138  }
139
140  /**
141   * Called when the ReefRunnableProcessObserver detects that the Evaluator process has exited.
142   *
143   * @param evaluatorId the ID of the Evaluator that exited.
144   */
145  public void onEvaluatorExit(final String evaluatorId) {
146    synchronized (this.theContainers) {
147      this.theContainers.release(evaluatorId);
148      this.checkRequestQueue();
149    }
150  }
151
152  /**
153   * Processes a resource launch request.
154   *
155   * @param launchRequest the launch request to be processed.
156   */
157  void onResourceLaunchRequest(
158      final ResourceLaunchEvent launchRequest) {
159
160    synchronized (this.theContainers) {
161
162      final Container c = this.theContainers.get(launchRequest.getIdentifier());
163
164      try (final LoggingScope lb = this.loggingScopeFactory
165          .getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile")) {
166        // Add the global files and libraries.
167        c.addGlobalFiles(this.fileNames.getGlobalFolder());
168        c.addLocalFiles(getLocalFiles(launchRequest));
169
170        // Make the configuration file of the evaluator.
171        final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath());
172
173        try {
174          this.configurationSerializer.toFile(launchRequest.getEvaluatorConf(), evaluatorConfigurationFile);
175        } catch (final IOException | BindException e) {
176          throw new RuntimeException("Unable to write configuration.", e);
177        }
178      }
179
180      try (final LoggingScope lc = this.loggingScopeFactory
181          .getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand")) {
182
183        final List<String> command = getLaunchCommand(launchRequest, c.getMemory());
184        LOG.log(Level.FINEST, "Launching container: {0}", c);
185        c.run(command);
186      }
187    }
188  }
189
190  private List<String> getLaunchCommand(final ResourceLaunchEvent launchRequest,
191                                        final int containerMemory) {
192    final EvaluatorProcess process = launchRequest.getProcess()
193        .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath());
194
195    if (process.isOptionSet()) {
196      return process.getCommandLine();
197    } else {
198      return process
199          .setMemory((int) (this.jvmHeapFactor * containerMemory))
200          .getCommandLine();
201    }
202  }
203
204  /**
205  /**
206   * Checks the allocation queue for new allocations and if there are any
207   * satisfies them.
208   */
209  private void checkRequestQueue() {
210
211    if (requestQueue.hasOutStandingRequests()) {
212      final ResourceRequest resourceRequest = requestQueue.head();
213      final ResourceRequestEvent requestEvent = resourceRequest.getRequestProto();
214      final Optional<Container> cont = theContainers.allocateContainer(requestEvent);
215      if (cont.isPresent()) {
216        // Container has been allocated
217        requestQueue.satisfyOne();
218        final Container container = cont.get();
219        // Tell the receivers about it
220        final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder()
221            .setIdentifier(container.getContainerID()).setNodeId(container.getNodeID())
222            .setResourceMemory(container.getMemory()).setVirtualCores(container.getNumberOfCores())
223            .setRackName(container.getRackName()).setRuntimeName(RuntimeIdentifier.RUNTIME_NAME).build();
224
225        LOG.log(Level.FINEST, "Allocating container: {0}", container);
226        this.allocationHandler.onNext(alloc);
227        // update REEF
228        this.sendRuntimeStatus();
229
230        // Check whether we can satisfy another one.
231        this.checkRequestQueue();
232      } else {
233        // could not allocate, update REEF
234        this.sendRuntimeStatus();
235      }
236    } else {
237      // done
238      this.sendRuntimeStatus();
239    }
240  }
241
242  private void sendRuntimeStatus() {
243
244    final RuntimeStatusEventImpl.Builder builder =
245        RuntimeStatusEventImpl.newBuilder()
246            .setName("LOCAL")
247            .setState(State.RUNNING)
248            .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests());
249    for (final String containerAllocation : this.theContainers.getAllocatedContainerIDs()) {
250      builder.addContainerAllocation(containerAllocation);
251    }
252    final RuntimeStatusEvent msg = builder.build();
253
254    LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}",
255        new Object[]{msg.getContainerAllocationList().size(), msg.getOutstandingContainerRequests()});
256    this.runtimeStatusHandlerEventHandler.onNext(msg);
257  }
258}