001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.local.process; 020 021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; 022import org.apache.reef.util.OSUtils; 023 024import java.io.*; 025import java.nio.charset.StandardCharsets; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.locks.Condition; 030import java.util.concurrent.locks.Lock; 031import java.util.concurrent.locks.ReentrantLock; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035 036/** 037 * A runnable class that encapsulates a process. 038 */ 039public final class RunnableProcess implements Runnable { 040 041 private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName()); 042 043 private static final long DESTROY_WAIT_TIME = 100; 044 045 /** 046 * Name of the file used for STDERR redirection. 047 */ 048 private final String standardErrorFileName; 049 /** 050 * Name of the file used for STDOUT redirection. 051 */ 052 private final String standardOutFileName; 053 054 /** 055 * Command to execute. 056 */ 057 private final List<String> command; 058 /** 059 * User supplied ID of this process. 060 */ 061 private final String id; 062 /** 063 * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited. 064 */ 065 private final File folder; 066 /** 067 * The coarse-grained lock for state transition. 068 */ 069 private final Lock stateLock = new ReentrantLock(); 070 private final Condition doneCond = stateLock.newCondition(); 071 /** 072 * This will be informed of process start and stop. 073 */ 074 private final RunnableProcessObserver processObserver; 075 /** 076 * The process. 077 */ 078 private Process process; 079 /** 080 * The state of the process. 081 */ 082 private State state = State.INIT; // synchronized on stateLock 083 084 /** 085 * @param command the command to execute. 086 * @param id The ID of the process. This is used to name files and in the logs created 087 * by this process. 088 * @param folder The folder in which this will store its stdout and stderr output 089 * @param processObserver will be informed of process state changes. 090 * @param standardOutFileName The name of the file used for redirecting STDOUT 091 * @param standardErrorFileName The name of the file used for redirecting STDERR 092 */ 093 public RunnableProcess(final List<String> command, 094 final String id, 095 final File folder, 096 final RunnableProcessObserver processObserver, 097 final String standardOutFileName, 098 final String standardErrorFileName) { 099 this.processObserver = processObserver; 100 this.command = new ArrayList<>(command); 101 this.id = id; 102 this.folder = folder; 103 assert this.folder.isDirectory(); 104 if (!this.folder.exists() && !this.folder.mkdirs()) { 105 LOG.log(Level.WARNING, "Failed to create [{0}]", this.folder.getAbsolutePath()); 106 } 107 this.standardOutFileName = standardOutFileName; 108 this.standardErrorFileName = standardErrorFileName; 109 LOG.log(Level.FINEST, "RunnableProcess ready."); 110 } 111 112 /** 113 * Checks whether a transition from State 'from' to state 'to' is legal. 114 * 115 * @param from 116 * @param to 117 * @return true, if the state transition is legal. False otherwise. 118 */ 119 private static boolean isLegal(final State from, final State to) { 120 switch (from) { 121 case INIT: 122 switch (to) { 123 case INIT: 124 case RUNNING: 125 case ENDED: 126 return true; 127 default: 128 return false; 129 } 130 case RUNNING: 131 switch (to) { 132 case ENDED: 133 return true; 134 default: 135 return false; 136 } 137 case ENDED: 138 return false; 139 default: 140 return false; 141 } 142 } 143 144 /** 145 * Runs the configured process. 146 * 147 * @throws java.lang.IllegalStateException if the process is already running or has been running before. 148 */ 149 @Override 150 public void run() { 151 this.stateLock.lock(); 152 try { 153 if (this.getState() != State.INIT) { 154 throw new IllegalStateException("The RunnableProcess can't be reused"); 155 } 156 157 // Setup the stdout and stderr destinations. 158 final File errFile = new File(folder, standardErrorFileName); 159 final File outFile = new File(folder, standardOutFileName); 160 161 // Launch the process 162 try { 163 LOG.log(Level.FINEST, "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}", 164 new Object[]{this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()}); 165 this.process = new ProcessBuilder() 166 .command(this.command) 167 .directory(this.folder) 168 .redirectError(errFile) 169 .redirectOutput(outFile) 170 .start(); 171 this.setState(State.RUNNING); 172 this.processObserver.onProcessStarted(this.id); 173 } catch (final IOException ex) { 174 LOG.log(Level.SEVERE, "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}", 175 new Object[]{this.id, this.command, ex}); 176 } 177 } finally { 178 this.stateLock.unlock(); 179 } 180 181 try { 182 // Wait for its completion 183 final int returnValue = process.waitFor(); 184 this.processObserver.onProcessExit(this.id, returnValue); 185 this.stateLock.lock(); 186 try { 187 this.setState(State.ENDED); 188 this.doneCond.signalAll(); 189 } finally { 190 this.stateLock.unlock(); 191 } 192 LOG.log(Level.FINEST, "Process \"{0}\" returned {1}", new Object[]{this.id, returnValue}); 193 } catch (final InterruptedException ex) { 194 LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}", 195 new Object[]{this.id, ex}); 196 } 197 } 198 199 200 /** 201 * Cancels the running process if it is running. 202 */ 203 public void cancel() { 204 this.stateLock.lock(); 205 try { 206 if (this.processIsRunning()) { 207 this.process.destroy(); 208 if (!this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS)) { 209 LOG.log(Level.FINE, "{0} milliseconds elapsed", DESTROY_WAIT_TIME); 210 } 211 } 212 213 if (this.processIsRunning()) { 214 LOG.log(Level.WARNING, "The child process survived Process.destroy()"); 215 if (OSUtils.isUnix() || OSUtils.isWindows()) { 216 LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line"); 217 try { 218 final long pid = readPID(); 219 OSUtils.kill(pid); 220 } catch (final IOException | InterruptedException | NumberFormatException e) { 221 LOG.log(Level.SEVERE, "Unable to kill the process.", e); 222 } 223 } 224 } 225 226 } catch (final InterruptedException ex) { 227 LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}", 228 new Object[]{this.id, ex}); 229 } finally { 230 this.stateLock.unlock(); 231 } 232 } 233 234 /** 235 * @return the PID stored in the PID file. 236 * @throws IOException if the file can't be read. 237 */ 238 private long readPID() throws IOException { 239 final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME; 240 try (final BufferedReader r = 241 new BufferedReader(new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) { 242 return Long.parseLong(r.readLine()); 243 } 244 } 245 246 private boolean processIsRunning() { 247 return this.getState() == State.RUNNING; 248 } 249 250 /** 251 * @return the current State of the process. 252 */ 253 private State getState() { 254 return this.state; 255 } 256 257 /** 258 * Sets a new state for the process. 259 * 260 * @param newState 261 * @throws java.lang.IllegalStateException if the new state is illegal. 262 */ 263 private void setState(final State newState) { 264 if (!isLegal(this.state, newState)) { 265 throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal"); 266 } 267 this.state = newState; 268 } 269 270 /** 271 * The possible states of a process: INIT, RUNNING, ENDED. 272 */ 273 private enum State { 274 // After initialization 275 INIT, 276 // The process is running 277 RUNNING, 278 // The process ended 279 ENDED 280 } 281}