001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.local.driver; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.evaluator.EvaluatorProcess; 024import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; 025import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; 026import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; 027import org.apache.reef.runtime.common.driver.api.RuntimeParameters; 028import org.apache.reef.runtime.common.driver.evaluator.pojos.State; 029import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; 030import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; 031import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent; 032import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; 033import org.apache.reef.runtime.common.files.FileResource; 034import org.apache.reef.runtime.common.files.REEFFileNames; 035import org.apache.reef.runtime.common.parameters.JVMHeapSlack; 036import org.apache.reef.runtime.common.utils.RemoteManager; 037import org.apache.reef.tang.annotations.Parameter; 038import org.apache.reef.tang.exceptions.BindException; 039import org.apache.reef.tang.formats.ConfigurationSerializer; 040import org.apache.reef.util.Optional; 041import org.apache.reef.util.logging.LoggingScope; 042import org.apache.reef.util.logging.LoggingScopeFactory; 043import org.apache.reef.wake.EventHandler; 044 045import java.io.File; 046import java.io.IOException; 047import java.util.ArrayList; 048import java.util.List; 049import java.util.logging.Level; 050import java.util.logging.Logger; 051 052import javax.inject.Inject; 053 054/** 055 * A resource manager that uses threads to execute containers. 056 */ 057@Private 058@DriverSide 059public final class ResourceManager { 060 061 private static final Logger LOG = Logger.getLogger(ResourceManager.class.getName()); 062 063 private final ResourceRequestQueue requestQueue = new ResourceRequestQueue(); 064 065 private final EventHandler<ResourceAllocationEvent> allocationHandler; 066 private final ContainerManager theContainers; 067 private final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler; 068 private final ConfigurationSerializer configurationSerializer; 069 private final RemoteManager remoteManager; 070 private final REEFFileNames fileNames; 071 private final double jvmHeapFactor; 072 private final LoggingScopeFactory loggingScopeFactory; 073 074 @Inject 075 ResourceManager( 076 final ContainerManager containerManager, 077 @Parameter(RuntimeParameters.ResourceAllocationHandler.class) 078 final EventHandler<ResourceAllocationEvent> allocationHandler, 079 @Parameter(RuntimeParameters.RuntimeStatusHandler.class) 080 final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler, 081 @Parameter(JVMHeapSlack.class) final double jvmHeapSlack, 082 final ConfigurationSerializer configurationSerializer, 083 final RemoteManager remoteManager, 084 final REEFFileNames fileNames, 085 final LoggingScopeFactory loggingScopeFactory) { 086 087 this.theContainers = containerManager; 088 this.allocationHandler = allocationHandler; 089 this.runtimeStatusHandlerEventHandler = runtimeStatusHandlerEventHandler; 090 this.configurationSerializer = configurationSerializer; 091 this.remoteManager = remoteManager; 092 this.fileNames = fileNames; 093 this.jvmHeapFactor = 1.0 - jvmHeapSlack; 094 this.loggingScopeFactory = loggingScopeFactory; 095 096 LOG.log(Level.FINE, "Instantiated 'ResourceManager'"); 097 } 098 099 /** 100 * Extracts the files out of the launchRequest. 101 * 102 * @param launchRequest the ResourceLaunchProto to parse 103 * @return a list of files set in the given ResourceLaunchProto 104 */ 105 private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) { 106 final List<File> files = new ArrayList<>(); // Libraries local to this evaluator 107 for (final FileResource frp : launchRequest.getFileSet()) { 108 files.add(new File(frp.getPath()).getAbsoluteFile()); 109 } 110 return files; 111 } 112 113 /** 114 * Receives a resource request. 115 * <p> 116 * If the request can be met, it will also be satisfied immediately. 117 * 118 * @param resourceRequest the resource request to be handled. 119 */ 120 void onResourceRequest(final ResourceRequestEvent resourceRequest) { 121 synchronized (this.theContainers) { 122 this.requestQueue.add(new ResourceRequest(resourceRequest)); 123 this.checkRequestQueue(); 124 } 125 } 126 127 /** 128 * Receives and processes a resource release request. 129 * 130 * @param releaseRequest the release request to be processed 131 */ 132 void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) { 133 synchronized (this.theContainers) { 134 LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier()); 135 this.theContainers.release(releaseRequest.getIdentifier()); 136 this.checkRequestQueue(); 137 } 138 } 139 140 /** 141 * Called when the ReefRunnableProcessObserver detects that the Evaluator process has exited. 142 * 143 * @param evaluatorId the ID of the Evaluator that exited. 144 */ 145 public void onEvaluatorExit(final String evaluatorId) { 146 synchronized (this.theContainers) { 147 this.theContainers.release(evaluatorId); 148 this.checkRequestQueue(); 149 } 150 } 151 152 /** 153 * Processes a resource launch request. 154 * 155 * @param launchRequest the launch request to be processed. 156 */ 157 void onResourceLaunchRequest( 158 final ResourceLaunchEvent launchRequest) { 159 160 synchronized (this.theContainers) { 161 162 final Container c = this.theContainers.get(launchRequest.getIdentifier()); 163 164 try (final LoggingScope lb = this.loggingScopeFactory 165 .getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile")) { 166 // Add the global files and libraries. 167 c.addGlobalFiles(this.fileNames.getGlobalFolder()); 168 c.addLocalFiles(getLocalFiles(launchRequest)); 169 170 // Make the configuration file of the evaluator. 171 final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath()); 172 173 try { 174 this.configurationSerializer.toFile(launchRequest.getEvaluatorConf(), evaluatorConfigurationFile); 175 } catch (final IOException | BindException e) { 176 throw new RuntimeException("Unable to write configuration.", e); 177 } 178 } 179 180 try (final LoggingScope lc = this.loggingScopeFactory 181 .getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand")) { 182 183 final List<String> command = getLaunchCommand(launchRequest, c.getMemory()); 184 LOG.log(Level.FINEST, "Launching container: {0}", c); 185 c.run(command); 186 } 187 } 188 } 189 190 private List<String> getLaunchCommand(final ResourceLaunchEvent launchRequest, 191 final int containerMemory) { 192 final EvaluatorProcess process = launchRequest.getProcess() 193 .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath()); 194 195 if (process.isOptionSet()) { 196 return process.getCommandLine(); 197 } else { 198 return process 199 .setMemory((int) (this.jvmHeapFactor * containerMemory)) 200 .getCommandLine(); 201 } 202 } 203 204 /** 205 /** 206 * Checks the allocation queue for new allocations and if there are any 207 * satisfies them. 208 */ 209 private void checkRequestQueue() { 210 211 if (requestQueue.hasOutStandingRequests()) { 212 final ResourceRequest resourceRequest = requestQueue.head(); 213 final ResourceRequestEvent requestEvent = resourceRequest.getRequestProto(); 214 final Optional<Container> cont = theContainers.allocateContainer(requestEvent); 215 if (cont.isPresent()) { 216 // Container has been allocated 217 requestQueue.satisfyOne(); 218 final Container container = cont.get(); 219 // Tell the receivers about it 220 final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder() 221 .setIdentifier(container.getContainerID()).setNodeId(container.getNodeID()) 222 .setResourceMemory(container.getMemory()).setVirtualCores(container.getNumberOfCores()) 223 .setRackName(container.getRackName()).setRuntimeName(RuntimeIdentifier.RUNTIME_NAME).build(); 224 225 LOG.log(Level.FINEST, "Allocating container: {0}", container); 226 this.allocationHandler.onNext(alloc); 227 // update REEF 228 this.sendRuntimeStatus(); 229 230 // Check whether we can satisfy another one. 231 this.checkRequestQueue(); 232 } else { 233 // could not allocate, update REEF 234 this.sendRuntimeStatus(); 235 } 236 } else { 237 // done 238 this.sendRuntimeStatus(); 239 } 240 } 241 242 private void sendRuntimeStatus() { 243 244 final RuntimeStatusEventImpl.Builder builder = 245 RuntimeStatusEventImpl.newBuilder() 246 .setName("LOCAL") 247 .setState(State.RUNNING) 248 .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests()); 249 for (final String containerAllocation : this.theContainers.getAllocatedContainerIDs()) { 250 builder.addContainerAllocation(containerAllocation); 251 } 252 final RuntimeStatusEvent msg = builder.build(); 253 254 LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}", 255 new Object[]{msg.getContainerAllocationList().size(), msg.getOutstandingContainerRequests()}); 256 this.runtimeStatusHandlerEventHandler.onNext(msg); 257 } 258}