001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.local.driver; 020 021import org.apache.reef.annotations.audience.Private; 022import org.apache.reef.annotations.audience.TaskSide; 023import org.apache.reef.runtime.common.files.REEFFileNames; 024import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver; 025import org.apache.reef.runtime.local.process.RunnableProcess; 026import org.apache.reef.runtime.local.process.RunnableProcessObserver; 027 028import java.io.File; 029import java.io.IOException; 030import java.nio.file.Files; 031import java.nio.file.Path; 032import java.util.Arrays; 033import java.util.List; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037/** 038 * A Container that runs an Evaluator in a Process. 039 */ 040@Private 041@TaskSide 042public final class ProcessContainer implements Container { 043 044 private static final Logger LOG = Logger.getLogger(ProcessContainer.class.getName()); 045 046 private final String errorHandlerRID; 047 private final String nodeID; 048 private final File folder; 049 private final String containedID; 050 private final int megaBytes; 051 private final int numberOfCores; 052 private final String rackName; 053 private final REEFFileNames fileNames; 054 private final File localFolder; 055 private final File globalFolder; 056 private final RunnableProcessObserver processObserver; 057 private final ThreadGroup threadGroup; 058 059 private Thread theThread; 060 private RunnableProcess process; 061 062 /** 063 * @param errorHandlerRID the remoteID of the error handler. 064 * @param nodeID the ID of the (fake) node this Container is instantiated on 065 * @param containedID the ID used to identify this container uniquely 066 * @param folder the folder in which logs etc. will be deposited 067 */ 068 ProcessContainer(final String errorHandlerRID, 069 final String nodeID, 070 final String containedID, 071 final File folder, 072 final int megaBytes, 073 final int numberOfCores, 074 final String rackName, 075 final REEFFileNames fileNames, 076 final ReefRunnableProcessObserver processObserver, 077 final ThreadGroup threadGroup) { 078 079 this.errorHandlerRID = errorHandlerRID; 080 this.nodeID = nodeID; 081 this.containedID = containedID; 082 this.folder = folder; 083 this.megaBytes = megaBytes; 084 this.numberOfCores = numberOfCores; 085 this.rackName = rackName; 086 this.fileNames = fileNames; 087 this.processObserver = processObserver; 088 this.threadGroup = threadGroup; 089 090 final File reefFolder = new File(folder, fileNames.getREEFFolderName()); 091 092 this.localFolder = new File(reefFolder, fileNames.getLocalFolderName()); 093 if (!this.localFolder.exists() && !this.localFolder.mkdirs()) { 094 LOG.log(Level.WARNING, "Failed to create [{0}]", this.localFolder.getAbsolutePath()); 095 } 096 097 this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName()); 098 if (!this.globalFolder.exists() && !this.globalFolder.mkdirs()) { 099 LOG.log(Level.WARNING, "Failed to create [{0}]", this.globalFolder.getAbsolutePath()); 100 } 101 } 102 103 private static void copy(final Iterable<File> files, final File folder) throws IOException { 104 for (final File sourceFile : files) { 105 final File destinationFile = new File(folder, sourceFile.getName()); 106 if (Files.isSymbolicLink(sourceFile.toPath())) { 107 final Path linkTargetPath = Files.readSymbolicLink(sourceFile.toPath()); 108 Files.createSymbolicLink(destinationFile.toPath(), linkTargetPath); 109 } else { 110 Files.copy(sourceFile.toPath(), destinationFile.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING); 111 } 112 } 113 } 114 115 @Override 116 public void addLocalFiles(final Iterable<File> files) { 117 try { 118 copy(files, this.localFolder); 119 } catch (final IOException e) { 120 throw new RuntimeException("Unable to copy files to the evaluator folder.", e); 121 } 122 } 123 124 @Override 125 public void addGlobalFiles(final File globalFilesFolder) { 126 try { 127 final File[] files = globalFilesFolder.listFiles(); 128 if (files != null) { 129 copy(Arrays.asList(files), this.globalFolder); 130 } 131 } catch (final IOException e) { 132 throw new RuntimeException("Unable to copy files to the evaluator folder.", e); 133 } 134 } 135 136 @Override 137 public void run(final List<String> commandLine) { 138 139 this.process = new RunnableProcess( 140 commandLine, 141 this.containedID, 142 this.folder, 143 this.processObserver, 144 this.fileNames.getEvaluatorStdoutFileName(), 145 this.fileNames.getEvaluatorStderrFileName()); 146 147 this.theThread = new Thread(this.threadGroup, this.process, "ProcessContainer:" + this.containedID); 148 this.theThread.start(); 149 } 150 151 @Override 152 public boolean isRunning() { 153 return null != this.theThread && this.theThread.isAlive(); 154 } 155 156 @Override 157 public int getMemory() { 158 return this.megaBytes; 159 } 160 161 @Override 162 public int getNumberOfCores() { 163 return this.numberOfCores; 164 } 165 166 @Override 167 public String getRackName() { 168 return this.rackName; 169 } 170 171 @Override 172 public File getFolder() { 173 return this.folder; 174 } 175 176 @Override 177 public String getNodeID() { 178 return this.nodeID; 179 } 180 181 @Override 182 public String getContainerID() { 183 return this.containedID; 184 } 185 186 @Override 187 public void close() { 188 if (isRunning()) { 189 LOG.log(Level.WARNING, "Force-closing a container that is still running: {0}", this); 190 this.process.cancel(); 191 } 192 } 193 194 @Override 195 public String toString() { 196 return String.format( 197 "ProcessContainer{containedID=%s, nodeID=%s, errorHandlerRID=%s, folder=%s, rack=%s}", 198 this.containedID, this.nodeID, this.errorHandlerRID, this.folder, this.rackName); 199 } 200}