This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.process;
020
021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
022import org.apache.reef.util.OSUtils;
023
024import java.io.BufferedReader;
025import java.io.File;
026import java.io.FileReader;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.locks.Condition;
032import java.util.concurrent.locks.Lock;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037
038/**
039 * A runnable class that encapsulates a process.
040 */
041public final class RunnableProcess implements Runnable {
042
043  private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName());
044
045  private static final long DESTROY_WAIT_TIME = 100;
046
047  /**
048   * Name of the file used for STDERR redirection.
049   */
050  private final String standardErrorFileName;
051  /**
052   * Name of the file used for STDOUT redirection.
053   */
054  private final String standardOutFileName;
055
056  /**
057   * Command to execute.
058   */
059  private final List<String> command;
060  /**
061   * User supplied ID of this process.
062   */
063  private final String id;
064  /**
065   * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited.
066   */
067  private final File folder;
068  /**
069   * The coarse-grained lock for state transition.
070   */
071  private final Lock stateLock = new ReentrantLock();
072  private final Condition doneCond = stateLock.newCondition();
073  /**
074   * This will be informed of process start and stop.
075   */
076  private final RunnableProcessObserver processObserver;
077  /**
078   * The process.
079   */
080  private Process process;
081  /**
082   * The state of the process.
083   */
084  private State state = State.INIT;   // synchronized on stateLock
085
086  /**
087   * @param command               the command to execute.
088   * @param id                    The ID of the process. This is used to name files and in the logs created by this process.
089   * @param folder                The folder in which this will store its stdout and stderr output
090   * @param processObserver       will be informed of process state changes.
091   * @param standardOutFileName   The name of the file used for redirecting STDOUT
092   * @param standardErrorFileName The name of the file used for redirecting STDERR
093   */
094  public RunnableProcess(final List<String> command,
095                         final String id,
096                         final File folder,
097                         final RunnableProcessObserver processObserver,
098                         final String standardOutFileName,
099                         final String standardErrorFileName) {
100    this.processObserver = processObserver;
101    this.command = new ArrayList<>(command);
102    this.id = id;
103    this.folder = folder;
104    assert (this.folder.isDirectory());
105    this.folder.mkdirs();
106    this.standardOutFileName = standardOutFileName;
107    this.standardErrorFileName = standardErrorFileName;
108    LOG.log(Level.FINEST, "RunnableProcess ready.");
109  }
110
111  /**
112   * Checks whether a transition from State 'from' to state 'to' is legal
113   *
114   * @param from
115   * @param to
116   * @return true, if the state transition is legal. False otherwise.
117   */
118  private static boolean isLegal(final State from, final State to) {
119    switch (from) {
120      case INIT:
121        switch (to) {
122          case INIT:
123          case RUNNING:
124          case ENDED:
125            return true;
126          default:
127            return false;
128        }
129      case RUNNING:
130        switch (to) {
131          case ENDED:
132            return true;
133          default:
134            return false;
135        }
136      case ENDED:
137        return false;
138      default:
139        return false;
140    }
141  }
142
143  /**
144   * Runs the configured process.
145   *
146   * @throws java.lang.IllegalStateException if the process is already running or has been running before.
147   */
148  @Override
149  public void run() {
150    this.stateLock.lock();
151    try {
152      if (this.getState() != State.INIT) {
153        throw new IllegalStateException("The RunnableProcess can't be reused");
154      }
155
156      // Setup the stdout and stderr destinations.
157      final File errFile = new File(folder, standardErrorFileName);
158      final File outFile = new File(folder, standardOutFileName);
159
160      // Launch the process
161      try {
162        LOG.log(Level.FINEST, "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
163            new Object[]{this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
164        this.process = new ProcessBuilder()
165            .command(this.command)
166            .directory(this.folder)
167            .redirectError(errFile)
168            .redirectOutput(outFile)
169            .start();
170        this.setState(State.RUNNING);
171        this.processObserver.onProcessStarted(this.id);
172      } catch (final IOException ex) {
173        LOG.log(Level.SEVERE, "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}",
174            new Object[]{this.id, this.command, ex});
175      }
176    } finally {
177      this.stateLock.unlock();
178    }
179
180    try {
181      // Wait for its completion
182      final int returnValue = process.waitFor();
183      this.processObserver.onProcessExit(this.id, returnValue);
184      this.stateLock.lock();
185      try {
186        this.setState(State.ENDED);
187        this.doneCond.signalAll();
188      } finally {
189        this.stateLock.unlock();
190      }
191      LOG.log(Level.FINEST, "Process \"{0}\" returned {1}", new Object[]{this.id, returnValue});
192    } catch (final InterruptedException ex) {
193      LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}",
194          new Object[]{this.id, ex});
195    }
196  }
197
198
199  /**
200   * Cancels the running process if it is running.
201   */
202  public void cancel() {
203    this.stateLock.lock();
204    try {
205      if (this.processIsRunning()) {
206        this.process.destroy();
207        this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS);
208      }
209
210      if (this.processIsRunning()) {
211        LOG.log(Level.WARNING, "The child process survived Process.destroy()");
212        if (OSUtils.isLinux()) {
213          LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line");
214          try {
215            final long pid = readPID();
216            OSUtils.kill(pid);
217          } catch (final IOException | InterruptedException e) {
218            LOG.log(Level.SEVERE, "Unable to kill the process.", e);
219          }
220        }
221      }
222
223    } catch (final InterruptedException ex) {
224      LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}",
225          new Object[]{this.id, ex});
226    } finally {
227      this.stateLock.unlock();
228    }
229  }
230
231  /**
232   * @return the PID stored in the PID file.
233   * @throws IOException if the file can't be read.
234   */
235  private long readPID() throws IOException {
236    final String PIDFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME;
237    try (final BufferedReader r = new BufferedReader(new FileReader(PIDFileName))) {
238      return Long.valueOf(r.readLine());
239    }
240  }
241
242  private boolean processIsRunning() {
243    return this.getState() == State.RUNNING;
244  }
245
246  /**
247   * @return the current State of the process.
248   */
249  private State getState() {
250    return this.state;
251  }
252
253  /**
254   * Sets a new state for the process.
255   *
256   * @param newState
257   * @throws java.lang.IllegalStateException if the new state is illegal.
258   */
259  private void setState(final State newState) {
260    if (!isLegal(this.state, newState)) {
261      throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal");
262    }
263    this.state = newState;
264  }
265
266  /**
267   * The possible states of a process: INIT, RUNNING, ENDED.
268   */
269  private enum State {
270    // After initialization
271    INIT,
272    // The process is running
273    RUNNING,
274    // The process ended
275    ENDED
276  }
277}