/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.local.process;

import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
import org.apache.reef.util.OSUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;


/**
 * A runnable class that encapsulates a process.
 * <p>
 * The life cycle is INIT -> RUNNING -> ENDED. All reads and writes of the state are expected to happen
 * while holding {@code stateLock}; {@code doneCond} is signalled when the process reaches ENDED.
 */
public final class RunnableProcess implements Runnable {

  private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName());

  /**
   * Time (in milliseconds) cancel() waits for the process to end after Process.destroy().
   */
  private static final long DESTROY_WAIT_TIME = 100;

  /**
   * Name of the file used for STDERR redirection.
   */
  private final String standardErrorFileName;
  /**
   * Name of the file used for STDOUT redirection.
   */
  private final String standardOutFileName;

  /**
   * Command to execute.
   */
  private final List<String> command;
  /**
   * User supplied ID of this process.
   */
  private final String id;
  /**
   * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited.
   */
  private final File folder;
  /**
   * The coarse-grained lock for state transition.
   */
  private final Lock stateLock = new ReentrantLock();
  /**
   * Signalled (under stateLock) once the process has transitioned to ENDED.
   */
  private final Condition doneCond = stateLock.newCondition();
  /**
   * This will be informed of process start and stop.
   */
  private final RunnableProcessObserver processObserver;
  /**
   * The process. Remains null until the process was launched successfully.
   */
  private Process process;
  /**
   * The state of the process.
   */
  private State state = State.INIT;   // synchronized on stateLock

  /**
   * @param command               the command to execute.
   * @param id                    The ID of the process. This is used to name files and in the logs created
   *                              by this process.
   * @param folder                The folder in which this will store its stdout and stderr output
   * @param processObserver       will be informed of process state changes.
   * @param standardOutFileName   The name of the file used for redirecting STDOUT
   * @param standardErrorFileName The name of the file used for redirecting STDERR
   */
  public RunnableProcess(final List<String> command,
                         final String id,
                         final File folder,
                         final RunnableProcessObserver processObserver,
                         final String standardOutFileName,
                         final String standardErrorFileName) {
    this.processObserver = processObserver;
    // Defensive copy: later changes to the caller's list must not affect the command we launch.
    this.command = new ArrayList<>(command);
    this.id = id;
    this.folder = folder;
    // Create the folder first and only then verify it; checking before creating would reject a
    // folder that simply does not exist yet.
    this.folder.mkdirs();
    assert (this.folder.isDirectory());
    this.standardOutFileName = standardOutFileName;
    this.standardErrorFileName = standardErrorFileName;
    LOG.log(Level.FINEST, "RunnableProcess ready.");
  }

  /**
   * Checks whether a transition from State 'from' to state 'to' is legal.
   *
   * @param from the state to transition out of.
   * @param to   the state to transition into.
   * @return true, if the state transition is legal. False otherwise.
   */
  private static boolean isLegal(final State from, final State to) {
    switch (from) {
      case INIT:
        switch (to) {
          case INIT:
          case RUNNING:
          case ENDED:
            return true;
          default:
            return false;
        }
      case RUNNING:
        switch (to) {
          case ENDED:
            return true;
          default:
            return false;
        }
      case ENDED:
        // ENDED is terminal.
        return false;
      default:
        return false;
    }
  }

  /**
   * Runs the configured process and blocks until it exits.
   * <p>
   * If the process cannot be spawned, the failure is logged and this method returns without
   * changing the state or notifying the observer.
   *
   * @throws java.lang.IllegalStateException if the process is already running or has been running before.
   */
  @Override
  public void run() {
    final Process launched;

    this.stateLock.lock();
    try {
      if (this.getState() != State.INIT) {
        throw new IllegalStateException("The RunnableProcess can't be reused");
      }

      // Setup the stdout and stderr destinations.
      final File errFile = new File(folder, standardErrorFileName);
      final File outFile = new File(folder, standardOutFileName);

      // Launch the process
      try {
        LOG.log(Level.FINEST, "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
            new Object[]{this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
        this.process = new ProcessBuilder()
            .command(this.command)
            .directory(this.folder)
            .redirectError(errFile)
            .redirectOutput(outFile)
            .start();
        this.setState(State.RUNNING);
        this.processObserver.onProcessStarted(this.id);
      } catch (final IOException ex) {
        LOG.log(Level.SEVERE, "Unable to spawn process \"{0}\" with command {1}\n Exception:{2}",
            new Object[]{this.id, this.command, ex});
      }

      // Snapshot under the lock; null means that the launch above failed.
      launched = this.process;
    } finally {
      this.stateLock.unlock();
    }

    if (launched == null) {
      // Nothing was started, so there is nothing to wait for. The failure was logged above.
      return;
    }

    try {
      // Wait for its completion
      final int returnValue = launched.waitFor();
      this.processObserver.onProcessExit(this.id, returnValue);
      this.stateLock.lock();
      try {
        this.setState(State.ENDED);
        // Wake up a cancel() call that may be waiting for the process to end.
        this.doneCond.signalAll();
      } finally {
        this.stateLock.unlock();
      }
      LOG.log(Level.FINEST, "Process \"{0}\" returned {1}", new Object[]{this.id, returnValue});
    } catch (final InterruptedException ex) {
      LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
          new Object[]{this.id, ex});
      // Preserve the interrupt status for whoever owns this thread.
      Thread.currentThread().interrupt();
    }
  }


  /**
   * Cancels the running process if it is running.
   * <p>
   * First attempts Process.destroy() and waits up to DESTROY_WAIT_TIME milliseconds for the process
   * to end. If it is still running after that and the OS is Linux, the PID is read from the PID file
   * in the working folder and handed to OSUtils.kill().
   */
  public void cancel() {
    this.stateLock.lock();
    try {
      if (this.processIsRunning()) {
        this.process.destroy();
        // await() releases stateLock while waiting, which allows run() to record the ENDED state.
        this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS);
      }

      if (this.processIsRunning()) {
        LOG.log(Level.WARNING, "The child process survived Process.destroy()");
        if (OSUtils.isLinux()) {
          LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line");
          try {
            final long pid = readPID();
            OSUtils.kill(pid);
          } catch (final IOException | InterruptedException e) {
            LOG.log(Level.SEVERE, "Unable to kill the process.", e);
            if (e instanceof InterruptedException) {
              // Preserve the interrupt status for the caller.
              Thread.currentThread().interrupt();
            }
          }
        }
      }

    } catch (final InterruptedException ex) {
      LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
          new Object[]{this.id, ex});
      // Preserve the interrupt status for the caller.
      Thread.currentThread().interrupt();
    } finally {
      this.stateLock.unlock();
    }
  }

  /**
   * @return the PID stored in the PID file.
   * @throws IOException if the file can't be read, is empty or does not contain a number.
   */
  private long readPID() throws IOException {
    final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME;
    try (final BufferedReader r = new BufferedReader(new FileReader(pidFileName))) {
      final String line = r.readLine();
      if (line == null) {
        throw new IOException("The PID file " + pidFileName + " is empty");
      }
      try {
        return Long.parseLong(line.trim());
      } catch (final NumberFormatException ex) {
        // Report malformed content as an IOException so that callers deal with a single failure type.
        throw new IOException("The PID file " + pidFileName + " does not contain a valid PID: " + line, ex);
      }
    }
  }

  /**
   * @return true, if the process is in state RUNNING.
   */
  private boolean processIsRunning() {
    return this.getState() == State.RUNNING;
  }

  /**
   * @return the current State of the process.
   */
  private State getState() {
    return this.state;
  }

  /**
   * Sets a new state for the process.
   *
   * @param newState the state to transition to.
   * @throws java.lang.IllegalStateException if the new state is illegal.
   */
  private void setState(final State newState) {
    if (!isLegal(this.state, newState)) {
      throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal");
    }
    this.state = newState;
  }

  /**
   * The possible states of a process: INIT, RUNNING, ENDED.
   */
  private enum State {
    // After initialization
    INIT,
    // The process is running
    RUNNING,
    // The process ended
    ENDED
  }
}