This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.files;
020
021import org.apache.reef.annotations.audience.ClientSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.annotations.audience.RuntimeAuthor;
024import org.apache.reef.proto.ClientRuntimeProtocol;
025import org.apache.reef.proto.ReefServiceProtos;
026import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
027import org.apache.reef.tang.Configuration;
028import org.apache.reef.tang.annotations.Parameter;
029import org.apache.reef.tang.formats.ConfigurationSerializer;
030import org.apache.reef.util.JARFileMaker;
031
032import javax.inject.Inject;
033import java.io.File;
034import java.io.IOException;
035import java.nio.file.Files;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Utility that takes a JobSubmissionProto and turns it into a Job Submission Jar.
041 */
042@Private
043@RuntimeAuthor
044@ClientSide
045public final class JobJarMaker {
046
047  private static final Logger LOG = Logger.getLogger(JobJarMaker.class.getName());
048
049  private final ConfigurationSerializer configurationSerializer;
050  private final REEFFileNames fileNames;
051  private final boolean deleteTempFilesOnExit;
052
053  @Inject
054  JobJarMaker(final ConfigurationSerializer configurationSerializer,
055              final REEFFileNames fileNames,
056              final @Parameter(DeleteTempFiles.class) boolean deleteTempFilesOnExit) {
057    this.configurationSerializer = configurationSerializer;
058    this.fileNames = fileNames;
059    this.deleteTempFilesOnExit = deleteTempFilesOnExit;
060  }
061
062  public static void copy(final Iterable<ReefServiceProtos.FileResourceProto> files, final File destinationFolder) {
063
064    if (!destinationFolder.exists()) {
065      destinationFolder.mkdirs();
066    }
067
068    for (final ReefServiceProtos.FileResourceProto fileProto : files) {
069      final File sourceFile = toFile(fileProto);
070      final File destinationFile = new File(destinationFolder, fileProto.getName());
071      if (destinationFile.exists()) {
072        LOG.log(Level.FINEST,
073            "Will not add {0} to the job jar because another file with the same name was already added.",
074            sourceFile.getAbsolutePath()
075        );
076      } else {
077        try {
078          java.nio.file.Files.copy(sourceFile.toPath(), destinationFile.toPath());
079        } catch (final IOException e) {
080          final String message = new StringBuilder("Copy of file [")
081              .append(sourceFile.getAbsolutePath())
082              .append("] to [")
083              .append(destinationFile.getAbsolutePath())
084              .append("] failed.")
085              .toString();
086          throw new RuntimeException(message, e);
087        }
088      }
089    }
090  }
091
092  private static File toFile(final ReefServiceProtos.FileResourceProto fileProto) {
093    return new File(fileProto.getPath());
094  }
095
096  public File createJobSubmissionJAR(
097      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
098      final Configuration driverConfiguration) throws IOException {
099
100    // Copy all files to a local job submission folder
101    final File jobSubmissionFolder = makejobSubmissionFolder();
102    LOG.log(Level.FINE, "Staging submission in {0}", jobSubmissionFolder);
103
104    final File localFolder = new File(jobSubmissionFolder, this.fileNames.getLocalFolderName());
105    final File globalFolder = new File(jobSubmissionFolder, this.fileNames.getGlobalFolderName());
106
107    this.copy(jobSubmissionProto.getGlobalFileList(), globalFolder);
108    this.copy(jobSubmissionProto.getLocalFileList(), localFolder);
109
110    // Store the Driver Configuration in the JAR file.
111    this.configurationSerializer.toFile(
112        driverConfiguration, new File(localFolder, this.fileNames.getDriverConfigurationName()));
113
114    // Create a JAR File for the submission
115    final File jarFile = File.createTempFile(this.fileNames.getJobFolderPrefix(), this.fileNames.getJarFileSuffix());
116
117    LOG.log(Level.FINE, "Creating job submission jar file: {0}", jarFile);
118    new JARFileMaker(jarFile).addChildren(jobSubmissionFolder).close();
119
120    if (this.deleteTempFilesOnExit) {
121      LOG.log(Level.FINE,
122          "Deleting the temporary job folder [{0}] and marking the jar file [{1}] for deletion after the JVM exits.",
123          new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
124      jobSubmissionFolder.delete();
125      jarFile.deleteOnExit();
126    } else {
127      LOG.log(Level.FINE, "Keeping the temporary job folder [{0}] and jar file [{1}] available after job submission.",
128          new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
129    }
130    return jarFile;
131  }
132
133  private File makejobSubmissionFolder() throws IOException {
134    return Files.createTempDirectory(this.fileNames.getJobFolderPrefix()).toFile();
135  }
136}