This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.process;
020
021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
022import org.apache.reef.util.OSUtils;
023
024import java.io.*;
025import java.nio.charset.StandardCharsets;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.locks.Condition;
030import java.util.concurrent.locks.Lock;
031import java.util.concurrent.locks.ReentrantLock;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035
036/**
037 * A runnable class that encapsulates a process.
038 */
039public final class RunnableProcess implements Runnable {
040
041  private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName());
042
043  private static final long DESTROY_WAIT_TIME = 100;
044
045  /**
046   * Name of the file used for STDERR redirection.
047   */
048  private final String standardErrorFileName;
049  /**
050   * Name of the file used for STDOUT redirection.
051   */
052  private final String standardOutFileName;
053
054  /**
055   * Command to execute.
056   */
057  private final List<String> command;
058  /**
059   * User supplied ID of this process.
060   */
061  private final String id;
062  /**
063   * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited.
064   */
065  private final File folder;
066  /**
067   * The coarse-grained lock for state transition.
068   */
069  private final Lock stateLock = new ReentrantLock();
070  private final Condition doneCond = stateLock.newCondition();
071  /**
072   * This will be informed of process start and stop.
073   */
074  private final RunnableProcessObserver processObserver;
075  /**
076   * The process.
077   */
078  private Process process;
079  /**
080   * The state of the process.
081   */
082  private State state = State.INIT;   // synchronized on stateLock
083
084  /**
085   * @param command               the command to execute.
086   * @param id                    The ID of the process. This is used to name files and in the logs created
087   *                              by this process.
088   * @param folder                The folder in which this will store its stdout and stderr output
089   * @param processObserver       will be informed of process state changes.
090   * @param standardOutFileName   The name of the file used for redirecting STDOUT
091   * @param standardErrorFileName The name of the file used for redirecting STDERR
092   */
093  public RunnableProcess(final List<String> command,
094                         final String id,
095                         final File folder,
096                         final RunnableProcessObserver processObserver,
097                         final String standardOutFileName,
098                         final String standardErrorFileName) {
099    this.processObserver = processObserver;
100    this.command = new ArrayList<>(command);
101    this.id = id;
102    this.folder = folder;
103    assert this.folder.isDirectory();
104    if (!this.folder.exists() && !this.folder.mkdirs()) {
105      LOG.log(Level.WARNING, "Failed to create [{0}]", this.folder.getAbsolutePath());
106    }
107    this.standardOutFileName = standardOutFileName;
108    this.standardErrorFileName = standardErrorFileName;
109    LOG.log(Level.FINEST, "RunnableProcess ready.");
110  }
111
112  /**
113   * Checks whether a transition from State 'from' to state 'to' is legal.
114   *
115   * @param from
116   * @param to
117   * @return true, if the state transition is legal. False otherwise.
118   */
119  private static boolean isLegal(final State from, final State to) {
120    switch (from) {
121    case INIT:
122      switch (to) {
123      case INIT:
124      case RUNNING:
125      case ENDED:
126        return true;
127      default:
128        return false;
129      }
130    case RUNNING:
131      switch (to) {
132      case ENDED:
133        return true;
134      default:
135        return false;
136      }
137    case ENDED:
138      return false;
139    default:
140      return false;
141    }
142  }
143
144  /**
145   * Runs the configured process.
146   *
147   * @throws java.lang.IllegalStateException if the process is already running or has been running before.
148   */
149  @Override
150  public void run() {
151    this.stateLock.lock();
152    try {
153      if (this.getState() != State.INIT) {
154        throw new IllegalStateException("The RunnableProcess can't be reused");
155      }
156
157      // Setup the stdout and stderr destinations.
158      final File errFile = new File(folder, standardErrorFileName);
159      final File outFile = new File(folder, standardOutFileName);
160
161      // Launch the process
162      try {
163        LOG.log(Level.FINEST, "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
164            new Object[]{this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
165        this.process = new ProcessBuilder()
166            .command(this.command)
167            .directory(this.folder)
168            .redirectError(errFile)
169            .redirectOutput(outFile)
170            .start();
171        this.setState(State.RUNNING);
172        this.processObserver.onProcessStarted(this.id);
173      } catch (final IOException ex) {
174        LOG.log(Level.SEVERE, "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}",
175            new Object[]{this.id, this.command, ex});
176      }
177    } finally {
178      this.stateLock.unlock();
179    }
180
181    try {
182      // Wait for its completion
183      final int returnValue = process.waitFor();
184      this.processObserver.onProcessExit(this.id, returnValue);
185      this.stateLock.lock();
186      try {
187        this.setState(State.ENDED);
188        this.doneCond.signalAll();
189      } finally {
190        this.stateLock.unlock();
191      }
192      LOG.log(Level.FINEST, "Process \"{0}\" returned {1}", new Object[]{this.id, returnValue});
193    } catch (final InterruptedException ex) {
194      LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}",
195          new Object[]{this.id, ex});
196    }
197  }
198
199
200  /**
201   * Cancels the running process if it is running.
202   */
203  public void cancel() {
204    this.stateLock.lock();
205    try {
206      if (this.processIsRunning()) {
207        this.process.destroy();
208        if (!this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS)) {
209          LOG.log(Level.FINE, "{0} milliseconds elapsed", DESTROY_WAIT_TIME);
210        }
211      }
212
213      if (this.processIsRunning()) {
214        LOG.log(Level.WARNING, "The child process survived Process.destroy()");
215        if (OSUtils.isUnix() || OSUtils.isWindows()) {
216          LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line");
217          try {
218            final long pid = readPID();
219            OSUtils.kill(pid);
220          } catch (final IOException | InterruptedException | NumberFormatException e) {
221            LOG.log(Level.SEVERE, "Unable to kill the process.", e);
222          }
223        }
224      }
225
226    } catch (final InterruptedException ex) {
227      LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}",
228          new Object[]{this.id, ex});
229    } finally {
230      this.stateLock.unlock();
231    }
232  }
233
234  /**
235   * @return the PID stored in the PID file.
236   * @throws IOException if the file can't be read.
237   */
238  private long readPID() throws IOException {
239    final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME;
240    try (final BufferedReader r =
241             new BufferedReader(new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) {
242      return Long.parseLong(r.readLine());
243    }
244  }
245
246  private boolean processIsRunning() {
247    return this.getState() == State.RUNNING;
248  }
249
250  /**
251   * @return the current State of the process.
252   */
253  private State getState() {
254    return this.state;
255  }
256
257  /**
258   * Sets a new state for the process.
259   *
260   * @param newState
261   * @throws java.lang.IllegalStateException if the new state is illegal.
262   */
263  private void setState(final State newState) {
264    if (!isLegal(this.state, newState)) {
265      throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal");
266    }
267    this.state = newState;
268  }
269
270  /**
271   * The possible states of a process: INIT, RUNNING, ENDED.
272   */
273  private enum State {
274    // After initialization
275    INIT,
276    // The process is running
277    RUNNING,
278    // The process ended
279    ENDED
280  }
281}