This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.mesos.evaluator;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.runtime.common.files.REEFFileNames;
023import org.apache.reef.runtime.mesos.evaluator.parameters.MesosExecutorId;
024import org.apache.reef.runtime.mesos.util.EvaluatorControl;
025import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
026import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
027import org.apache.reef.runtime.mesos.util.MesosRemoteManager;
028import org.apache.reef.tang.Injector;
029import org.apache.reef.tang.JavaConfigurationBuilder;
030import org.apache.reef.tang.Tang;
031import org.apache.reef.tang.annotations.Parameter;
032import org.apache.reef.tang.formats.CommandLine;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.FileUtil;
036import org.apache.hadoop.fs.Path;
037import org.apache.mesos.Executor;
038import org.apache.mesos.ExecutorDriver;
039import org.apache.mesos.MesosExecutorDriver;
040import org.apache.mesos.Protos.ExecutorInfo;
041import org.apache.mesos.Protos.FrameworkInfo;
042import org.apache.mesos.Protos.SlaveInfo;
043import org.apache.mesos.Protos.Status;
044import org.apache.mesos.Protos.TaskID;
045import org.apache.mesos.Protos.TaskInfo;
046import org.apache.mesos.Protos.TaskState;
047import org.apache.mesos.Protos.TaskStatus;
048
049import javax.inject.Inject;
050import java.io.File;
051import java.io.IOException;
052import java.util.Arrays;
053import java.util.List;
054import java.util.concurrent.ExecutorService;
055import java.util.concurrent.Executors;
056import java.util.logging.Level;
057import java.util.logging.Logger;
058
059/**
060 * REEF implementation of Mesos Executor.
061 */
062public final class REEFExecutor implements Executor {
063  private static final Logger LOG = Logger.getLogger(REEFExecutor.class.getName());
064
065  private final MesosExecutorDriver mesosExecutorDriver;
066  private final MesosRemoteManager mesosRemoteManager;
067  private final ExecutorService executorService;
068  private final REEFFileNames fileNames;
069  private final String mesosExecutorId;
070
071  private Process evaluatorProcess;
072  private Integer evaluatorProcessExitValue;
073
074  @Inject
075  REEFExecutor(final EvaluatorControlHandler evaluatorControlHandler,
076               final MesosRemoteManager mesosRemoteManager,
077               final REEFFileNames fileNames,
078               @Parameter(MesosExecutorId.class) final String mesosExecutorId) {
079    this.mesosRemoteManager = mesosRemoteManager;
080    this.mesosRemoteManager.registerHandler(EvaluatorControl.class, evaluatorControlHandler);
081    this.mesosExecutorDriver = new MesosExecutorDriver(this);
082    this.executorService = Executors.newCachedThreadPool();
083    this.fileNames = fileNames;
084    this.mesosExecutorId = mesosExecutorId;
085  }
086
087  @Override
088  public void registered(final ExecutorDriver driver,
089                         final ExecutorInfo executorInfo,
090                         final FrameworkInfo frameworkInfo,
091                         final SlaveInfo slaveInfo) {
092    LOG.log(Level.FINEST, "Executor registered. driver: {0} executorInfo: {1} frameworkInfo: {2} slaveInfo {3}",
093        new Object[]{driver, executorInfo, frameworkInfo, slaveInfo});
094  }
095
096  @Override
097  public void reregistered(final ExecutorDriver driver, final SlaveInfo slaveInfo) {
098    LOG.log(Level.FINEST, "Executor reregistered. driver: {0}", driver);
099  }
100
101  @Override
102  public void disconnected(final ExecutorDriver driver) {
103    this.onRuntimeError();
104  }
105
106  /**
107   * We assume a long-running Mesos Task that manages a REEF Evaluator process, leveraging Mesos Executor's interface.
108   */
109  @Override
110  public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
111    driver.sendStatusUpdate(TaskStatus.newBuilder()
112        .setTaskId(TaskID.newBuilder().setValue(this.mesosExecutorId).build())
113        .setState(TaskState.TASK_STARTING)
114        .setSlaveId(task.getSlaveId())
115        .setMessage(this.mesosRemoteManager.getMyIdentifier())
116        .build());
117  }
118
119  @Override
120  public void killTask(final ExecutorDriver driver, final TaskID taskId) {
121    this.onStop();
122  }
123
124  @Override
125  public void frameworkMessage(final ExecutorDriver driver, final byte[] data) {
126    LOG.log(Level.FINEST, "Framework Messge. ExecutorDriver: {0}, data: {1}.",
127        new Object[]{driver, data});
128  }
129
130  @Override
131  public void shutdown(final ExecutorDriver driver) {
132    this.onStop();
133  }
134
135  @Override
136  public void error(final ExecutorDriver driver, final String message) {
137    this.onRuntimeError();
138  }
139
140  /////////////////////////////////////////////////////////////////
141  // HELPER METHODS
142
143  private void onStart() {
144    this.executorService.submit(new Thread() {
145      public void run() {
146        final Status status;
147        status = mesosExecutorDriver.run();
148        LOG.log(Level.INFO, "MesosExecutorDriver ended with status {0}", status);
149      }
150    });
151  }
152
153  private void onStop() {
154    // Shutdown REEF Evaluator
155    if (this.evaluatorProcess != null) {
156      this.evaluatorProcess.destroy();
157      mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
158          .setTaskId(TaskID.newBuilder()
159              .setValue(mesosExecutorId)
160              .build())
161          .setState(TaskState.TASK_FINISHED)
162          .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
163          .build());
164    } else {
165      mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
166          .setTaskId(TaskID.newBuilder()
167              .setValue(mesosExecutorId)
168              .build())
169          .setState(TaskState.TASK_FINISHED)
170          .setData(ByteString.copyFromUtf8("eval_not_run"))
171          // TODO[JIRA REEF-102]: a hack to pass closeEvaluator test, replace this with a better interface
172          .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
173          .build());
174    }
175
176    // Shutdown Mesos Executor
177    this.executorService.shutdown();
178    this.mesosExecutorDriver.stop();
179  }
180
181  private void onRuntimeError() {
182    // Shutdown REEF Evaluator
183    if (this.evaluatorProcess != null) {
184      this.evaluatorProcess.destroy();
185    }
186    mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
187        .setTaskId(TaskID.newBuilder()
188            .setValue(mesosExecutorId)
189            .build())
190        .setState(TaskState.TASK_FAILED)
191        .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
192        .build());
193
194    // Shutdown Mesos Executor
195    this.executorService.shutdown();
196    this.mesosExecutorDriver.stop();
197  }
198
199  public void onEvaluatorRelease(final EvaluatorRelease evaluatorRelease) {
200    LOG.log(Level.INFO, "Release!!!! {0}", evaluatorRelease.toString());
201    assert evaluatorRelease.getIdentifier().toString().equals(this.mesosExecutorId);
202    this.onStop();
203  }
204
205  public void onEvaluatorLaunch(final EvaluatorLaunch evaluatorLaunch) {
206    LOG.log(Level.INFO, "Launch!!!! {0}", evaluatorLaunch.toString());
207    assert evaluatorLaunch.getIdentifier().toString().equals(this.mesosExecutorId);
208    final ExecutorService evaluatorLaunchExecutorService = Executors.newSingleThreadExecutor();
209    evaluatorLaunchExecutorService.submit(new Thread() {
210      public void run() {
211        try {
212          final List<String> command = Arrays.asList(evaluatorLaunch.getCommand().toString().split(" "));
213          LOG.log(Level.INFO, "Command!!!! {0}", command);
214          final FileSystem fileSystem = FileSystem.get(new Configuration());
215          final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + mesosExecutorId);
216          final File localFolder = new File(fileNames.getREEFFolderName(), fileNames.getLocalFolderName());
217
218          FileUtil.copy(fileSystem, hdfsFolder, localFolder, true, new Configuration());
219
220          evaluatorProcess = new ProcessBuilder()
221              .command(command)
222              .redirectError(new File(fileNames.getEvaluatorStderrFileName()))
223              .redirectOutput(new File(fileNames.getEvaluatorStdoutFileName()))
224              .start();
225
226          evaluatorProcessExitValue = evaluatorProcess.waitFor();
227
228          fileSystem.close();
229        } catch (IOException | InterruptedException e) {
230          throw new RuntimeException(e);
231        }
232      }
233    });
234    evaluatorLaunchExecutorService.shutdown();
235  }
236
237  public static org.apache.reef.tang.Configuration parseCommandLine(final String[] args) throws IOException {
238    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
239
240    new CommandLine(confBuilder)
241        .registerShortNameOfClass(MesosExecutorId.class)
242        .processCommandLine(args);
243
244    return confBuilder.build();
245  }
246
247  /**
248   * The starting point of the executor.
249   */
250  public static void main(final String[] args) throws Exception {
251    final Injector injector = Tang.Factory.getTang().newInjector(parseCommandLine(args));
252    final REEFExecutor reefExecutor = injector.getInstance(REEFExecutor.class);
253    reefExecutor.onStart();
254  }
255}