This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.driver;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.proto.DriverRuntimeProtocol;
024import org.apache.reef.proto.ReefServiceProtos;
025import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
026import org.apache.reef.runtime.common.files.ClasspathProvider;
027import org.apache.reef.runtime.common.files.REEFFileNames;
028import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
029import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
030import org.apache.reef.runtime.common.launch.LaunchCommandBuilder;
031import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
032import org.apache.reef.runtime.common.utils.RemoteManager;
033import org.apache.reef.runtime.local.client.parameters.DefaultMemorySize;
034import org.apache.reef.runtime.local.client.parameters.DefaultNumberOfCores;
035import org.apache.reef.runtime.local.driver.parameters.GlobalFiles;
036import org.apache.reef.runtime.local.driver.parameters.GlobalLibraries;
037import org.apache.reef.tang.annotations.Parameter;
038import org.apache.reef.tang.exceptions.BindException;
039import org.apache.reef.tang.formats.ConfigurationSerializer;
040import org.apache.reef.util.logging.LoggingScope;
041import org.apache.reef.util.logging.LoggingScopeFactory;
042import org.apache.reef.wake.EventHandler;
043
044import javax.inject.Inject;
045import java.io.File;
046import java.io.IOException;
047import java.util.ArrayList;
048import java.util.List;
049import java.util.Set;
050import java.util.logging.Level;
051import java.util.logging.Logger;
052
053/**
054 * A resource manager that uses threads to execute containers.
055 */
056@Private
057@DriverSide
058public final class ResourceManager {
059
060  private final static Logger LOG = Logger.getLogger(ResourceManager.class.getName());
061
062  private final ResourceRequestQueue requestQueue = new ResourceRequestQueue();
063
064  private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler;
065  private final ContainerManager theContainers;
066  private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler;
067  private final int defaultMemorySize;
068  private final int defaultNumberOfCores;
069  private final ConfigurationSerializer configurationSerializer;
070  private final RemoteManager remoteManager;
071  private final REEFFileNames fileNames;
072  private final ClasspathProvider classpathProvider;
073  private final double jvmHeapFactor;
074  private final LoggingScopeFactory loggingScopeFactory;
075
076  @Inject
077  ResourceManager(
078      final ContainerManager containerManager,
079      final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler,
080      final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler,
081      final @Parameter(GlobalLibraries.class) Set<String> globalLibraries,
082      final @Parameter(GlobalFiles.class) Set<String> globalFiles,
083      final @Parameter(DefaultMemorySize.class) int defaultMemorySize,
084      final @Parameter(DefaultNumberOfCores.class) int defaultNumberOfCores,
085      final @Parameter(JVMHeapSlack.class) double jvmHeapSlack,
086      final ConfigurationSerializer configurationSerializer,
087      final RemoteManager remoteManager,
088      final REEFFileNames fileNames,
089      final ClasspathProvider classpathProvider,
090      final LoggingScopeFactory loggingScopeFactory) {
091
092    this.theContainers = containerManager;
093    this.allocationHandler = allocationHandler;
094    this.runtimeStatusHandlerEventHandler = runtimeStatusHandlerEventHandler;
095    this.configurationSerializer = configurationSerializer;
096    this.remoteManager = remoteManager;
097    this.defaultMemorySize = defaultMemorySize;
098    this.defaultNumberOfCores = defaultNumberOfCores;
099    this.fileNames = fileNames;
100    this.classpathProvider = classpathProvider;
101    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
102    this.loggingScopeFactory = loggingScopeFactory;
103
104    LOG.log(Level.FINE, "Instantiated 'ResourceManager'");
105  }
106
107  /**
108   * Extracts the files out of the launchRequest.
109   *
110   * @param launchRequest the ResourceLaunchProto to parse
111   * @return a list of files set in the given ResourceLaunchProto
112   */
113  private static List<File> getLocalFiles(final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
114    final List<File> files = new ArrayList<>();  // Libraries local to this evaluator
115    for (final ReefServiceProtos.FileResourceProto frp : launchRequest.getFileList()) {
116      files.add(new File(frp.getPath()).getAbsoluteFile());
117    }
118    return files;
119  }
120
121  /**
122   * Receives a resource request.
123   * <p/>
124   * If the request can be met, it will also be satisfied immediately.
125   *
126   * @param resourceRequest the resource request to be handled.
127   */
128  final void onResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto resourceRequest) {
129    synchronized (this.theContainers) {
130      this.requestQueue.add(new ResourceRequest(resourceRequest));
131      this.checkRequestQueue();
132    }
133  }
134
135  /**
136   * Receives and processes a resource release request.
137   *
138   * @param releaseRequest the release request to be processed
139   */
140  final void onResourceReleaseRequest(final DriverRuntimeProtocol.ResourceReleaseProto releaseRequest) {
141    synchronized (this.theContainers) {
142      LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier());
143      this.theContainers.release(releaseRequest.getIdentifier());
144      this.checkRequestQueue();
145    }
146  }
147
148  /**
149   * Called when the ReefRunnableProcessObserver detects that the Evaluator process has exited.
150   *
151   * @param evaluatorId the ID of the Evaluator that exited.
152   */
153  public final void onEvaluatorExit(final String evaluatorId) {
154    synchronized (this.theContainers) {
155      this.theContainers.release(evaluatorId);
156      this.checkRequestQueue();
157    }
158  }
159
160  /**
161   * Processes a resource launch request.
162   *
163   * @param launchRequest the launch request to be processed.
164   */
165  final void onResourceLaunchRequest(
166      final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
167
168    synchronized (this.theContainers) {
169
170      final Container c = this.theContainers.get(launchRequest.getIdentifier());
171
172      try (final LoggingScope lb = this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile")) {
173        // Add the global files and libraries.
174        c.addGlobalFiles(this.fileNames.getGlobalFolder());
175        c.addLocalFiles(getLocalFiles(launchRequest));
176
177        // Make the configuration file of the evaluator.
178        final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath());
179
180        try {
181          this.configurationSerializer.toFile(this.configurationSerializer.fromString(launchRequest.getEvaluatorConf()),
182              evaluatorConfigurationFile);
183        } catch (final IOException | BindException e) {
184          throw new RuntimeException("Unable to write configuration.", e);
185        }
186      }
187
188      try (final LoggingScope lc = this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand")) {
189        // Assemble the command line
190        final LaunchCommandBuilder commandBuilder;
191        switch (launchRequest.getType()) {
192          case JVM:
193            commandBuilder = new JavaLaunchCommandBuilder()
194                .setClassPath(this.classpathProvider.getEvaluatorClasspath());
195            break;
196          case CLR:
197            commandBuilder = new CLRLaunchCommandBuilder();
198            break;
199          default:
200            throw new IllegalArgumentException(
201                "Unsupported container type: " + launchRequest.getType());
202        }
203
204        final List<String> command = commandBuilder
205            .setErrorHandlerRID(this.remoteManager.getMyIdentifier())
206            .setLaunchID(c.getNodeID())
207            .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath())
208            .setMemory((int) (this.jvmHeapFactor * c.getMemory()))
209            .build();
210
211        LOG.log(Level.FINEST, "Launching container: {0}", c);
212        c.run(command);
213      }
214    }
215  }
216
217  /**
218   * Checks the allocation queue for new allocations and if there are any
219   * satisfies them.
220   */
221  private void checkRequestQueue() {
222
223    if (this.theContainers.hasContainerAvailable() && this.requestQueue.hasOutStandingRequests()) {
224
225      // Record the satisfaction of one request and get its details.
226      final DriverRuntimeProtocol.ResourceRequestProto requestProto = this.requestQueue.satisfyOne();
227
228      // Allocate a Container
229      final Container container = this.theContainers.allocateOne(
230          requestProto.hasMemorySize() ? requestProto.getMemorySize() : this.defaultMemorySize,
231          requestProto.hasVirtualCores() ? requestProto.getVirtualCores() : this.defaultNumberOfCores);
232
233      // Tell the receivers about it
234      final DriverRuntimeProtocol.ResourceAllocationProto alloc =
235          DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
236              .setIdentifier(container.getContainerID())
237              .setNodeId(container.getNodeID())
238              .setResourceMemory(container.getMemory())
239              .setVirtualCores(container.getNumberOfCores())
240              .build();
241
242      LOG.log(Level.FINEST, "Allocating container: {0}", container);
243      this.allocationHandler.onNext(alloc);
244
245      // update REEF
246      this.sendRuntimeStatus();
247
248      // Check whether we can satisfy another one.
249      this.checkRequestQueue();
250
251    } else {
252      this.sendRuntimeStatus();
253    }
254  }
255
256  private void sendRuntimeStatus() {
257
258    final DriverRuntimeProtocol.RuntimeStatusProto msg =
259        DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
260            .setName("LOCAL")
261            .setState(ReefServiceProtos.State.RUNNING)
262            .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests())
263            .addAllContainerAllocation(this.theContainers.getAllocatedContainerIDs())
264            .build();
265
266    LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}",
267        new Object[]{msg.getContainerAllocationCount(), msg.getOutstandingContainerRequests()});
268    this.runtimeStatusHandlerEventHandler.onNext(msg);
269  }
270}