/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEventImpl;
import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.ConfigurationBuilder;
import org.apache.reef.tang.ConfigurationProvider;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.LoggingScope;
import org.apache.reef.util.logging.LoggingScopeFactory;

import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver-Side representation of an allocated evaluator.
 */
@DriverSide
@Private
public final class AllocatedEvaluatorImpl implements AllocatedEvaluator {

  private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorImpl.class.getName());

  private final EvaluatorManager evaluatorManager;
  private final String remoteID;
  private final ConfigurationSerializer configurationSerializer;
  private final String jobIdentifier;
  private final LoggingScopeFactory loggingScopeFactory;
  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;

  /**
   * The set of files to be placed on the Evaluator.
   */
  private final Collection<File> files = new HashSet<>();

  /**
   * The set of libraries.
   */
  private final Collection<File> libraries = new HashSet<>();

  AllocatedEvaluatorImpl(final EvaluatorManager evaluatorManager,
                         final String remoteID,
                         final ConfigurationSerializer configurationSerializer,
                         final String jobIdentifier,
                         final LoggingScopeFactory loggingScopeFactory,
                         final Set<ConfigurationProvider> evaluatorConfigurationProviders) {
    this.evaluatorManager = evaluatorManager;
    this.remoteID = remoteID;
    this.configurationSerializer = configurationSerializer;
    this.jobIdentifier = jobIdentifier;
    this.loggingScopeFactory = loggingScopeFactory;
    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
  }

  @Override
  public String getId() {
    return this.evaluatorManager.getId();
  }

  @Override
  public void close() {
    this.evaluatorManager.close();
  }

  @Override
  public void submitTask(final Configuration taskConfiguration) {
    this.submitContextAndTask(makeRootContextConfiguration(), taskConfiguration);
  }

  /**
   * Submit Task with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   * The root context configuration is generated here and serialized with this
   * instance's ConfigurationSerializer.
   *
   * @param taskConfiguration the serialized task configuration
   */
  public void submitTask(final String taskConfiguration) {
    final String contextConfigurationString =
        this.configurationSerializer.toString(makeRootContextConfiguration());
    this.launchWithConfigurationString(
        contextConfigurationString, Optional.<String>empty(), Optional.of(taskConfiguration));
  }

  @Override
  public EvaluatorDescriptor getEvaluatorDescriptor() {
    return this.evaluatorManager.getEvaluatorDescriptor();
  }

  @Override
  public void submitContext(final Configuration contextConfiguration) {
    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.<Configuration>empty());
  }

  /**
   * Submit Context with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   *
   * @param contextConfiguration the serialized root context configuration
   */
  public void submitContext(final String contextConfiguration) {
    launchWithConfigurationString(contextConfiguration, Optional.<String>empty(), Optional.<String>empty());
  }

  @Override
  public void submitContextAndService(final Configuration contextConfiguration,
                                      final Configuration serviceConfiguration) {
    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.<Configuration>empty());
  }

  /**
   * Submit Context and Service with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   *
   * @param contextConfiguration the serialized root context configuration
   * @param serviceConfiguration the serialized root service configuration
   */
  public void submitContextAndService(final String contextConfiguration,
                                      final String serviceConfiguration) {
    launchWithConfigurationString(contextConfiguration, Optional.of(serviceConfiguration), Optional.<String>empty());
  }

  @Override
  public void submitContextAndTask(final Configuration contextConfiguration,
                                   final Configuration taskConfiguration) {
    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.of(taskConfiguration));
  }

  /**
   * Submit Context and Task with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   *
   * @param contextConfiguration the serialized root context configuration
   * @param taskConfiguration the serialized task configuration
   */
  public void submitContextAndTask(final String contextConfiguration,
                                   final String taskConfiguration) {
    this.launchWithConfigurationString(contextConfiguration, Optional.<String>empty(), Optional.of(taskConfiguration));
  }

  @Override
  public void submitContextAndServiceAndTask(final Configuration contextConfiguration,
                                             final Configuration serviceConfiguration,
                                             final Configuration taskConfiguration) {
    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
  }

  /**
   * Submit Context, Service and Task with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   *
   * @param contextConfiguration the serialized root context configuration
   * @param serviceConfiguration the serialized root service configuration
   * @param taskConfiguration the serialized task configuration
   */
  public void submitContextAndServiceAndTask(final String contextConfiguration,
                                             final String serviceConfiguration,
                                             final String taskConfiguration) {
    launchWithConfigurationString(
        contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
  }

  @Override
  public void setProcess(final EvaluatorProcess process) {
    this.evaluatorManager.setProcess(process);
  }

  @Override
  public void addFile(final File file) {
    this.files.add(file);
  }

  @Override
  public void addLibrary(final File file) {
    this.libraries.add(file);
  }

  /**
   * Builds the default root context configuration used when only a task is submitted.
   * Its identifier is "RootContext_" followed by this evaluator's id.
   *
   * @return the root context configuration
   */
  private Configuration makeRootContextConfiguration() {
    return ContextConfiguration.CONF
        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
        .build();
  }

  /**
   * Assembles the evaluator configuration from Configuration objects and submits the
   * resource launch, all within an evaluator-launch logging scope.
   *
   * @param contextConfiguration the root context configuration
   * @param serviceConfiguration the (optional) root service configuration
   * @param taskConfiguration the (optional) task configuration
   */
  private void launch(final Configuration contextConfiguration,
                      final Optional<Configuration> serviceConfiguration,
                      final Optional<Configuration> taskConfiguration) {
    // The scope variable is unused on purpose; it only exists to be closed at the end of the block.
    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
      final Configuration evaluatorConfiguration =
          makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration);

      resourceBuildAndLaunch(evaluatorConfiguration);
    }
  }

  /**
   * Submit Context, Service and Task with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   * Unlike {@link #launch}, the strings are used as given: the
   * evaluatorConfigurationProviders are not merged into the service configuration.
   *
   * @param contextConfiguration the serialized root context configuration
   * @param serviceConfiguration the (optional) serialized root service configuration
   * @param taskConfiguration the (optional) serialized task configuration
   */
  private void launchWithConfigurationString(final String contextConfiguration,
                                             final Optional<String> serviceConfiguration,
                                             final Optional<String> taskConfiguration) {
    // The scope variable is unused on purpose; it only exists to be closed at the end of the block.
    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
      final Configuration evaluatorConfiguration =
          makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration);

      resourceBuildAndLaunch(evaluatorConfiguration);
    }
  }

  /**
   * Builds the resource launch event (identifier, remote id, evaluator configuration,
   * files, libraries, runtime name and process) and hands it to the EvaluatorManager.
   *
   * @param evaluatorConfiguration the complete evaluator configuration to launch with
   */
  private void resourceBuildAndLaunch(final Configuration evaluatorConfiguration) {
    final ResourceLaunchEventImpl.Builder rbuilder =
        ResourceLaunchEventImpl.newBuilder()
            .setIdentifier(this.evaluatorManager.getId())
            .setRemoteId(this.remoteID)
            .setEvaluatorConf(evaluatorConfiguration)
            .addFiles(this.files)
            .addLibraries(this.libraries)
            .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName());

    rbuilder.setProcess(this.getEvaluatorDescriptor().getProcess());
    this.evaluatorManager.onResourceLaunch(rbuilder.build());
  }

  /**
   * Make configuration for evaluator.
   * Serializes the given configurations, after merging the evaluatorConfigurationProviders
   * into the service configuration via {@link #makeRootServiceConfiguration}, and delegates
   * to the String-based overload.
   *
   * @param contextConfiguration the root context configuration
   * @param serviceConfiguration the (optional) root service configuration
   * @param taskConfiguration the (optional) task configuration
   * @return Configuration
   */
  private Configuration makeEvaluatorConfiguration(final Configuration contextConfiguration,
                                                   final Optional<Configuration> serviceConfiguration,
                                                   final Optional<Configuration> taskConfiguration) {

    final String contextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
    final Optional<String> taskConfigurationString = toConfigurationString(taskConfiguration);
    final Optional<String> serviceConfigurationString =
        toConfigurationString(makeRootServiceConfiguration(serviceConfiguration));

    return makeEvaluatorConfiguration(
        contextConfigurationString, serviceConfigurationString, taskConfigurationString);
  }

  /**
   * Serializes an optional Configuration with this instance's ConfigurationSerializer.
   *
   * @param configuration the (optional) configuration to serialize
   * @return the serialized form, or an empty Optional if no configuration was given
   */
  private Optional<String> toConfigurationString(final Optional<Configuration> configuration) {
    if (configuration.isPresent()) {
      return Optional.of(this.configurationSerializer.toString(configuration.get()));
    }
    return Optional.<String>empty();
  }

  /**
   * Make configuration for Evaluator.
   * Uses EvaluatorConfiguration.CONFCLR when the evaluator process is a CLRProcess and
   * EvaluatorConfiguration.CONF otherwise.
   *
   * @param contextConfiguration the serialized root context configuration
   * @param serviceConfiguration the (optional) serialized root service configuration
   * @param taskConfiguration the (optional) serialized task configuration
   * @return Configuration
   */
  private Configuration makeEvaluatorConfiguration(final String contextConfiguration,
                                                   final Optional<String> serviceConfiguration,
                                                   final Optional<String> taskConfiguration) {

    final ConfigurationModule evaluatorConfigModule;
    if (this.evaluatorManager.getEvaluatorDescriptor().getProcess() instanceof CLRProcess) {
      evaluatorConfigModule = EvaluatorConfiguration.CONFCLR;
    } else {
      evaluatorConfigModule = EvaluatorConfiguration.CONF;
    }

    ConfigurationModule evaluatorConfigurationModule = evaluatorConfigModule
        .set(EvaluatorConfiguration.APPLICATION_IDENTIFIER, this.jobIdentifier)
        .set(EvaluatorConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteID)
        .set(EvaluatorConfiguration.EVALUATOR_IDENTIFIER, this.getId())
        .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, contextConfiguration);

    // Add the (optional) service configuration
    if (serviceConfiguration.isPresent()) {
      evaluatorConfigurationModule = evaluatorConfigurationModule
          .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, serviceConfiguration.get());
    }

    // Add the (optional) task configuration
    if (taskConfiguration.isPresent()) {
      evaluatorConfigurationModule = evaluatorConfigurationModule
          .set(EvaluatorConfiguration.TASK_CONFIGURATION, taskConfiguration.get());
    }

    // Create the evaluator configuration.
    return evaluatorConfigurationModule.build();
  }

  /**
   * Merges the Configurations provided by the evaluatorConfigurationProviders into the given
   * serviceConfiguration, if any.
   * For a CLR evaluator the given serviceConfiguration is returned unchanged.
   *
   * @param serviceConfiguration the (optional) root service configuration
   * @return the merged service configuration, or an empty Optional if there is nothing to merge
   */
  private Optional<Configuration> makeRootServiceConfiguration(final Optional<Configuration> serviceConfiguration) {
    final EvaluatorType evaluatorType = this.evaluatorManager.getEvaluatorDescriptor().getProcess().getType();
    if (EvaluatorType.CLR == evaluatorType) {
      LOG.log(Level.FINE, "Not using the ConfigurationProviders as we are configuring a {0} Evaluator.", evaluatorType);
      return serviceConfiguration;
    }

    if (!serviceConfiguration.isPresent() && this.evaluatorConfigurationProviders.isEmpty()) {
      // No configurations to merge.
      LOG.info("No service configuration given and no ConfigurationProviders set.");
      return Optional.empty();
    }

    final ConfigurationBuilder configurationBuilder = getConfigurationBuilder(serviceConfiguration);
    for (final ConfigurationProvider configurationProvider : this.evaluatorConfigurationProviders) {
      configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
    }
    return Optional.of(configurationBuilder.build());
  }

  /**
   * Utility to build a ConfigurationBuilder from an {@code Optional<Configuration>}.
   *
   * @param configuration the (optional) configuration to seed the builder with
   * @return a builder based on the given configuration, or a fresh builder if none was given
   */
  private static ConfigurationBuilder getConfigurationBuilder(final Optional<Configuration> configuration) {
    if (configuration.isPresent()) {
      return Tang.Factory.getTang().newConfigurationBuilder(configuration.get());
    } else {
      return Tang.Factory.getTang().newConfigurationBuilder();
    }
  }

  @Override
  public String toString() {
    return "AllocatedEvaluator{ID='" + getId() + "\'}";
  }
}