001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.mesos.evaluator; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.runtime.common.files.REEFFileNames; 023import org.apache.reef.runtime.mesos.evaluator.parameters.MesosExecutorId; 024import org.apache.reef.runtime.mesos.util.EvaluatorControl; 025import org.apache.reef.runtime.mesos.util.EvaluatorLaunch; 026import org.apache.reef.runtime.mesos.util.EvaluatorRelease; 027import org.apache.reef.runtime.mesos.util.MesosRemoteManager; 028import org.apache.reef.tang.Injector; 029import org.apache.reef.tang.JavaConfigurationBuilder; 030import org.apache.reef.tang.Tang; 031import org.apache.reef.tang.annotations.Parameter; 032import org.apache.reef.tang.formats.CommandLine; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.FileUtil; 036import org.apache.hadoop.fs.Path; 037import org.apache.mesos.Executor; 038import org.apache.mesos.ExecutorDriver; 039import org.apache.mesos.MesosExecutorDriver; 040import org.apache.mesos.Protos.ExecutorInfo; 041import org.apache.mesos.Protos.FrameworkInfo; 042import org.apache.mesos.Protos.SlaveInfo; 043import org.apache.mesos.Protos.Status; 044import org.apache.mesos.Protos.TaskID; 045import org.apache.mesos.Protos.TaskInfo; 046import org.apache.mesos.Protos.TaskState; 047import org.apache.mesos.Protos.TaskStatus; 048 049import javax.inject.Inject; 050import java.io.File; 051import java.io.IOException; 052import java.util.Arrays; 053import java.util.List; 054import java.util.concurrent.ExecutorService; 055import java.util.concurrent.Executors; 056import java.util.logging.Level; 057import java.util.logging.Logger; 058 059/** 060 * REEF implementation of Mesos Executor. 061 */ 062public final class REEFExecutor implements Executor { 063 private static final Logger LOG = Logger.getLogger(REEFExecutor.class.getName()); 064 065 private final MesosExecutorDriver mesosExecutorDriver; 066 private final MesosRemoteManager mesosRemoteManager; 067 private final ExecutorService executorService; 068 private final REEFFileNames fileNames; 069 private final String mesosExecutorId; 070 071 private Process evaluatorProcess; 072 private Integer evaluatorProcessExitValue; 073 074 @Inject 075 REEFExecutor(final EvaluatorControlHandler evaluatorControlHandler, 076 final MesosRemoteManager mesosRemoteManager, 077 final REEFFileNames fileNames, 078 @Parameter(MesosExecutorId.class) final String mesosExecutorId) { 079 this.mesosRemoteManager = mesosRemoteManager; 080 this.mesosRemoteManager.registerHandler(EvaluatorControl.class, evaluatorControlHandler); 081 this.mesosExecutorDriver = new MesosExecutorDriver(this); 082 this.executorService = Executors.newCachedThreadPool(); 083 this.fileNames = fileNames; 084 this.mesosExecutorId = mesosExecutorId; 085 } 086 087 @Override 088 public void registered(final ExecutorDriver driver, 089 final ExecutorInfo executorInfo, 090 final FrameworkInfo frameworkInfo, 091 final SlaveInfo slaveInfo) { 092 LOG.log(Level.FINEST, "Executor registered. driver: {0} executorInfo: {1} frameworkInfo: {2} slaveInfo {3}", 093 new Object[]{driver, executorInfo, frameworkInfo, slaveInfo}); 094 } 095 096 @Override 097 public void reregistered(final ExecutorDriver driver, final SlaveInfo slaveInfo) { 098 LOG.log(Level.FINEST, "Executor reregistered. driver: {0}", driver); 099 } 100 101 @Override 102 public void disconnected(final ExecutorDriver driver) { 103 this.onRuntimeError(); 104 } 105 106 /** 107 * We assume a long-running Mesos Task that manages a REEF Evaluator process, leveraging Mesos Executor's interface. 108 */ 109 @Override 110 public void launchTask(final ExecutorDriver driver, final TaskInfo task) { 111 driver.sendStatusUpdate(TaskStatus.newBuilder() 112 .setTaskId(TaskID.newBuilder().setValue(this.mesosExecutorId).build()) 113 .setState(TaskState.TASK_STARTING) 114 .setSlaveId(task.getSlaveId()) 115 .setMessage(this.mesosRemoteManager.getMyIdentifier()) 116 .build()); 117 } 118 119 @Override 120 public void killTask(final ExecutorDriver driver, final TaskID taskId) { 121 this.onStop(); 122 } 123 124 @Override 125 public void frameworkMessage(final ExecutorDriver driver, final byte[] data) { 126 LOG.log(Level.FINEST, "Framework Messge. ExecutorDriver: {0}, data: {1}.", 127 new Object[]{driver, data}); 128 } 129 130 @Override 131 public void shutdown(final ExecutorDriver driver) { 132 this.onStop(); 133 } 134 135 @Override 136 public void error(final ExecutorDriver driver, final String message) { 137 this.onRuntimeError(); 138 } 139 140 ///////////////////////////////////////////////////////////////// 141 // HELPER METHODS 142 143 private void onStart() { 144 this.executorService.submit(new Thread() { 145 public void run() { 146 final Status status; 147 status = mesosExecutorDriver.run(); 148 LOG.log(Level.INFO, "MesosExecutorDriver ended with status {0}", status); 149 } 150 }); 151 } 152 153 private void onStop() { 154 // Shutdown REEF Evaluator 155 if (this.evaluatorProcess != null) { 156 this.evaluatorProcess.destroy(); 157 mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder() 158 .setTaskId(TaskID.newBuilder() 159 .setValue(mesosExecutorId) 160 .build()) 161 .setState(TaskState.TASK_FINISHED) 162 .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue)) 163 .build()); 164 } else { 165 mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder() 166 .setTaskId(TaskID.newBuilder() 167 .setValue(mesosExecutorId) 168 .build()) 169 .setState(TaskState.TASK_FINISHED) 170 .setData(ByteString.copyFromUtf8("eval_not_run")) 171 // TODO[JIRA REEF-102]: a hack to pass closeEvaluator test, replace this with a better interface 172 .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue)) 173 .build()); 174 } 175 176 // Shutdown Mesos Executor 177 this.executorService.shutdown(); 178 this.mesosExecutorDriver.stop(); 179 } 180 181 private void onRuntimeError() { 182 // Shutdown REEF Evaluator 183 if (this.evaluatorProcess != null) { 184 this.evaluatorProcess.destroy(); 185 } 186 mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder() 187 .setTaskId(TaskID.newBuilder() 188 .setValue(mesosExecutorId) 189 .build()) 190 .setState(TaskState.TASK_FAILED) 191 .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue)) 192 .build()); 193 194 // Shutdown Mesos Executor 195 this.executorService.shutdown(); 196 this.mesosExecutorDriver.stop(); 197 } 198 199 public void onEvaluatorRelease(final EvaluatorRelease evaluatorRelease) { 200 LOG.log(Level.INFO, "Release!!!! {0}", evaluatorRelease.toString()); 201 assert evaluatorRelease.getIdentifier().toString().equals(this.mesosExecutorId); 202 this.onStop(); 203 } 204 205 public void onEvaluatorLaunch(final EvaluatorLaunch evaluatorLaunch) { 206 LOG.log(Level.INFO, "Launch!!!! {0}", evaluatorLaunch.toString()); 207 assert evaluatorLaunch.getIdentifier().toString().equals(this.mesosExecutorId); 208 final ExecutorService evaluatorLaunchExecutorService = Executors.newSingleThreadExecutor(); 209 evaluatorLaunchExecutorService.submit(new Thread() { 210 public void run() { 211 try { 212 final List<String> command = Arrays.asList(evaluatorLaunch.getCommand().toString().split(" ")); 213 LOG.log(Level.INFO, "Command!!!! {0}", command); 214 final FileSystem fileSystem = FileSystem.get(new Configuration()); 215 final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + mesosExecutorId); 216 final File localFolder = new File(fileNames.getREEFFolderName(), fileNames.getLocalFolderName()); 217 218 FileUtil.copy(fileSystem, hdfsFolder, localFolder, true, new Configuration()); 219 220 evaluatorProcess = new ProcessBuilder() 221 .command(command) 222 .redirectError(new File(fileNames.getEvaluatorStderrFileName())) 223 .redirectOutput(new File(fileNames.getEvaluatorStdoutFileName())) 224 .start(); 225 226 evaluatorProcessExitValue = evaluatorProcess.waitFor(); 227 228 fileSystem.close(); 229 } catch (IOException | InterruptedException e) { 230 throw new RuntimeException(e); 231 } 232 } 233 }); 234 evaluatorLaunchExecutorService.shutdown(); 235 } 236 237 public static org.apache.reef.tang.Configuration parseCommandLine(final String[] args) throws IOException { 238 final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder(); 239 240 new CommandLine(confBuilder) 241 .registerShortNameOfClass(MesosExecutorId.class) 242 .processCommandLine(args); 243 244 return confBuilder.build(); 245 } 246 247 /** 248 * The starting point of the executor. 249 */ 250 public static void main(final String[] args) throws Exception { 251 final Injector injector = Tang.Factory.getTang().newInjector(parseCommandLine(args)); 252 final REEFExecutor reefExecutor = injector.getInstance(REEFExecutor.class); 253 reefExecutor.onStart(); 254 } 255}