001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.local.driver; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.proto.DriverRuntimeProtocol; 024import org.apache.reef.proto.ReefServiceProtos; 025import org.apache.reef.runtime.common.driver.api.RuntimeParameters; 026import org.apache.reef.runtime.common.files.ClasspathProvider; 027import org.apache.reef.runtime.common.files.REEFFileNames; 028import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder; 029import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; 030import org.apache.reef.runtime.common.launch.LaunchCommandBuilder; 031import org.apache.reef.runtime.common.parameters.JVMHeapSlack; 032import org.apache.reef.runtime.common.utils.RemoteManager; 033import org.apache.reef.runtime.local.client.parameters.DefaultMemorySize; 034import org.apache.reef.runtime.local.client.parameters.DefaultNumberOfCores; 035import org.apache.reef.runtime.local.driver.parameters.GlobalFiles; 036import org.apache.reef.runtime.local.driver.parameters.GlobalLibraries; 037import org.apache.reef.tang.annotations.Parameter; 038import org.apache.reef.tang.exceptions.BindException; 039import org.apache.reef.tang.formats.ConfigurationSerializer; 040import org.apache.reef.util.logging.LoggingScope; 041import org.apache.reef.util.logging.LoggingScopeFactory; 042import org.apache.reef.wake.EventHandler; 043 044import javax.inject.Inject; 045import java.io.File; 046import java.io.IOException; 047import java.util.ArrayList; 048import java.util.List; 049import java.util.Set; 050import java.util.logging.Level; 051import java.util.logging.Logger; 052 053/** 054 * A resource manager that uses threads to execute containers. 055 */ 056@Private 057@DriverSide 058public final class ResourceManager { 059 060 private final static Logger LOG = Logger.getLogger(ResourceManager.class.getName()); 061 062 private final ResourceRequestQueue requestQueue = new ResourceRequestQueue(); 063 064 private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler; 065 private final ContainerManager theContainers; 066 private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler; 067 private final int defaultMemorySize; 068 private final int defaultNumberOfCores; 069 private final ConfigurationSerializer configurationSerializer; 070 private final RemoteManager remoteManager; 071 private final REEFFileNames fileNames; 072 private final ClasspathProvider classpathProvider; 073 private final double jvmHeapFactor; 074 private final LoggingScopeFactory loggingScopeFactory; 075 076 @Inject 077 ResourceManager( 078 final ContainerManager containerManager, 079 final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler, 080 final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler, 081 final @Parameter(GlobalLibraries.class) Set<String> globalLibraries, 082 final @Parameter(GlobalFiles.class) Set<String> globalFiles, 083 final @Parameter(DefaultMemorySize.class) int defaultMemorySize, 084 final @Parameter(DefaultNumberOfCores.class) int defaultNumberOfCores, 085 final @Parameter(JVMHeapSlack.class) double jvmHeapSlack, 086 final ConfigurationSerializer configurationSerializer, 087 final RemoteManager remoteManager, 088 final REEFFileNames fileNames, 089 final ClasspathProvider classpathProvider, 090 final LoggingScopeFactory loggingScopeFactory) { 091 092 this.theContainers = containerManager; 093 this.allocationHandler = allocationHandler; 094 this.runtimeStatusHandlerEventHandler = runtimeStatusHandlerEventHandler; 095 this.configurationSerializer = configurationSerializer; 096 this.remoteManager = remoteManager; 097 this.defaultMemorySize = defaultMemorySize; 098 this.defaultNumberOfCores = defaultNumberOfCores; 099 this.fileNames = fileNames; 100 this.classpathProvider = classpathProvider; 101 this.jvmHeapFactor = 1.0 - jvmHeapSlack; 102 this.loggingScopeFactory = loggingScopeFactory; 103 104 LOG.log(Level.FINE, "Instantiated 'ResourceManager'"); 105 } 106 107 /** 108 * Extracts the files out of the launchRequest. 109 * 110 * @param launchRequest the ResourceLaunchProto to parse 111 * @return a list of files set in the given ResourceLaunchProto 112 */ 113 private static List<File> getLocalFiles(final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) { 114 final List<File> files = new ArrayList<>(); // Libraries local to this evaluator 115 for (final ReefServiceProtos.FileResourceProto frp : launchRequest.getFileList()) { 116 files.add(new File(frp.getPath()).getAbsoluteFile()); 117 } 118 return files; 119 } 120 121 /** 122 * Receives a resource request. 123 * <p/> 124 * If the request can be met, it will also be satisfied immediately. 125 * 126 * @param resourceRequest the resource request to be handled. 127 */ 128 final void onResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto resourceRequest) { 129 synchronized (this.theContainers) { 130 this.requestQueue.add(new ResourceRequest(resourceRequest)); 131 this.checkRequestQueue(); 132 } 133 } 134 135 /** 136 * Receives and processes a resource release request. 137 * 138 * @param releaseRequest the release request to be processed 139 */ 140 final void onResourceReleaseRequest(final DriverRuntimeProtocol.ResourceReleaseProto releaseRequest) { 141 synchronized (this.theContainers) { 142 LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier()); 143 this.theContainers.release(releaseRequest.getIdentifier()); 144 this.checkRequestQueue(); 145 } 146 } 147 148 /** 149 * Called when the ReefRunnableProcessObserver detects that the Evaluator process has exited. 150 * 151 * @param evaluatorId the ID of the Evaluator that exited. 152 */ 153 public final void onEvaluatorExit(final String evaluatorId) { 154 synchronized (this.theContainers) { 155 this.theContainers.release(evaluatorId); 156 this.checkRequestQueue(); 157 } 158 } 159 160 /** 161 * Processes a resource launch request. 162 * 163 * @param launchRequest the launch request to be processed. 164 */ 165 final void onResourceLaunchRequest( 166 final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) { 167 168 synchronized (this.theContainers) { 169 170 final Container c = this.theContainers.get(launchRequest.getIdentifier()); 171 172 try (final LoggingScope lb = this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile")) { 173 // Add the global files and libraries. 174 c.addGlobalFiles(this.fileNames.getGlobalFolder()); 175 c.addLocalFiles(getLocalFiles(launchRequest)); 176 177 // Make the configuration file of the evaluator. 178 final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath()); 179 180 try { 181 this.configurationSerializer.toFile(this.configurationSerializer.fromString(launchRequest.getEvaluatorConf()), 182 evaluatorConfigurationFile); 183 } catch (final IOException | BindException e) { 184 throw new RuntimeException("Unable to write configuration.", e); 185 } 186 } 187 188 try (final LoggingScope lc = this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand")) { 189 // Assemble the command line 190 final LaunchCommandBuilder commandBuilder; 191 switch (launchRequest.getType()) { 192 case JVM: 193 commandBuilder = new JavaLaunchCommandBuilder() 194 .setClassPath(this.classpathProvider.getEvaluatorClasspath()); 195 break; 196 case CLR: 197 commandBuilder = new CLRLaunchCommandBuilder(); 198 break; 199 default: 200 throw new IllegalArgumentException( 201 "Unsupported container type: " + launchRequest.getType()); 202 } 203 204 final List<String> command = commandBuilder 205 .setErrorHandlerRID(this.remoteManager.getMyIdentifier()) 206 .setLaunchID(c.getNodeID()) 207 .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath()) 208 .setMemory((int) (this.jvmHeapFactor * c.getMemory())) 209 .build(); 210 211 LOG.log(Level.FINEST, "Launching container: {0}", c); 212 c.run(command); 213 } 214 } 215 } 216 217 /** 218 * Checks the allocation queue for new allocations and if there are any 219 * satisfies them. 220 */ 221 private void checkRequestQueue() { 222 223 if (this.theContainers.hasContainerAvailable() && this.requestQueue.hasOutStandingRequests()) { 224 225 // Record the satisfaction of one request and get its details. 226 final DriverRuntimeProtocol.ResourceRequestProto requestProto = this.requestQueue.satisfyOne(); 227 228 // Allocate a Container 229 final Container container = this.theContainers.allocateOne( 230 requestProto.hasMemorySize() ? requestProto.getMemorySize() : this.defaultMemorySize, 231 requestProto.hasVirtualCores() ? requestProto.getVirtualCores() : this.defaultNumberOfCores); 232 233 // Tell the receivers about it 234 final DriverRuntimeProtocol.ResourceAllocationProto alloc = 235 DriverRuntimeProtocol.ResourceAllocationProto.newBuilder() 236 .setIdentifier(container.getContainerID()) 237 .setNodeId(container.getNodeID()) 238 .setResourceMemory(container.getMemory()) 239 .setVirtualCores(container.getNumberOfCores()) 240 .build(); 241 242 LOG.log(Level.FINEST, "Allocating container: {0}", container); 243 this.allocationHandler.onNext(alloc); 244 245 // update REEF 246 this.sendRuntimeStatus(); 247 248 // Check whether we can satisfy another one. 249 this.checkRequestQueue(); 250 251 } else { 252 this.sendRuntimeStatus(); 253 } 254 } 255 256 private void sendRuntimeStatus() { 257 258 final DriverRuntimeProtocol.RuntimeStatusProto msg = 259 DriverRuntimeProtocol.RuntimeStatusProto.newBuilder() 260 .setName("LOCAL") 261 .setState(ReefServiceProtos.State.RUNNING) 262 .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests()) 263 .addAllContainerAllocation(this.theContainers.getAllocatedContainerIDs()) 264 .build(); 265 266 LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}", 267 new Object[]{msg.getContainerAllocationCount(), msg.getOutstandingContainerRequests()}); 268 this.runtimeStatusHandlerEventHandler.onNext(msg); 269 } 270}