/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEventImpl;
import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.ConfigurationBuilder;
import org.apache.reef.tang.ConfigurationProvider;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.LoggingScope;
import org.apache.reef.util.logging.LoggingScopeFactory;

import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver-Side representation of an allocated evaluator.
 * <p>
 * Every {@code submit*} method assembles an evaluator configuration from the given
 * context / service / task configurations and hands it, together with the files and
 * libraries registered so far, to the {@link EvaluatorManager} as a resource launch event.
 */
@DriverSide
@Private
public final class AllocatedEvaluatorImpl implements AllocatedEvaluator {

  private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorImpl.class.getName());

  private final EvaluatorManager evaluatorManager;
  private final String remoteID;
  private final ConfigurationSerializer configurationSerializer;
  private final String jobIdentifier;
  private final LoggingScopeFactory loggingScopeFactory;
  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;

  /**
   * The set of files to be placed on the Evaluator.
   */
  private final Collection<File> files = new HashSet<>();

  /**
   * The set of libraries.
   */
  private final Collection<File> libraries = new HashSet<>();

  AllocatedEvaluatorImpl(final EvaluatorManager evaluatorManager,
                         final String remoteID,
                         final ConfigurationSerializer configurationSerializer,
                         final String jobIdentifier,
                         final LoggingScopeFactory loggingScopeFactory,
                         final Set<ConfigurationProvider> evaluatorConfigurationProviders) {
    this.evaluatorManager = evaluatorManager;
    this.remoteID = remoteID;
    this.configurationSerializer = configurationSerializer;
    this.jobIdentifier = jobIdentifier;
    this.loggingScopeFactory = loggingScopeFactory;
    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
  }

  @Override
  public String getId() {
    return this.evaluatorManager.getId();
  }

  @Override
  public void close() {
    this.evaluatorManager.close();
  }

  /**
   * Submit a Task; a root context named {@code "RootContext_" + getId()} is created for it.
   *
   * @param taskConfiguration the configuration of the task to launch.
   */
  @Override
  public void submitTask(final Configuration taskConfiguration) {
    this.submitContextAndTask(makeRootContextConfiguration(), taskConfiguration);
  }

  /**
   * Submit Task with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   * A root context named {@code "RootContext_" + getId()} is created and serialized on this side.
   *
   * @param evaluatorConfiguration the serialized evaluator configuration.
   * @param taskConfiguration the serialized task configuration.
   */
  public void submitTask(final String evaluatorConfiguration, final String taskConfiguration) {
    final String contextConfigurationString =
        this.configurationSerializer.toString(makeRootContextConfiguration());
    this.launchWithConfigurationString(
        evaluatorConfiguration, contextConfigurationString, Optional.<String>empty(), Optional.of(taskConfiguration));
  }

  @Override
  public EvaluatorDescriptor getEvaluatorDescriptor() {
    return this.evaluatorManager.getEvaluatorDescriptor();
  }

  @Override
  public void submitContext(final Configuration contextConfiguration) {
    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.<Configuration>empty());
  }

  /**
   * Submit Context with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   *
   * @param evaluatorConfiguration the serialized evaluator configuration.
   * @param contextConfiguration the serialized root context configuration.
   */
  public void submitContext(final String evaluatorConfiguration, final String contextConfiguration) {
    launchWithConfigurationString(evaluatorConfiguration, contextConfiguration,
        Optional.<String>empty(), Optional.<String>empty());
  }

  @Override
  public void submitContextAndService(final Configuration contextConfiguration,
                                      final Configuration serviceConfiguration) {
    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.<Configuration>empty());
  }

  /**
   * Submit Context and Service with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   *
   * @param evaluatorConfiguration the serialized evaluator configuration.
   * @param contextConfiguration the serialized root context configuration.
   * @param serviceConfiguration the serialized root service configuration.
   */
  public void submitContextAndService(final String evaluatorConfiguration,
                                      final String contextConfiguration,
                                      final String serviceConfiguration) {
    launchWithConfigurationString(evaluatorConfiguration, contextConfiguration,
        Optional.of(serviceConfiguration), Optional.<String>empty());
  }

  @Override
  public void submitContextAndTask(final Configuration contextConfiguration,
                                   final Configuration taskConfiguration) {
    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.of(taskConfiguration));
  }

  /**
   * Submit Context and Task with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   *
   * @param evaluatorConfiguration the serialized evaluator configuration.
   * @param contextConfiguration the serialized root context configuration.
   * @param taskConfiguration the serialized task configuration.
   */
  public void submitContextAndTask(final String evaluatorConfiguration,
                                   final String contextConfiguration,
                                   final String taskConfiguration) {
    this.launchWithConfigurationString(evaluatorConfiguration, contextConfiguration,
        Optional.<String>empty(), Optional.of(taskConfiguration));
  }

  @Override
  public void submitContextAndServiceAndTask(final Configuration contextConfiguration,
                                             final Configuration serviceConfiguration,
                                             final Configuration taskConfiguration) {
    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
  }

  /**
   * Submit Context, Service and Task with configuration strings.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   *
   * @param evaluatorConfiguration the serialized evaluator configuration.
   * @param contextConfiguration the serialized root context configuration.
   * @param serviceConfiguration the serialized root service configuration.
   * @param taskConfiguration the serialized task configuration.
   */
  public void submitContextAndServiceAndTask(final String evaluatorConfiguration,
                                             final String contextConfiguration,
                                             final String serviceConfiguration,
                                             final String taskConfiguration) {
    launchWithConfigurationString(evaluatorConfiguration, contextConfiguration,
        Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
  }

  @Override
  public void setProcess(final EvaluatorProcess process) {
    this.evaluatorManager.setProcess(process);
  }

  @Override
  public void addFile(final File file) {
    this.files.add(file);
  }

  @Override
  public void addLibrary(final File file) {
    this.libraries.add(file);
  }

  /**
   * Build the default root context configuration used when only a task is submitted.
   *
   * @return a context configuration whose identifier is {@code "RootContext_" + getId()}.
   */
  private Configuration makeRootContextConfiguration() {
    return ContextConfiguration.CONF
        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
        .build();
  }

  /**
   * Launch the evaluator from Configuration objects, inside an evaluator-launch logging scope.
   *
   * @param contextConfiguration the root context configuration.
   * @param serviceConfiguration the (optional) root service configuration.
   * @param taskConfiguration the (optional) task configuration.
   */
  private void launch(final Configuration contextConfiguration,
                      final Optional<Configuration> serviceConfiguration,
                      final Optional<Configuration> taskConfiguration) {
    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
      final Configuration evaluatorConfiguration =
          makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration);

      resourceBuildAndLaunch(evaluatorConfiguration);
    }
  }

  /**
   * Launch the evaluator from configuration strings, inside an evaluator-launch logging scope.
   * This method should be called from bridge and the configuration strings are
   * serialized at .Net side.
   * Unlike {@link #launch}, the service configuration is passed through as given; the
   * evaluatorConfigurationProviders are not merged into it on this path.
   *
   * @param evaluatorConfiguration the serialized evaluator configuration.
   * @param contextConfiguration the serialized root context configuration.
   * @param serviceConfiguration the (optional) serialized root service configuration.
   * @param taskConfiguration the (optional) serialized task configuration.
   */
  private void launchWithConfigurationString(
      final String evaluatorConfiguration,
      final String contextConfiguration,
      final Optional<String> serviceConfiguration,
      final Optional<String> taskConfiguration) {
    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
      final Configuration submissionEvaluatorConfiguration =
          makeEvaluatorConfiguration(
              contextConfiguration, Optional.of(evaluatorConfiguration), serviceConfiguration, taskConfiguration);

      resourceBuildAndLaunch(submissionEvaluatorConfiguration);
    }
  }

  /**
   * Assemble the resource launch event (identifier, remote id, evaluator configuration,
   * files, libraries, runtime name and process) and pass it to the evaluator manager.
   *
   * @param evaluatorConfiguration the fully assembled evaluator configuration.
   */
  private void resourceBuildAndLaunch(final Configuration evaluatorConfiguration) {
    final ResourceLaunchEventImpl.Builder rbuilder =
        ResourceLaunchEventImpl.newBuilder()
            .setIdentifier(this.evaluatorManager.getId())
            .setRemoteId(this.remoteID)
            .setEvaluatorConf(evaluatorConfiguration)
            .addFiles(this.files)
            .addLibraries(this.libraries)
            .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName());

    rbuilder.setProcess(this.evaluatorManager.getEvaluatorDescriptor().getProcess());
    this.evaluatorManager.onResourceLaunch(rbuilder.build());
  }

  /**
   * Make configuration for evaluator from Configuration objects.
   * The context and task configurations are serialized as given; the service configuration
   * is first merged with the evaluatorConfigurationProviders (see
   * {@link #makeRootServiceConfiguration}) and then serialized. No separate evaluator
   * configuration is set on this path.
   *
   * @param contextConfiguration the root context configuration.
   * @param serviceConfiguration the (optional) root service configuration.
   * @param taskConfiguration the (optional) task configuration.
   * @return Configuration
   */
  private Configuration makeEvaluatorConfiguration(final Configuration contextConfiguration,
                                                   final Optional<Configuration> serviceConfiguration,
                                                   final Optional<Configuration> taskConfiguration) {

    // Serialization order (context, task, merged service) is kept deliberately.
    final String contextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
    final Optional<String> taskConfigurationString = serialize(taskConfiguration);
    final Optional<String> serviceConfigurationString =
        serialize(makeRootServiceConfiguration(serviceConfiguration));

    return makeEvaluatorConfiguration(contextConfigurationString, Optional.<String>empty(),
        serviceConfigurationString, taskConfigurationString);
  }

  /**
   * Make configuration for Evaluator from serialized configuration strings.
   * The CLR flavor of the evaluator configuration module is chosen when the evaluator's
   * process is a {@link CLRProcess}.
   *
   * @param contextConfiguration the serialized root context configuration.
   * @param evaluatorConfiguration the (optional) serialized evaluator configuration.
   * @param serviceConfiguration the (optional) serialized root service configuration.
   * @param taskConfiguration the (optional) serialized task configuration.
   * @return Configuration
   */
  private Configuration makeEvaluatorConfiguration(final String contextConfiguration,
                                                   final Optional<String> evaluatorConfiguration,
                                                   final Optional<String> serviceConfiguration,
                                                   final Optional<String> taskConfiguration) {

    final ConfigurationModule evaluatorConfigModule;
    if (this.evaluatorManager.getEvaluatorDescriptor().getProcess() instanceof CLRProcess) {
      evaluatorConfigModule = EvaluatorConfiguration.CONFCLR;
    } else {
      evaluatorConfigModule = EvaluatorConfiguration.CONF;
    }
    ConfigurationModule evaluatorConfigurationModule = evaluatorConfigModule
        .set(EvaluatorConfiguration.APPLICATION_IDENTIFIER, this.jobIdentifier)
        .set(EvaluatorConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteID)
        .set(EvaluatorConfiguration.EVALUATOR_IDENTIFIER, this.getId())
        .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, contextConfiguration);

    // Add the (optional) evaluator configuration
    if (evaluatorConfiguration.isPresent()) {
      evaluatorConfigurationModule = evaluatorConfigurationModule
          .set(EvaluatorConfiguration.EVALUATOR_CONFIGURATION, evaluatorConfiguration.get());
    }

    // Add the service configuration; when none is given, a serialized empty
    // configuration is set instead, so ROOT_SERVICE_CONFIGURATION is always bound.
    if (serviceConfiguration.isPresent()) {
      evaluatorConfigurationModule = evaluatorConfigurationModule
          .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, serviceConfiguration.get());
    } else {
      evaluatorConfigurationModule = evaluatorConfigurationModule
          .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION,
              this.configurationSerializer.toString(Tang.Factory.getTang().newConfigurationBuilder().build()));
    }

    // Add the (optional) task configuration
    if (taskConfiguration.isPresent()) {
      evaluatorConfigurationModule = evaluatorConfigurationModule
          .set(EvaluatorConfiguration.TASK_CONFIGURATION, taskConfiguration.get());
    }

    // Create the evaluator configuration.
    return evaluatorConfigurationModule.build();
  }

  /**
   * Merges the Configurations provided by the evaluatorConfigurationProviders into the given
   * serviceConfiguration, if any.
   * For CLR evaluators the providers are not used and the given serviceConfiguration is
   * returned unchanged.
   *
   * @param serviceConfiguration the (optional) service configuration supplied by the caller.
   * @return the merged configuration, or empty when there is neither a service configuration
   * nor any ConfigurationProvider.
   */
  private Optional<Configuration> makeRootServiceConfiguration(final Optional<Configuration> serviceConfiguration) {
    final EvaluatorType evaluatorType = this.evaluatorManager.getEvaluatorDescriptor().getProcess().getType();
    if (EvaluatorType.CLR == evaluatorType) {
      LOG.log(Level.FINE, "Not using the ConfigurationProviders as we are configuring a {0} Evaluator.", evaluatorType);
      return serviceConfiguration;
    }

    if (!serviceConfiguration.isPresent() && this.evaluatorConfigurationProviders.isEmpty()) {
      // No configurations to merge.
      LOG.info("No service configuration given and no ConfigurationProviders set.");
      return Optional.empty();
    } else {
      final ConfigurationBuilder configurationBuilder = getConfigurationBuilder(serviceConfiguration);
      for (final ConfigurationProvider configurationProvider : this.evaluatorConfigurationProviders) {
        configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
      }
      return Optional.of(configurationBuilder.build());
    }
  }

  /**
   * Serialize an optional Configuration with the configurationSerializer.
   *
   * @param configuration the (optional) configuration to serialize.
   * @return the serialized form, or empty when no configuration is present.
   */
  private Optional<String> serialize(final Optional<Configuration> configuration) {
    if (configuration.isPresent()) {
      return Optional.of(this.configurationSerializer.toString(configuration.get()));
    } else {
      return Optional.empty();
    }
  }

  /**
   * Utility to build a ConfigurationBuilder from an {@code Optional<Configuration>}.
   *
   * @param configuration the (optional) configuration to seed the builder with.
   * @return a builder seeded with the configuration when present, an empty builder otherwise.
   */
  private static ConfigurationBuilder getConfigurationBuilder(final Optional<Configuration> configuration) {
    if (configuration.isPresent()) {
      return Tang.Factory.getTang().newConfigurationBuilder(configuration.get());
    } else {
      return Tang.Factory.getTang().newConfigurationBuilder();
    }
  }

  @Override
  public String toString() {
    return "AllocatedEvaluator{ID='" + getId() + "\'}";
  }
}