This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.driver;
020
021import org.apache.reef.annotations.audience.Private;
022import org.apache.reef.annotations.audience.TaskSide;
023import org.apache.reef.runtime.common.files.REEFFileNames;
024import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver;
025import org.apache.reef.runtime.local.process.RunnableProcess;
026import org.apache.reef.runtime.local.process.RunnableProcessObserver;
027
028import java.io.File;
029import java.io.IOException;
030import java.nio.file.Files;
031import java.nio.file.Path;
032import java.util.Arrays;
033import java.util.List;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037/**
038 * A Container that runs an Evaluator in a Process.
039 */
040@Private
041@TaskSide
042public final class ProcessContainer implements Container {
043
044  private static final Logger LOG = Logger.getLogger(ProcessContainer.class.getName());
045
046  private final String errorHandlerRID;
047  private final String nodeID;
048  private final File folder;
049  private final String containedID;
050  private final int megaBytes;
051  private final int numberOfCores;
052  private final String rackName;
053  private final REEFFileNames fileNames;
054  private final File localFolder;
055  private final File globalFolder;
056  private final RunnableProcessObserver processObserver;
057  private final ThreadGroup threadGroup;
058
059  private Thread theThread;
060  private RunnableProcess process;
061
062  /**
063   * @param errorHandlerRID the remoteID of the error handler.
064   * @param nodeID          the ID of the (fake) node this Container is instantiated on
065   * @param containedID     the  ID used to identify this container uniquely
066   * @param folder          the folder in which logs etc. will be deposited
067   */
068  ProcessContainer(final String errorHandlerRID,
069                   final String nodeID,
070                   final String containedID,
071                   final File folder,
072                   final int megaBytes,
073                   final int numberOfCores,
074                   final String rackName,
075                   final REEFFileNames fileNames,
076                   final ReefRunnableProcessObserver processObserver,
077                   final ThreadGroup threadGroup) {
078
079    this.errorHandlerRID = errorHandlerRID;
080    this.nodeID = nodeID;
081    this.containedID = containedID;
082    this.folder = folder;
083    this.megaBytes = megaBytes;
084    this.numberOfCores = numberOfCores;
085    this.rackName = rackName;
086    this.fileNames = fileNames;
087    this.processObserver = processObserver;
088    this.threadGroup = threadGroup;
089
090    final File reefFolder = new File(folder, fileNames.getREEFFolderName());
091
092    this.localFolder = new File(reefFolder, fileNames.getLocalFolderName());
093    if (!this.localFolder.exists() && !this.localFolder.mkdirs()) {
094      LOG.log(Level.WARNING, "Failed to create [{0}]", this.localFolder.getAbsolutePath());
095    }
096
097    this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName());
098    if (!this.globalFolder.exists() && !this.globalFolder.mkdirs()) {
099      LOG.log(Level.WARNING, "Failed to create [{0}]", this.globalFolder.getAbsolutePath());
100    }
101  }
102
103  private static void copy(final Iterable<File> files, final File folder) throws IOException {
104    for (final File sourceFile : files) {
105      final File destinationFile = new File(folder, sourceFile.getName());
106      if (Files.isSymbolicLink(sourceFile.toPath())) {
107        final Path linkTargetPath = Files.readSymbolicLink(sourceFile.toPath());
108        Files.createSymbolicLink(destinationFile.toPath(), linkTargetPath);
109      } else {
110        Files.copy(sourceFile.toPath(), destinationFile.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
111      }
112    }
113  }
114
115  @Override
116  public void addLocalFiles(final Iterable<File> files) {
117    try {
118      copy(files, this.localFolder);
119    } catch (final IOException e) {
120      throw new RuntimeException("Unable to copy files to the evaluator folder.", e);
121    }
122  }
123
124  @Override
125  public void addGlobalFiles(final File globalFilesFolder) {
126    try {
127      final File[] files = globalFilesFolder.listFiles();
128      if (files != null) {
129        copy(Arrays.asList(files), this.globalFolder);
130      }
131    } catch (final IOException e) {
132      throw new RuntimeException("Unable to copy files to the evaluator folder.", e);
133    }
134  }
135
136  @Override
137  public void run(final List<String> commandLine) {
138
139    this.process = new RunnableProcess(
140        commandLine,
141        this.containedID,
142        this.folder,
143        this.processObserver,
144        this.fileNames.getEvaluatorStdoutFileName(),
145        this.fileNames.getEvaluatorStderrFileName());
146
147    this.theThread = new Thread(this.threadGroup, this.process, "ProcessContainer:" + this.containedID);
148    this.theThread.start();
149  }
150
151  @Override
152  public boolean isRunning() {
153    return null != this.theThread && this.theThread.isAlive();
154  }
155
156  @Override
157  public int getMemory() {
158    return this.megaBytes;
159  }
160
161  @Override
162  public int getNumberOfCores() {
163    return this.numberOfCores;
164  }
165
166  @Override
167  public String getRackName() {
168    return this.rackName;
169  }
170
171  @Override
172  public File getFolder() {
173    return this.folder;
174  }
175
176  @Override
177  public String getNodeID() {
178    return this.nodeID;
179  }
180
181  @Override
182  public String getContainerID() {
183    return this.containedID;
184  }
185
186  @Override
187  public void close() {
188    if (isRunning()) {
189      LOG.log(Level.WARNING, "Force-closing a container that is still running: {0}", this);
190      this.process.cancel();
191    }
192  }
193
194  @Override
195  public String toString() {
196    return String.format(
197        "ProcessContainer{containedID=%s, nodeID=%s, errorHandlerRID=%s, folder=%s, rack=%s}",
198        this.containedID, this.nodeID, this.errorHandlerRID, this.folder, this.rackName);
199  }
200}