001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.evaluator.*;
025import org.apache.reef.runtime.common.driver.api.ResourceLaunchEventImpl;
026import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration;
027import org.apache.reef.tang.Configuration;
028import org.apache.reef.tang.ConfigurationBuilder;
029import org.apache.reef.tang.ConfigurationProvider;
030import org.apache.reef.tang.Tang;
031import org.apache.reef.tang.formats.ConfigurationModule;
032import org.apache.reef.tang.formats.ConfigurationSerializer;
033import org.apache.reef.util.Optional;
034import org.apache.reef.util.logging.LoggingScope;
035import org.apache.reef.util.logging.LoggingScopeFactory;
036
037import java.io.File;
038import java.util.Collection;
039import java.util.HashSet;
040import java.util.Set;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * Driver-Side representation of an allocated evaluator.
046 */
047@DriverSide
048@Private
049public final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
050
051  private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorImpl.class.getName());
052
053  private final EvaluatorManager evaluatorManager;
054  private final String remoteID;
055  private final ConfigurationSerializer configurationSerializer;
056  private final String jobIdentifier;
057  private final LoggingScopeFactory loggingScopeFactory;
058  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
059
060  /**
061   * The set of files to be places on the Evaluator.
062   */
063  private final Collection<File> files = new HashSet<>();
064
065  /**
066   * The set of libraries.
067   */
068  private final Collection<File> libraries = new HashSet<>();
069
070  AllocatedEvaluatorImpl(final EvaluatorManager evaluatorManager,
071                         final String remoteID,
072                         final ConfigurationSerializer configurationSerializer,
073                         final String jobIdentifier,
074                         final LoggingScopeFactory loggingScopeFactory,
075                         final Set<ConfigurationProvider> evaluatorConfigurationProviders) {
076    this.evaluatorManager = evaluatorManager;
077    this.remoteID = remoteID;
078    this.configurationSerializer = configurationSerializer;
079    this.jobIdentifier = jobIdentifier;
080    this.loggingScopeFactory = loggingScopeFactory;
081    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
082  }
083
084  @Override
085  public String getId() {
086    return this.evaluatorManager.getId();
087  }
088
089  @Override
090  public void close() {
091    this.evaluatorManager.close();
092  }
093
094  @Override
095  public void submitTask(final Configuration taskConfiguration) {
096    final Configuration contextConfiguration = ContextConfiguration.CONF
097        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
098        .build();
099    this.submitContextAndTask(contextConfiguration, taskConfiguration);
100  }
101
102  /**
103   * Submit Task with configuration strings.
104   * This method should be called from bridge and the configuration strings are
105   * serialized at .Net side.
106   * @param evaluatorConfiguration
107   * @param taskConfiguration
108   */
109  public void submitTask(final String evaluatorConfiguration, final String taskConfiguration) {
110    final Configuration contextConfiguration = ContextConfiguration.CONF
111        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
112        .build();
113    final String contextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
114    this.launchWithConfigurationString(
115        evaluatorConfiguration, contextConfigurationString,  Optional.<String>empty(), Optional.of(taskConfiguration));
116  }
117
118  @Override
119  public EvaluatorDescriptor getEvaluatorDescriptor() {
120    return this.evaluatorManager.getEvaluatorDescriptor();
121  }
122
123  @Override
124  public void submitContext(final Configuration contextConfiguration) {
125    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.<Configuration>empty());
126  }
127
128  /**
129   * Submit Context with configuration strings.
130   * This method should be called from bridge and the configuration strings are
131   * serialized at .Net side.
132   * @param evaluatorConfiguration
133   * @param contextConfiguration
134   */
135  public void submitContext(final String evaluatorConfiguration, final String contextConfiguration) {
136    launchWithConfigurationString(evaluatorConfiguration, contextConfiguration, Optional.<String>empty(),
137        Optional.<String>empty());
138  }
139
140  @Override
141  public void submitContextAndService(final Configuration contextConfiguration,
142                                      final Configuration serviceConfiguration) {
143    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.<Configuration>empty());
144  }
145
146  /**
147   * Submit Context and Service with configuration strings.
148   * This method should be called from bridge and the configuration strings are
149   * serialized at .Net side.
150   * @param evaluatorConfiguration
151   * @param contextConfiguration
152   * @param serviceConfiguration
153   */
154  public void submitContextAndService(final String evaluatorConfiguration,
155                                      final String contextConfiguration,
156                                      final String serviceConfiguration) {
157    launchWithConfigurationString(evaluatorConfiguration, contextConfiguration,
158        Optional.of(serviceConfiguration), Optional.<String>empty());
159  }
160
161  @Override
162  public void submitContextAndTask(final Configuration contextConfiguration,
163                                   final Configuration taskConfiguration) {
164    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.of(taskConfiguration));
165  }
166
167  /**
168   * Submit Context and Task with configuration strings.
169   * This method should be called from bridge and the configuration strings are
170   * serialized at .Net side.
171   * @param evaluatorConfiguration
172   * @param contextConfiguration
173   * @param taskConfiguration
174   */
175  public void submitContextAndTask(final String evaluatorConfiguration,
176                                   final String contextConfiguration,
177                                   final String taskConfiguration) {
178    this.launchWithConfigurationString(evaluatorConfiguration, contextConfiguration,
179        Optional.<String>empty(), Optional.of(taskConfiguration));
180  }
181
182  @Override
183  public void submitContextAndServiceAndTask(final Configuration contextConfiguration,
184                                             final Configuration serviceConfiguration,
185                                             final Configuration taskConfiguration) {
186    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
187  }
188
189  /**
190   * Submit Context and Service with configuration strings.
191   * This method should be called from bridge and the configuration strings are
192   * serialized at .Net side
193   * @param evaluatorConfiguration
194   * @param contextConfiguration
195   * @param serviceConfiguration
196   * @param taskConfiguration
197   */
198  public void submitContextAndServiceAndTask(final String evaluatorConfiguration,
199                                             final String contextConfiguration,
200                                             final String serviceConfiguration,
201                                             final String taskConfiguration) {
202    launchWithConfigurationString(evaluatorConfiguration, contextConfiguration,
203        Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
204  }
205
206  @Override
207  public void setProcess(final EvaluatorProcess process) {
208    this.evaluatorManager.setProcess(process);
209  }
210
211  @Override
212  public void addFile(final File file) {
213    this.files.add(file);
214  }
215
216  @Override
217  public void addLibrary(final File file) {
218    this.libraries.add(file);
219  }
220
221  private void launch(final Configuration contextConfiguration,
222                      final Optional<Configuration> serviceConfiguration,
223                      final Optional<Configuration> taskConfiguration) {
224    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
225      final Configuration evaluatorConfiguration =
226          makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration);
227
228      resourceBuildAndLaunch(evaluatorConfiguration);
229    }
230  }
231
232  /**
233   * Submit Context, Service and Task with configuration strings.
234   * This method should be called from bridge and the configuration strings are
235   * serialized at .Net side
236   * @param contextConfiguration
237   * @param evaluatorConfiguration
238   * @param serviceConfiguration
239   * @param taskConfiguration
240   */
241  private void launchWithConfigurationString(
242      final String evaluatorConfiguration,
243      final String contextConfiguration,
244      final Optional<String> serviceConfiguration,
245      final Optional<String> taskConfiguration) {
246    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
247      final Configuration submissionEvaluatorConfiguration =
248          makeEvaluatorConfiguration(
249              contextConfiguration, Optional.of(evaluatorConfiguration), serviceConfiguration, taskConfiguration);
250
251      resourceBuildAndLaunch(submissionEvaluatorConfiguration);
252    }
253  }
254
255  private void resourceBuildAndLaunch(final Configuration evaluatorConfiguration) {
256    final ResourceLaunchEventImpl.Builder rbuilder =
257        ResourceLaunchEventImpl.newBuilder()
258            .setIdentifier(this.evaluatorManager.getId())
259            .setRemoteId(this.remoteID)
260            .setEvaluatorConf(evaluatorConfiguration)
261            .addFiles(this.files)
262            .addLibraries(this.libraries)
263            .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName());
264
265    rbuilder.setProcess(this.evaluatorManager.getEvaluatorDescriptor().getProcess());
266    this.evaluatorManager.onResourceLaunch(rbuilder.build());
267  }
268
269  /**
270   * Make configuration for evaluator.
271   * @param contextConfiguration
272   * @param serviceConfiguration
273   * @param taskConfiguration
274   * @return Configuration
275   */
276  private Configuration makeEvaluatorConfiguration(final Configuration contextConfiguration,
277                                                   final Optional<Configuration> serviceConfiguration,
278                                                   final Optional<Configuration> taskConfiguration) {
279
280    final String contextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
281
282    final Optional<String> taskConfigurationString;
283    if (taskConfiguration.isPresent()) {
284      taskConfigurationString = Optional.of(this.configurationSerializer.toString(taskConfiguration.get()));
285    } else {
286      taskConfigurationString = Optional.empty();
287    }
288
289    final Optional<Configuration> mergedServiceConfiguration = makeRootServiceConfiguration(serviceConfiguration);
290    if (mergedServiceConfiguration.isPresent()) {
291      final String serviceConfigurationString = this.configurationSerializer.toString(mergedServiceConfiguration.get());
292      return makeEvaluatorConfiguration(contextConfigurationString, Optional.<String>empty(),
293          Optional.of(serviceConfigurationString), taskConfigurationString);
294    } else {
295      return makeEvaluatorConfiguration(
296          contextConfigurationString, Optional.<String>empty(), Optional.<String>empty(), taskConfigurationString);
297    }
298  }
299
300  /**
301   * Make configuration for Evaluator.
302   * @param contextConfiguration
303   * @param evaluatorConfiguration
304   * @param serviceConfiguration
305   * @param taskConfiguration
306   * @return Configuration
307   */
308  private Configuration makeEvaluatorConfiguration(final String contextConfiguration,
309                                                   final Optional<String> evaluatorConfiguration,
310                                                   final Optional<String> serviceConfiguration,
311                                                   final Optional<String> taskConfiguration) {
312
313    final ConfigurationModule evaluatorConfigModule;
314    if (this.evaluatorManager.getEvaluatorDescriptor().getProcess() instanceof CLRProcess) {
315      evaluatorConfigModule = EvaluatorConfiguration.CONFCLR;
316    } else {
317      evaluatorConfigModule = EvaluatorConfiguration.CONF;
318    }
319    ConfigurationModule evaluatorConfigurationModule = evaluatorConfigModule
320        .set(EvaluatorConfiguration.APPLICATION_IDENTIFIER, this.jobIdentifier)
321        .set(EvaluatorConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteID)
322        .set(EvaluatorConfiguration.EVALUATOR_IDENTIFIER, this.getId())
323        .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, contextConfiguration);
324
325    // Add the (optional) service configuration
326    if (evaluatorConfiguration.isPresent()) {
327      evaluatorConfigurationModule = evaluatorConfigurationModule
328          .set(EvaluatorConfiguration.EVALUATOR_CONFIGURATION, evaluatorConfiguration.get());
329    }
330
331    // Add the (optional) service configuration
332    if (serviceConfiguration.isPresent()) {
333      evaluatorConfigurationModule = evaluatorConfigurationModule
334          .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, serviceConfiguration.get());
335    } else {
336      evaluatorConfigurationModule = evaluatorConfigurationModule
337          .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION,
338              this.configurationSerializer.toString(Tang.Factory.getTang().newConfigurationBuilder().build()));
339    }
340
341    // Add the (optional) task configuration
342    if (taskConfiguration.isPresent()) {
343      evaluatorConfigurationModule = evaluatorConfigurationModule
344          .set(EvaluatorConfiguration.TASK_CONFIGURATION, taskConfiguration.get());
345    }
346
347    // Create the evaluator configuration.
348    return evaluatorConfigurationModule.build();
349  }
350
351  /**
352   * Merges the Configurations provided by the evaluatorConfigurationProviders into the given
353   * serviceConfiguration, if any.
354   */
355  private Optional<Configuration> makeRootServiceConfiguration(final Optional<Configuration> serviceConfiguration) {
356    final EvaluatorType evaluatorType = this.evaluatorManager.getEvaluatorDescriptor().getProcess().getType();
357    if (EvaluatorType.CLR == evaluatorType) {
358      LOG.log(Level.FINE, "Not using the ConfigurationProviders as we are configuring a {0} Evaluator.", evaluatorType);
359      return serviceConfiguration;
360    }
361
362    if (!serviceConfiguration.isPresent() && this.evaluatorConfigurationProviders.isEmpty()) {
363      // No configurations to merge.
364      LOG.info("No service configuration given and no ConfigurationProviders set.");
365      return Optional.empty();
366    } else {
367      final ConfigurationBuilder configurationBuilder = getConfigurationBuilder(serviceConfiguration);
368      for (final ConfigurationProvider configurationProvider : this.evaluatorConfigurationProviders) {
369        configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
370      }
371      return Optional.of(configurationBuilder.build());
372    }
373  }
374
375  /**
376   * Utility to build a ConfigurationBuilder from an Optional<Configuration>.
377   */
378  private static ConfigurationBuilder getConfigurationBuilder(final Optional<Configuration> configuration) {
379    if (configuration.isPresent()) {
380      return Tang.Factory.getTang().newConfigurationBuilder(configuration.get());
381    } else {
382      return Tang.Factory.getTang().newConfigurationBuilder();
383    }
384  }
385
386  @Override
387  public String toString() {
388    return "AllocatedEvaluator{ID='" + getId() + "\'}";
389  }
390}