This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.evaluator.*;
025import org.apache.reef.runtime.common.driver.api.ResourceLaunchEventImpl;
026import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration;
027import org.apache.reef.tang.Configuration;
028import org.apache.reef.tang.ConfigurationBuilder;
029import org.apache.reef.tang.ConfigurationProvider;
030import org.apache.reef.tang.Tang;
031import org.apache.reef.tang.formats.ConfigurationModule;
032import org.apache.reef.tang.formats.ConfigurationSerializer;
033import org.apache.reef.util.Optional;
034import org.apache.reef.util.logging.LoggingScope;
035import org.apache.reef.util.logging.LoggingScopeFactory;
036
037import java.io.File;
038import java.util.Collection;
039import java.util.HashSet;
040import java.util.Set;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * Driver-Side representation of an allocated evaluator.
046 */
047@DriverSide
048@Private
049public final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
050
051  private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorImpl.class.getName());
052
053  private final EvaluatorManager evaluatorManager;
054  private final String remoteID;
055  private final ConfigurationSerializer configurationSerializer;
056  private final String jobIdentifier;
057  private final LoggingScopeFactory loggingScopeFactory;
058  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
059
060  /**
061   * The set of files to be places on the Evaluator.
062   */
063  private final Collection<File> files = new HashSet<>();
064
065  /**
066   * The set of libraries.
067   */
068  private final Collection<File> libraries = new HashSet<>();
069
070  AllocatedEvaluatorImpl(final EvaluatorManager evaluatorManager,
071                         final String remoteID,
072                         final ConfigurationSerializer configurationSerializer,
073                         final String jobIdentifier,
074                         final LoggingScopeFactory loggingScopeFactory,
075                         final Set<ConfigurationProvider> evaluatorConfigurationProviders) {
076    this.evaluatorManager = evaluatorManager;
077    this.remoteID = remoteID;
078    this.configurationSerializer = configurationSerializer;
079    this.jobIdentifier = jobIdentifier;
080    this.loggingScopeFactory = loggingScopeFactory;
081    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
082  }
083
084  @Override
085  public String getId() {
086    return this.evaluatorManager.getId();
087  }
088
089  @Override
090  public void close() {
091    this.evaluatorManager.close();
092  }
093
094  @Override
095  public void submitTask(final Configuration taskConfiguration) {
096    final Configuration contextConfiguration = ContextConfiguration.CONF
097        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
098        .build();
099    this.submitContextAndTask(contextConfiguration, taskConfiguration);
100  }
101
102  /**
103   * Submit Task with configuration strings.
104   * This method should be called from bridge and the configuration strings are
105   * serialized at .Net side.
106   * @param taskConfiguration
107   */
108  public void submitTask(final String taskConfiguration) {
109    final Configuration contextConfiguration = ContextConfiguration.CONF
110        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
111        .build();
112    final String contextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
113    this.launchWithConfigurationString(
114        contextConfigurationString, Optional.<String>empty(), Optional.of(taskConfiguration));
115  }
116
117  @Override
118  public EvaluatorDescriptor getEvaluatorDescriptor() {
119    return this.evaluatorManager.getEvaluatorDescriptor();
120  }
121
122  @Override
123  public void submitContext(final Configuration contextConfiguration) {
124    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.<Configuration>empty());
125  }
126
127  /**
128   * Submit Context with configuration strings.
129   * This method should be called from bridge and the configuration strings are
130   * serialized at .Net side.
131   * @param contextConfiguration
132   */
133  public void submitContext(final String contextConfiguration) {
134    launchWithConfigurationString(contextConfiguration, Optional.<String>empty(), Optional.<String>empty());
135  }
136
137  @Override
138  public void submitContextAndService(final Configuration contextConfiguration,
139                                      final Configuration serviceConfiguration) {
140    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.<Configuration>empty());
141  }
142
143  /**
144   * Submit Context and Service with configuration strings.
145   * This method should be called from bridge and the configuration strings are
146   * serialized at .Net side.
147   * @param contextConfiguration
148   * @param serviceConfiguration
149   */
150  public void submitContextAndService(final String contextConfiguration,
151                                      final String serviceConfiguration) {
152    launchWithConfigurationString(contextConfiguration, Optional.of(serviceConfiguration), Optional.<String>empty());
153  }
154
155  @Override
156  public void submitContextAndTask(final Configuration contextConfiguration,
157                                   final Configuration taskConfiguration) {
158    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.of(taskConfiguration));
159  }
160
161  /**
162   * Submit Context and Task with configuration strings.
163   * This method should be called from bridge and the configuration strings are
164   * serialized at .Net side.
165   * @param contextConfiguration
166   * @param taskConfiguration
167   */
168  public void submitContextAndTask(final String contextConfiguration,
169                                   final String taskConfiguration) {
170    this.launchWithConfigurationString(contextConfiguration, Optional.<String>empty(), Optional.of(taskConfiguration));
171  }
172
173  @Override
174  public void submitContextAndServiceAndTask(final Configuration contextConfiguration,
175                                             final Configuration serviceConfiguration,
176                                             final Configuration taskConfiguration) {
177    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
178  }
179
180  /**
181   * Submit Context and Service with configuration strings.
182   * This method should be called from bridge and the configuration strings are
183   * serialized at .Net side
184   * @param contextConfiguration
185   * @param serviceConfiguration
186   * @param taskConfiguration
187   */
188  public void submitContextAndServiceAndTask(final String contextConfiguration,
189                                             final String serviceConfiguration,
190                                             final String taskConfiguration) {
191    launchWithConfigurationString(
192        contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
193  }
194
195  @Override
196  public void setProcess(final EvaluatorProcess process) {
197    this.evaluatorManager.setProcess(process);
198  }
199
200  @Override
201  public void addFile(final File file) {
202    this.files.add(file);
203  }
204
205  @Override
206  public void addLibrary(final File file) {
207    this.libraries.add(file);
208  }
209
210  private void launch(final Configuration contextConfiguration,
211                      final Optional<Configuration> serviceConfiguration,
212                      final Optional<Configuration> taskConfiguration) {
213    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
214      final Configuration evaluatorConfiguration =
215          makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration);
216
217      resourceBuildAndLaunch(evaluatorConfiguration);
218    }
219  }
220
221  /**
222   * Submit Context, Service and Task with configuration strings.
223   * This method should be called from bridge and the configuration strings are
224   * serialized at .Net side
225   * @param contextConfiguration
226   * @param serviceConfiguration
227   * @param taskConfiguration
228   */
229  private void launchWithConfigurationString(final String contextConfiguration,
230                                    final Optional<String> serviceConfiguration,
231                                    final Optional<String> taskConfiguration) {
232    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
233      final Configuration evaluatorConfiguration =
234          makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration);
235
236      resourceBuildAndLaunch(evaluatorConfiguration);
237    }
238  }
239
240  private void resourceBuildAndLaunch(final Configuration evaluatorConfiguration) {
241    final ResourceLaunchEventImpl.Builder rbuilder =
242        ResourceLaunchEventImpl.newBuilder()
243            .setIdentifier(this.evaluatorManager.getId())
244            .setRemoteId(this.remoteID)
245            .setEvaluatorConf(evaluatorConfiguration)
246            .addFiles(this.files)
247            .addLibraries(this.libraries)
248            .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName());
249
250    rbuilder.setProcess(this.evaluatorManager.getEvaluatorDescriptor().getProcess());
251    this.evaluatorManager.onResourceLaunch(rbuilder.build());
252  }
253
254  /**
255   * Make configuration for evaluator.
256   * @param contextConfiguration
257   * @param serviceConfiguration
258   * @param taskConfiguration
259   * @return Configuration
260   */
261  private Configuration makeEvaluatorConfiguration(final Configuration contextConfiguration,
262                                                   final Optional<Configuration> serviceConfiguration,
263                                                   final Optional<Configuration> taskConfiguration) {
264
265    final String contextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
266
267    final Optional<String> taskConfigurationString;
268    if (taskConfiguration.isPresent()) {
269      taskConfigurationString = Optional.of(this.configurationSerializer.toString(taskConfiguration.get()));
270    } else {
271      taskConfigurationString = Optional.<String>empty();
272    }
273
274    final Optional<Configuration> mergedServiceConfiguration = makeRootServiceConfiguration(serviceConfiguration);
275    if (mergedServiceConfiguration.isPresent()) {
276      final String serviceConfigurationString = this.configurationSerializer.toString(mergedServiceConfiguration.get());
277      return makeEvaluatorConfiguration(
278          contextConfigurationString, Optional.of(serviceConfigurationString), taskConfigurationString);
279    } else {
280      return makeEvaluatorConfiguration(contextConfigurationString,  Optional.<String>empty(), taskConfigurationString);
281    }
282  }
283
284  /**
285   * Make configuration for Evaluator.
286   * @param contextConfiguration
287   * @param serviceConfiguration
288   * @param taskConfiguration
289   * @return Configuration
290   */
291  private Configuration makeEvaluatorConfiguration(final String contextConfiguration,
292                                                   final Optional<String> serviceConfiguration,
293                                                   final Optional<String> taskConfiguration) {
294
295    final ConfigurationModule evaluatorConfigModule;
296    if (this.evaluatorManager.getEvaluatorDescriptor().getProcess() instanceof CLRProcess) {
297      evaluatorConfigModule = EvaluatorConfiguration.CONFCLR;
298    } else {
299      evaluatorConfigModule = EvaluatorConfiguration.CONF;
300    }
301    ConfigurationModule evaluatorConfigurationModule = evaluatorConfigModule
302        .set(EvaluatorConfiguration.APPLICATION_IDENTIFIER, this.jobIdentifier)
303        .set(EvaluatorConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteID)
304        .set(EvaluatorConfiguration.EVALUATOR_IDENTIFIER, this.getId())
305        .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, contextConfiguration);
306
307    // Add the (optional) service configuration
308    if (serviceConfiguration.isPresent()) {
309      evaluatorConfigurationModule = evaluatorConfigurationModule
310          .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, serviceConfiguration.get());
311    }
312
313    // Add the (optional) task configuration
314    if (taskConfiguration.isPresent()) {
315      evaluatorConfigurationModule = evaluatorConfigurationModule
316          .set(EvaluatorConfiguration.TASK_CONFIGURATION, taskConfiguration.get());
317    }
318
319    // Create the evaluator configuration.
320    return evaluatorConfigurationModule.build();
321  }
322
323  /**
324   * Merges the Configurations provided by the evaluatorConfigurationProviders into the given
325   * serviceConfiguration, if any.
326   */
327  private Optional<Configuration> makeRootServiceConfiguration(final Optional<Configuration> serviceConfiguration) {
328    final EvaluatorType evaluatorType = this.evaluatorManager.getEvaluatorDescriptor().getProcess().getType();
329    if (EvaluatorType.CLR == evaluatorType) {
330      LOG.log(Level.FINE, "Not using the ConfigurationProviders as we are configuring a {0} Evaluator.", evaluatorType);
331      return serviceConfiguration;
332    }
333
334    if (!serviceConfiguration.isPresent() && this.evaluatorConfigurationProviders.isEmpty()) {
335      // No configurations to merge.
336      LOG.info("No service configuration given and no ConfigurationProviders set.");
337      return Optional.empty();
338    } else {
339      final ConfigurationBuilder configurationBuilder = getConfigurationBuilder(serviceConfiguration);
340      for (final ConfigurationProvider configurationProvider : this.evaluatorConfigurationProviders) {
341        configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
342      }
343      return Optional.of(configurationBuilder.build());
344    }
345  }
346
347  /**
348   * Utility to build a ConfigurationBuilder from an Optional<Configuration>.
349   */
350  private static ConfigurationBuilder getConfigurationBuilder(final Optional<Configuration> configuration) {
351    if (configuration.isPresent()) {
352      return Tang.Factory.getTang().newConfigurationBuilder(configuration.get());
353    } else {
354      return Tang.Factory.getTang().newConfigurationBuilder();
355    }
356  }
357
358  @Override
359  public String toString() {
360    return "AllocatedEvaluator{ID='" + getId() + "\'}";
361  }
362}