001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.local.process; 020 021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; 022import org.apache.reef.util.OSUtils; 023 024import java.io.*; 025import java.nio.charset.StandardCharsets; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.List; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.locks.Condition; 031import java.util.concurrent.locks.Lock; 032import java.util.concurrent.locks.ReentrantLock; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035import java.util.regex.Matcher; 036import java.util.regex.Pattern; 037 038/** 039 * A runnable class that encapsulates a process. 040 */ 041public final class RunnableProcess implements Runnable { 042 043 private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName()); 044 045 private static final long DESTROY_WAIT_TIME = 100; 046 047 /** 048 * Name of the file used for STDERR redirection. 049 */ 050 private final String standardErrorFileName; 051 052 /** 053 * Name of the file used for STDOUT redirection. 054 */ 055 private final String standardOutFileName; 056 057 /** 058 * Command to execute. 059 */ 060 private final List<String> command; 061 062 /** 063 * User supplied ID of this process. 064 */ 065 private final String id; 066 067 /** 068 * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited. 069 */ 070 private final File folder; 071 072 /** 073 * The coarse-grained lock for state transition. 074 */ 075 private final Lock stateLock = new ReentrantLock(); 076 077 private final Condition doneCond = stateLock.newCondition(); 078 079 /** 080 * This will be informed of process start and stop. 081 */ 082 private final RunnableProcessObserver processObserver; 083 084 /** 085 * The process. 086 */ 087 private Process process; 088 089 /** 090 * The state of the process. 091 */ 092 private RunnableProcessState state = RunnableProcessState.INIT; // synchronized on stateLock 093 094 /** 095 * @param command the command to execute. 096 * @param id The ID of the process. This is used to name files and in the logs created by this process. 097 * @param folder The folder in which this will store its stdout and stderr output 098 * @param processObserver will be informed of process state changes. 099 * @param standardOutFileName The name of the file used for redirecting STDOUT 100 * @param standardErrorFileName The name of the file used for redirecting STDERR 101 */ 102 public RunnableProcess( 103 final List<String> command, 104 final String id, 105 final File folder, 106 final RunnableProcessObserver processObserver, 107 final String standardOutFileName, 108 final String standardErrorFileName) { 109 110 this.processObserver = processObserver; 111 this.command = Collections.unmodifiableList(expandEnvironmentVariables(command)); 112 this.id = id; 113 this.folder = folder; 114 115 assert this.folder.isDirectory(); 116 if (!this.folder.exists() && !this.folder.mkdirs()) { 117 LOG.log(Level.WARNING, "Failed to create [{0}]", this.folder.getAbsolutePath()); 118 } 119 120 this.standardOutFileName = standardOutFileName; 121 this.standardErrorFileName = standardErrorFileName; 122 123 LOG.log(Level.FINEST, "RunnableProcess ready."); 124 } 125 126 private static final Pattern ENV_REGEX = Pattern.compile("\\{\\{(\\w+)}}"); 127 128 /** 129 * Replace {{ENV_VAR}} placeholders with the values of the corresponding environment variables. 130 * @param command An input string with {{ENV_VAR}} placeholders 131 * to be replaced with the values of the corresponding environment variables. 132 * Replace unknown/unset variables with an empty string. 133 * @return A new string with all the placeholders expanded. 134 */ 135 public static String expandEnvironmentVariables(final String command) { 136 137 final Matcher match = ENV_REGEX.matcher(command); 138 final StringBuilder res = new StringBuilder(command.length()); 139 140 int i = 0; 141 while (match.find()) { 142 final String var = System.getenv(match.group(1)); 143 res.append(command.substring(i, match.start())).append(var == null ? "" : var); 144 i = match.end(); 145 } 146 147 return res.append(command.substring(i, command.length())).toString(); 148 } 149 150 /** 151 * Replace {{ENV_VAR}} placeholders with the values of the corresponding environment variables. 152 * @param command An input list of strings with {{ENV_VAR}} placeholders 153 * to be replaced with the values of the corresponding environment variables. 154 * Replace unknown/unset variables with an empty string. 155 * @return A new list of strings with all the placeholders expanded. 156 */ 157 public static List<String> expandEnvironmentVariables(final List<String> command) { 158 final ArrayList<String> res = new ArrayList<>(command.size()); 159 for (final String cmd : command) { 160 res.add(expandEnvironmentVariables(cmd)); 161 } 162 return res; 163 } 164 165 /** 166 * Runs the configured process. 167 * @throws IllegalStateException if the process is already running or has been running before. 168 */ 169 @Override 170 public void run() { 171 172 this.stateLock.lock(); 173 174 try { 175 176 if (this.state != RunnableProcessState.INIT) { 177 throw new IllegalStateException("The RunnableProcess can't be reused"); 178 } 179 180 // Setup the stdout and stderr destinations. 181 final File errFile = new File(folder, standardErrorFileName); 182 final File outFile = new File(folder, standardOutFileName); 183 184 // Launch the process 185 try { 186 187 LOG.log(Level.FINEST, 188 "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}", 189 new Object[] {this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()}); 190 191 this.process = new ProcessBuilder() 192 .command(this.command) 193 .directory(this.folder) 194 .redirectError(errFile) 195 .redirectOutput(outFile) 196 .start(); 197 198 this.setState(RunnableProcessState.RUNNING); 199 this.processObserver.onProcessStarted(this.id); 200 201 } catch (final IOException ex) { 202 LOG.log(Level.SEVERE, "Unable to spawn process " + this.id + " with command " + this.command, ex); 203 } 204 205 } finally { 206 this.stateLock.unlock(); 207 } 208 209 try { 210 211 // Wait for its completion 212 LOG.log(Level.FINER, "Wait for process completion: {0}", this.id); 213 final int returnValue = this.process.waitFor(); 214 this.processObserver.onProcessExit(this.id, returnValue); 215 216 this.stateLock.lock(); 217 try { 218 this.setState(RunnableProcessState.ENDED); 219 this.doneCond.signalAll(); 220 } finally { 221 this.stateLock.unlock(); 222 } 223 224 LOG.log(Level.FINER, "Process \"{0}\" returned {1}", new Object[] {this.id, returnValue}); 225 226 } catch (final InterruptedException ex) { 227 LOG.log(Level.SEVERE, 228 "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}", 229 new Object[] {this.id, ex}); 230 } 231 } 232 233 /** 234 * Cancels the running process if it is running. 235 */ 236 public void cancel() { 237 238 this.stateLock.lock(); 239 240 try { 241 242 if (this.state == RunnableProcessState.RUNNING) { 243 this.process.destroy(); 244 if (!this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS)) { 245 LOG.log(Level.FINE, "{0} milliseconds elapsed", DESTROY_WAIT_TIME); 246 } 247 } 248 249 if (this.state == RunnableProcessState.RUNNING) { 250 LOG.log(Level.WARNING, "The child process survived Process.destroy()"); 251 if (OSUtils.isUnix() || OSUtils.isWindows()) { 252 LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line"); 253 try { 254 final long pid = readPID(); 255 OSUtils.kill(pid); 256 } catch (final IOException | InterruptedException | NumberFormatException e) { 257 LOG.log(Level.SEVERE, "Unable to kill the process.", e); 258 } 259 } 260 } 261 262 } catch (final InterruptedException ex) { 263 LOG.log(Level.SEVERE, 264 "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}", 265 new Object[] {this.id, ex}); 266 } finally { 267 this.stateLock.unlock(); 268 } 269 } 270 271 /** 272 * @return the PID stored in the PID file. 273 * @throws IOException if the file can't be read. 274 */ 275 private long readPID() throws IOException { 276 final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME; 277 try (final BufferedReader r = new BufferedReader( 278 new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) { 279 return Long.parseLong(r.readLine()); 280 } 281 } 282 283 /** 284 * @return the ID of the process. 285 */ 286 public String getId() { 287 return this.id; 288 } 289 290 /** 291 * @return the command given to the process. 292 */ 293 public List<String> getCommand() { 294 return this.command; 295 } 296 297 /** 298 * Sets a new state for the process. 299 * @param newState a new process state to transition to. 300 * @throws IllegalStateException if the new state is illegal. 301 */ 302 private void setState(final RunnableProcessState newState) { 303 if (!this.state.isLegal(newState)) { 304 throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal"); 305 } 306 this.state = newState; 307 } 308}