This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.retained_eval;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ClosedContext;
024import org.apache.reef.driver.context.ContextConfiguration;
025import org.apache.reef.driver.context.FailedContext;
026import org.apache.reef.driver.evaluator.AllocatedEvaluator;
027import org.apache.reef.driver.evaluator.EvaluatorRequest;
028import org.apache.reef.driver.evaluator.EvaluatorRequestor;
029import org.apache.reef.driver.evaluator.FailedEvaluator;
030import org.apache.reef.driver.task.CompletedTask;
031import org.apache.reef.driver.task.TaskConfiguration;
032import org.apache.reef.examples.library.Command;
033import org.apache.reef.examples.library.ShellTask;
034import org.apache.reef.tang.JavaConfigurationBuilder;
035import org.apache.reef.tang.Tang;
036import org.apache.reef.tang.annotations.Parameter;
037import org.apache.reef.tang.annotations.Unit;
038import org.apache.reef.tang.exceptions.BindException;
039import org.apache.reef.wake.EventHandler;
040import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
041import org.apache.reef.wake.time.event.StartTime;
042import org.apache.reef.wake.time.event.StopTime;
043
044import javax.inject.Inject;
045import java.util.ArrayList;
046import java.util.HashMap;
047import java.util.List;
048import java.util.Map;
049import java.util.logging.Level;
050import java.util.logging.Logger;
051
052/**
053 * Retained Evaluator example job driver. Execute shell command on all evaluators,
054 * capture stdout, and return concatenated results back to the client.
055 */
056@Unit
057public final class JobDriver {
058  /**
059   * Standard Java logger.
060   */
061  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
062
063  /**
064   * Duration of one clock interval.
065   */
066  private static final int CHECK_UP_INTERVAL = 1000; // 1 sec.
067
068  /**
069   * String codec is used to encode the results
070   * before passing them back to the client.
071   */
072  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
073  /**
074   * Job observer on the client.
075   * We use it to send results from the driver back to the client.
076   */
077  private final JobMessageObserver jobMessageObserver;
078  /**
079   * Job driver uses EvaluatorRequestor
080   * to request Evaluators that will run the Tasks.
081   */
082  private final EvaluatorRequestor evaluatorRequestor;
083  /**
084   * Number of Evalutors to request (default is 1).
085   */
086  private final int numEvaluators;
087  /**
088   * Shell execution results from each Evaluator.
089   */
090  private final List<String> results = new ArrayList<>();
091  /**
092   * Map from context ID to running evaluator context.
093   */
094  private final Map<String, ActiveContext> contexts = new HashMap<>();
095  /**
096   * Job driver state.
097   */
098  private State state = State.INIT;
099  /**
100   * First command to execute. Sometimes client can send us the first command
101   * before Evaluators are available; we need to store this command here.
102   */
103  private String cmd;
104  /**
105   * Number of evaluators/tasks to complete.
106   */
107  private int expectCount = 0;
108
109  /**
110   * Job driver constructor.
111   * All parameters are injected from TANG automatically.
112   *
113   * @param jobMessageObserver is used to send messages back to the client.
114   * @param evaluatorRequestor is used to request Evaluators.
115   */
116  @Inject
117  JobDriver(final JobMessageObserver jobMessageObserver,
118            final EvaluatorRequestor evaluatorRequestor,
119            final @Parameter(Launch.NumEval.class) Integer numEvaluators) {
120    this.jobMessageObserver = jobMessageObserver;
121    this.evaluatorRequestor = evaluatorRequestor;
122    this.numEvaluators = numEvaluators;
123  }
124
125  /**
126   * Construct the final result and forward it to the Client.
127   */
128  private void returnResults() {
129    final StringBuilder sb = new StringBuilder();
130    for (final String result : this.results) {
131      sb.append(result);
132    }
133    this.results.clear();
134    LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
135    this.jobMessageObserver.sendMessageToClient(CODEC.encode(sb.toString()));
136  }
137
138  /**
139   * Submit command to all available evaluators.
140   *
141   * @param command shell command to execute.
142   */
143  private void submit(final String command) {
144    LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
145        new Object[]{command, this.contexts.size(), this.state});
146    assert (this.state == State.READY);
147    this.expectCount = this.contexts.size();
148    this.state = State.WAIT_TASKS;
149    this.cmd = null;
150    for (final ActiveContext context : this.contexts.values()) {
151      this.submit(context, command);
152    }
153  }
154
155  /**
156   * Submit a Task that execute the command to a single Evaluator.
157   * This method is called from <code>submitTask(cmd)</code>.
158   */
159  private void submit(final ActiveContext context, final String command) {
160    try {
161      LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
162      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
163      cb.addConfiguration(
164          TaskConfiguration.CONF
165              .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
166              .set(TaskConfiguration.TASK, ShellTask.class)
167              .build()
168      );
169      cb.bindNamedParameter(Command.class, command);
170      context.submitTask(cb.build());
171    } catch (final BindException ex) {
172      LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
173      context.close();
174      throw new RuntimeException(ex);
175    }
176  }
177
178  /**
179   * Request the evaluators.
180   */
181  private synchronized void requestEvaluators() {
182    assert (this.state == State.INIT);
183    LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
184    this.evaluatorRequestor.submit(
185        EvaluatorRequest.newBuilder()
186            .setMemory(128)
187            .setNumberOfCores(1)
188            .setNumber(this.numEvaluators).build()
189    );
190    this.state = State.WAIT_EVALUATORS;
191    this.expectCount = this.numEvaluators;
192  }
193
194  /**
195   * Possible states of the job driver. Can be one of:
196   * <dl>
197   * <du><code>INIT</code></du><dd>initial state, ready to request the evaluators.</dd>
198   * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to initialize.</dd>
199   * <du><code>READY</code></du><dd>Ready to submitTask a new task.</dd>
200   * <du><code>WAIT_TASKS</code></du><dd>Wait for tasks to complete.</dd>
201   * </dl>
202   */
203  private enum State {
204    INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
205  }
206
207  /**
208   * Receive notification that an Evaluator had been allocated,
209   * and submitTask a new Task in that Evaluator.
210   */
211  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
212    @Override
213    public void onNext(final AllocatedEvaluator eval) {
214      synchronized (JobDriver.this) {
215        LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
216            new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()});
217        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
218        try {
219          eval.submitContext(ContextConfiguration.CONF.set(
220              ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
221        } catch (final BindException ex) {
222          LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
223          throw new RuntimeException(ex);
224        }
225      }
226    }
227  }
228
229  /**
230   * Receive notification that the entire Evaluator had failed.
231   * Stop other jobs and pass this error to the job observer on the client.
232   */
233  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
234    @Override
235    public void onNext(final FailedEvaluator eval) {
236      synchronized (JobDriver.this) {
237        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
238        for (final FailedContext failedContext : eval.getFailedContextList()) {
239          JobDriver.this.contexts.remove(failedContext.getId());
240        }
241        throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
242      }
243    }
244  }
245
246  /**
247   * Receive notification that a new Context is available.
248   * Submit a new Distributed Shell Task to that Context.
249   */
250  final class ActiveContextHandler implements EventHandler<ActiveContext> {
251    @Override
252    public void onNext(final ActiveContext context) {
253      synchronized (JobDriver.this) {
254        LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
255            new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state});
256        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
257        JobDriver.this.contexts.put(context.getId(), context);
258        if (--JobDriver.this.expectCount <= 0) {
259          JobDriver.this.state = State.READY;
260          if (JobDriver.this.cmd == null) {
261            LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
262                JobDriver.this.state);
263          } else {
264            JobDriver.this.submit(JobDriver.this.cmd);
265          }
266        }
267      }
268    }
269  }
270
271  /**
272   * Receive notification that the Context had completed.
273   * Remove context from the list of active context.
274   */
275  final class ClosedContextHandler implements EventHandler<ClosedContext> {
276    @Override
277    public void onNext(final ClosedContext context) {
278      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
279      synchronized (JobDriver.this) {
280        JobDriver.this.contexts.remove(context.getId());
281      }
282    }
283  }
284
285  /**
286   * Receive notification that the Context had failed.
287   * Remove context from the list of active context and notify the client.
288   */
289  final class FailedContextHandler implements EventHandler<FailedContext> {
290    @Override
291    public void onNext(final FailedContext context) {
292      LOG.log(Level.SEVERE, "FailedContext", context);
293      synchronized (JobDriver.this) {
294        JobDriver.this.contexts.remove(context.getId());
295      }
296      throw new RuntimeException("Failed context: ", context.asError());
297    }
298  }
299
300  /**
301   * Receive notification that the Task has completed successfully.
302   */
303  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
304    @Override
305    public void onNext(final CompletedTask task) {
306      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
307      // Take the message returned by the task and add it to the running result.
308      final String result = CODEC.decode(task.get());
309      synchronized (JobDriver.this) {
310        JobDriver.this.results.add(task.getId() + " :: " + result);
311        LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
312            task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state});
313        if (--JobDriver.this.expectCount <= 0) {
314          JobDriver.this.returnResults();
315          JobDriver.this.state = State.READY;
316          if (JobDriver.this.cmd != null) {
317            JobDriver.this.submit(JobDriver.this.cmd);
318          }
319        }
320      }
321    }
322  }
323
324  /**
325   * Receive notification from the client.
326   */
327  final class ClientMessageHandler implements EventHandler<byte[]> {
328    @Override
329    public void onNext(final byte[] message) {
330      synchronized (JobDriver.this) {
331        final String command = CODEC.decode(message);
332        LOG.log(Level.INFO, "Client message: {0} state: {1}",
333            new Object[]{command, JobDriver.this.state});
334        assert (JobDriver.this.cmd == null);
335        if (JobDriver.this.state == State.READY) {
336          JobDriver.this.submit(command);
337        } else {
338          // not ready yet - save the command for better times.
339          assert (JobDriver.this.state == State.WAIT_EVALUATORS);
340          JobDriver.this.cmd = command;
341        }
342      }
343    }
344  }
345
346  /**
347   * Job Driver is ready and the clock is set up: request the evaluators.
348   */
349  final class StartHandler implements EventHandler<StartTime> {
350    @Override
351    public void onNext(final StartTime startTime) {
352      LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
353      assert (state == State.INIT);
354      requestEvaluators();
355    }
356  }
357
358  /**
359   * Shutting down the job driver: close the evaluators.
360   */
361  final class StopHandler implements EventHandler<StopTime> {
362    @Override
363    public void onNext(final StopTime time) {
364      LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
365      for (final ActiveContext context : contexts.values()) {
366        context.close();
367      }
368    }
369  }
370}