This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.client;
020
021import org.apache.reef.annotations.audience.ClientSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
024import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
025import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
026import org.apache.reef.runtime.common.files.REEFFileNames;
027import org.apache.reef.runtime.local.client.parameters.RootFolder;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.annotations.Parameter;
030import org.apache.reef.tang.formats.ConfigurationSerializer;
031import org.apache.reef.util.logging.LoggingScope;
032import org.apache.reef.util.logging.LoggingScopeFactory;
033
034import javax.inject.Inject;
035import java.io.File;
036import java.util.concurrent.ExecutorService;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Handles Job Submissions for the Local and the Standalone Runtime.
042 */
043@Private
044@ClientSide
045public final class LocalJobSubmissionHandler implements JobSubmissionHandler {
046
047
048  private static final Logger LOG = Logger.getLogger(LocalJobSubmissionHandler.class.getName());
049  private final ExecutorService executor;
050  private final String rootFolderName;
051  private final ConfigurationSerializer configurationSerializer;
052  private final REEFFileNames fileNames;
053  private final PreparedDriverFolderLauncher driverLauncher;
054  private final LoggingScopeFactory loggingScopeFactory;
055  private final DriverConfigurationProvider driverConfigurationProvider;
056
057  private String applicationId;
058
059  @Inject
060  LocalJobSubmissionHandler(
061      final ExecutorService executor,
062      @Parameter(RootFolder.class) final String rootFolderName,
063      final ConfigurationSerializer configurationSerializer,
064      final REEFFileNames fileNames,
065
066      final PreparedDriverFolderLauncher driverLauncher,
067      final LoggingScopeFactory loggingScopeFactory,
068      final DriverConfigurationProvider driverConfigurationProvider) {
069
070    this.executor = executor;
071    this.configurationSerializer = configurationSerializer;
072    this.fileNames = fileNames;
073
074    this.driverLauncher = driverLauncher;
075    this.driverConfigurationProvider = driverConfigurationProvider;
076    this.rootFolderName = new File(rootFolderName).getAbsolutePath();
077    this.loggingScopeFactory = loggingScopeFactory;
078
079    LOG.log(Level.FINE, "Instantiated 'LocalJobSubmissionHandler'");
080  }
081
082  @Override
083  public void close() {
084    this.executor.shutdown();
085  }
086
087  @Override
088  public void onNext(final JobSubmissionEvent t) {
089    try (final LoggingScope lf = loggingScopeFactory.localJobSubmission()) {
090      try {
091        LOG.log(Level.FINEST, "Starting local job {0}", t.getIdentifier());
092
093        final File jobFolder = new File(new File(rootFolderName),
094            "/" + t.getIdentifier() + "-" + System.currentTimeMillis() + "/");
095
096        final File driverFolder = new File(jobFolder, PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME);
097        if (!driverFolder.exists() && !driverFolder.mkdirs()) {
098          LOG.log(Level.WARNING, "Failed to create [{0}]", driverFolder.getAbsolutePath());
099        }
100
101        final DriverFiles driverFiles = DriverFiles.fromJobSubmission(t, this.fileNames);
102        driverFiles.copyTo(driverFolder);
103
104        final Configuration driverConfiguration = this.driverConfigurationProvider
105            .getDriverConfiguration(jobFolder.toURI(),
106                                    t.getRemoteId(),
107                                    t.getIdentifier(),
108                                    t.getConfiguration());
109
110        this.configurationSerializer.toFile(driverConfiguration,
111            new File(driverFolder, this.fileNames.getDriverConfigurationPath()));
112        this.driverLauncher.launch(driverFolder);
113        this.applicationId = t.getIdentifier();
114
115      } catch (final Exception e) {
116        LOG.log(Level.SEVERE, "Unable to setup driver.", e);
117        throw new RuntimeException("Unable to setup driver.", e);
118      }
119    }
120  }
121
122  /**
123   * Get the RM application ID.
124   * Return null if the application has not been submitted yet, or was submitted unsuccessfully.
125   * @return string application ID or null if no app has been submitted yet.
126   */
127  @Override
128  public String getApplicationId() {
129    return this.applicationId;
130  }
131}