This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.process;
020
021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
022import org.apache.reef.util.OSUtils;
023
024import java.io.*;
025import java.nio.charset.StandardCharsets;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.locks.Condition;
031import java.util.concurrent.locks.Lock;
032import java.util.concurrent.locks.ReentrantLock;
033import java.util.logging.Level;
034import java.util.logging.Logger;
035import java.util.regex.Matcher;
036import java.util.regex.Pattern;
037
038/**
039 * A runnable class that encapsulates a process.
040 */
041public final class RunnableProcess implements Runnable {
042
043  private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName());
044
045  private static final long DESTROY_WAIT_TIME = 100;
046
047  /**
048   * Name of the file used for STDERR redirection.
049   */
050  private final String standardErrorFileName;
051
052  /**
053   * Name of the file used for STDOUT redirection.
054   */
055  private final String standardOutFileName;
056
057  /**
058   * Command to execute.
059   */
060  private final List<String> command;
061
062  /**
063   * User supplied ID of this process.
064   */
065  private final String id;
066
067  /**
068   * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited.
069   */
070  private final File folder;
071
072  /**
073   * The coarse-grained lock for state transition.
074   */
075  private final Lock stateLock = new ReentrantLock();
076
077  private final Condition doneCond = stateLock.newCondition();
078
079  /**
080   * This will be informed of process start and stop.
081   */
082  private final RunnableProcessObserver processObserver;
083
084  /**
085   * The process.
086   */
087  private Process process;
088
089  /**
090   * The state of the process.
091   */
092  private RunnableProcessState state = RunnableProcessState.INIT;   // synchronized on stateLock
093
094  /**
095   * @param command the command to execute.
096   * @param id The ID of the process. This is used to name files and in the logs created by this process.
097   * @param folder The folder in which this will store its stdout and stderr output
098   * @param processObserver will be informed of process state changes.
099   * @param standardOutFileName The name of the file used for redirecting STDOUT
100   * @param standardErrorFileName The name of the file used for redirecting STDERR
101   */
102  public RunnableProcess(
103      final List<String> command,
104      final String id,
105      final File folder,
106      final RunnableProcessObserver processObserver,
107      final String standardOutFileName,
108      final String standardErrorFileName) {
109
110    this.processObserver = processObserver;
111    this.command = Collections.unmodifiableList(expandEnvironmentVariables(command));
112    this.id = id;
113    this.folder = folder;
114
115    assert this.folder.isDirectory();
116    if (!this.folder.exists() && !this.folder.mkdirs()) {
117      LOG.log(Level.WARNING, "Failed to create [{0}]", this.folder.getAbsolutePath());
118    }
119
120    this.standardOutFileName = standardOutFileName;
121    this.standardErrorFileName = standardErrorFileName;
122
123    LOG.log(Level.FINEST, "RunnableProcess ready.");
124  }
125
126  private static final Pattern ENV_REGEX = Pattern.compile("\\{\\{(\\w+)}}");
127
128  /**
129   * Replace {{ENV_VAR}} placeholders with the values of the corresponding environment variables.
130   * @param command An input string with {{ENV_VAR}} placeholders
131   * to be replaced with the values of the corresponding environment variables.
132   * Replace unknown/unset variables with an empty string.
133   * @return A new string with all the placeholders expanded.
134   */
135  public static String expandEnvironmentVariables(final String command) {
136
137    final Matcher match = ENV_REGEX.matcher(command);
138    final StringBuilder res = new StringBuilder(command.length());
139
140    int i = 0;
141    while (match.find()) {
142      final String var = System.getenv(match.group(1));
143      res.append(command.substring(i, match.start())).append(var == null ? "" : var);
144      i = match.end();
145    }
146
147    return res.append(command.substring(i, command.length())).toString();
148  }
149
150  /**
151   * Replace {{ENV_VAR}} placeholders with the values of the corresponding environment variables.
152   * @param command An input list of strings with {{ENV_VAR}} placeholders
153   * to be replaced with the values of the corresponding environment variables.
154   * Replace unknown/unset variables with an empty string.
155   * @return A new list of strings with all the placeholders expanded.
156   */
157  public static List<String> expandEnvironmentVariables(final List<String> command) {
158    final ArrayList<String> res = new ArrayList<>(command.size());
159    for (final String cmd : command) {
160      res.add(expandEnvironmentVariables(cmd));
161    }
162    return res;
163  }
164
165  /**
166   * Runs the configured process.
167   * @throws IllegalStateException if the process is already running or has been running before.
168   */
169  @Override
170  public void run() {
171
172    this.stateLock.lock();
173
174    try {
175
176      if (this.state != RunnableProcessState.INIT) {
177        throw new IllegalStateException("The RunnableProcess can't be reused");
178      }
179
180      // Setup the stdout and stderr destinations.
181      final File errFile = new File(folder, standardErrorFileName);
182      final File outFile = new File(folder, standardOutFileName);
183
184      // Launch the process
185      try {
186
187        LOG.log(Level.FINEST,
188            "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
189            new Object[] {this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
190
191        this.process = new ProcessBuilder()
192            .command(this.command)
193            .directory(this.folder)
194            .redirectError(errFile)
195            .redirectOutput(outFile)
196            .start();
197
198        this.setState(RunnableProcessState.RUNNING);
199        this.processObserver.onProcessStarted(this.id);
200
201      } catch (final IOException ex) {
202        LOG.log(Level.SEVERE, "Unable to spawn process " + this.id + " with command " + this.command, ex);
203      }
204
205    } finally {
206      this.stateLock.unlock();
207    }
208
209    try {
210
211      // Wait for its completion
212      LOG.log(Level.FINER, "Wait for process completion: {0}", this.id);
213      final int returnValue = this.process.waitFor();
214      this.processObserver.onProcessExit(this.id, returnValue);
215
216      this.stateLock.lock();
217      try {
218        this.setState(RunnableProcessState.ENDED);
219        this.doneCond.signalAll();
220      } finally {
221        this.stateLock.unlock();
222      }
223
224      LOG.log(Level.FINER, "Process \"{0}\" returned {1}", new Object[] {this.id, returnValue});
225
226    } catch (final InterruptedException ex) {
227      LOG.log(Level.SEVERE,
228          "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
229          new Object[] {this.id, ex});
230    }
231  }
232
233  /**
234   * Cancels the running process if it is running.
235   */
236  public void cancel() {
237
238    this.stateLock.lock();
239
240    try {
241
242      if (this.state == RunnableProcessState.RUNNING) {
243        this.process.destroy();
244        if (!this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS)) {
245          LOG.log(Level.FINE, "{0} milliseconds elapsed", DESTROY_WAIT_TIME);
246        }
247      }
248
249      if (this.state == RunnableProcessState.RUNNING) {
250        LOG.log(Level.WARNING, "The child process survived Process.destroy()");
251        if (OSUtils.isUnix() || OSUtils.isWindows()) {
252          LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line");
253          try {
254            final long pid = readPID();
255            OSUtils.kill(pid);
256          } catch (final IOException | InterruptedException | NumberFormatException e) {
257            LOG.log(Level.SEVERE, "Unable to kill the process.", e);
258          }
259        }
260      }
261
262    } catch (final InterruptedException ex) {
263      LOG.log(Level.SEVERE,
264          "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
265          new Object[] {this.id, ex});
266    } finally {
267      this.stateLock.unlock();
268    }
269  }
270
271  /**
272   * @return the PID stored in the PID file.
273   * @throws IOException if the file can't be read.
274   */
275  private long readPID() throws IOException {
276    final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME;
277    try (final BufferedReader r = new BufferedReader(
278        new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) {
279      return Long.parseLong(r.readLine());
280    }
281  }
282
283  /**
284   * @return the ID of the process.
285   */
286  public String getId() {
287    return this.id;
288  }
289
290  /**
291   * @return the command given to the process.
292   */
293  public List<String> getCommand() {
294    return this.command;
295  }
296
297  /**
298   * Sets a new state for the process.
299   * @param newState a new process state to transition to.
300   * @throws IllegalStateException if the new state is illegal.
301   */
302  private void setState(final RunnableProcessState newState) {
303    if (!this.state.isLegal(newState)) {
304      throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal");
305    }
306    this.state = newState;
307  }
308}