This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.retained_evalCLR;
020
021import org.apache.reef.driver.catalog.ResourceCatalog;
022import org.apache.reef.driver.client.JobMessageObserver;
023import org.apache.reef.driver.context.ActiveContext;
024import org.apache.reef.driver.context.ClosedContext;
025import org.apache.reef.driver.context.ContextConfiguration;
026import org.apache.reef.driver.context.FailedContext;
027import org.apache.reef.driver.evaluator.*;
028import org.apache.reef.driver.task.CompletedTask;
029import org.apache.reef.driver.task.TaskConfiguration;
030import org.apache.reef.examples.library.Command;
031import org.apache.reef.examples.library.ShellTask;
032import org.apache.reef.tang.*;
033import org.apache.reef.tang.annotations.Unit;
034import org.apache.reef.tang.exceptions.BindException;
035import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy;
036import org.apache.reef.tang.proto.ClassHierarchyProto;
037import org.apache.reef.wake.EventHandler;
038import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
039import org.apache.reef.wake.time.Clock;
040import org.apache.reef.wake.time.event.Alarm;
041import org.apache.reef.wake.time.event.StartTime;
042import org.apache.reef.wake.time.event.StopTime;
043
044import javax.inject.Inject;
045import java.io.FileInputStream;
046import java.io.IOException;
047import java.io.InputStream;
048import java.util.ArrayList;
049import java.util.HashMap;
050import java.util.List;
051import java.util.Map;
052import java.util.logging.Level;
053import java.util.logging.Logger;
054
055/**
056 * Retained Evaluator example job driver. Execute shell command on all evaluators,
057 * capture stdout, and return concatenated results back to the client.
058 */
059@Unit
060public final class JobDriver {
  /**
   * Name of the file from which the class hierarchy for the CLR task
   * configuration is loaded.
   */
  public static final String SHELL_TASK_CLASS_HIERARCHY_FILENAME = "ShellTask.bin";
  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
  /**
   * Duration of one clock interval.
   */
  private static final int CHECK_UP_INTERVAL = 1000; // 1 sec.
  /**
   * Suffix appended to the evaluator ID to form the context ID of a JVM evaluator.
   */
  private static final String JVM_CONTEXT_SUFFIX = "_JVMContext";
  /**
   * Suffix appended to the evaluator ID to form the context ID of a CLR evaluator.
   */
  private static final String CLR_CONTEXT_SUFFIX = "_CLRContext";
  /**
   * String codec. It decodes the commands sent by the client and the results
   * of JVM tasks, and encodes the combined results before passing them back
   * to the client.
   */
  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
  /**
   * Number of evaluators requested by the driver. It is also read once, when
   * the driver instance is created, to derive the number of JVM evaluators.
   */
  public static int totalEvaluators = 2;
  /**
   * Wake clock is used to schedule periodical job check-ups.
   */
  private final Clock clock;
  /**
   * Job observer on the client.
   * We use it to send results from the driver back to the client.
   */
  private final JobMessageObserver jobMessageObserver;
  /**
   * Job driver uses EvaluatorRequestor
   * to request Evaluators that will run the Tasks.
   */
  private final EvaluatorRequestor evaluatorRequestor;
  /**
   * Static catalog of REEF resources.
   * NOTE(review): this field is assigned in the constructor but is not read
   * anywhere else in this class.
   */
  private final ResourceCatalog catalog;
  /**
   * Shell execution results from each Evaluator.
   */
  private final List<String> results = new ArrayList<>();
  /**
   * Map from context ID to running evaluator context.
   */
  private final Map<String, ActiveContext> contexts = new HashMap<>();
  /**
   * Number of allocated evaluators that still have to be set up as CLR evaluators.
   */
  private int nCLREvaluator = 1;                  // guarded by this
  /**
   * Number of allocated evaluators that still have to be set up as JVM evaluators.
   */
  private int nJVMEvaluator = totalEvaluators - nCLREvaluator;  // guarded by this
  /**
   * Job driver state.
   */
  private State state = State.INIT;
  /**
   * First command to execute. Sometimes client can send us the first command
   * before Evaluators are available; we need to store this command here.
   */
  private String cmd;
  /**
   * Number of evaluators/tasks to complete. It is counted down as contexts
   * become active and as tasks complete.
   */
  private int expectCount = 0;
120
121  /**
122   * Job driver constructor.
123   * All parameters are injected from TANG automatically.
124   *
125   * @param clock              Wake clock to schedule and check up running jobs.
126   * @param jobMessageObserver is used to send messages back to the client.
127   * @param evaluatorRequestor is used to request Evaluators.
128   */
129  @Inject
130  JobDriver(final Clock clock,
131            final JobMessageObserver jobMessageObserver,
132            final EvaluatorRequestor evaluatorRequestor,
133            final ResourceCatalog catalog) {
134    this.clock = clock;
135    this.jobMessageObserver = jobMessageObserver;
136    this.evaluatorRequestor = evaluatorRequestor;
137    this.catalog = catalog;
138  }
139
140  /**
141   * Makes a task configuration for the CLR ShellTask.
142   *
143   * @param taskId
144   * @return task configuration for the CLR Task.
145   * @throws BindException
146   */
147  private static final Configuration getCLRTaskConfiguration(
148      final String taskId, final String command) throws BindException {
149
150    final ConfigurationBuilder cb = Tang.Factory.getTang()
151        .newConfigurationBuilder(loadShellTaskClassHierarchy(SHELL_TASK_CLASS_HIERARCHY_FILENAME));
152
153    cb.bind("Microsoft.Reef.Tasks.ITask, Microsoft.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", "Microsoft.Reef.Tasks.ShellTask, Microsoft.Reef.Tasks.ShellTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null");
154    cb.bind("Microsoft.Reef.Tasks.TaskConfigurationOptions+Identifier, Microsoft.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", taskId);
155    cb.bind("Microsoft.Reef.Tasks.ShellTask+Command, Microsoft.Reef.Tasks.ShellTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null", command);
156
157    return cb.build();
158  }
159
160  /**
161   * Makes a task configuration for the JVM ShellTask..
162   *
163   * @param taskId
164   * @return task configuration for the JVM Task.
165   * @throws BindException
166   */
167  private static final Configuration getJVMTaskConfiguration(
168      final String taskId, final String command) throws BindException {
169
170    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
171    cb.addConfiguration(
172        TaskConfiguration.CONF
173            .set(TaskConfiguration.IDENTIFIER, taskId)
174            .set(TaskConfiguration.TASK, ShellTask.class)
175            .build()
176    );
177    cb.bindNamedParameter(Command.class, command);
178    return cb.build();
179  }
180
181  /**
182   * Loads the class hierarchy.
183   *
184   * @return
185   */
186  private static ClassHierarchy loadShellTaskClassHierarchy(String binFile) {
187    try (final InputStream chin = new FileInputStream(binFile)) {
188      final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin);
189      final ClassHierarchy ch = new ProtocolBufferClassHierarchy(root);
190      return ch;
191    } catch (final IOException e) {
192      final String message = "Unable to load class hierarchy " + binFile;
193      LOG.log(Level.SEVERE, message, e);
194      throw new RuntimeException(message, e);
195    }
196  }
197
198  private void submitEvaluator(final AllocatedEvaluator eval, EvaluatorType type) {
199    synchronized (JobDriver.this) {
200
201      String contextIdSuffix = type.equals(EvaluatorType.JVM) ? JVM_CONTEXT_SUFFIX : CLR_CONTEXT_SUFFIX;
202      String contextId = eval.getId() + contextIdSuffix;
203
204      eval.setType(type);
205
206      LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
207          new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()});
208      assert (JobDriver.this.state == State.WAIT_EVALUATORS);
209      try {
210        eval.submitContext(ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, contextId).build());
211      } catch (final BindException ex) {
212        LOG.log(Level.SEVERE, "Failed to submit context " + contextId, ex);
213        throw new RuntimeException(ex);
214      }
215    }
216  }
217
218  /**
219   * Submit command to all available evaluators.
220   *
221   * @param command shell command to execute.
222   */
223  private void submit(final String command) {
224    LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
225        new Object[]{command, this.contexts.size(), this.state});
226    assert (this.state == State.READY);
227    this.expectCount = this.contexts.size();
228    this.state = State.WAIT_TASKS;
229    this.cmd = null;
230    for (final ActiveContext context : this.contexts.values()) {
231      this.submit(context, command);
232    }
233  }
234
235  /**
236   * Submit a Task that execute the command to a single Evaluator.
237   * This method is called from <code>submitTask(cmd)</code>.
238   */
239  private void submit(final ActiveContext context, final String command) {
240    try {
241      LOG.log(Level.INFO, "Sending command {0} to context: {1}", new Object[]{command, context});
242      String taskId = context.getId() + "_task";
243      final Configuration taskConfiguration;
244      if (context.getId().endsWith(JVM_CONTEXT_SUFFIX)) {
245        taskConfiguration = getJVMTaskConfiguration(taskId, command);
246      } else {
247        taskConfiguration = getCLRTaskConfiguration(taskId, command);
248      }
249      context.submitTask(taskConfiguration);
250    } catch (final BindException ex) {
251      LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
252      throw new RuntimeException(ex);
253    }
254  }
255
256  /**
257   * Construct the final result and forward it to the Client.
258   */
259  private void returnResults() {
260    final StringBuilder sb = new StringBuilder();
261    for (final String result : this.results) {
262      sb.append(result);
263    }
264    this.results.clear();
265    LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
266    this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(sb.toString()));
267  }
268
269  /**
270   * Request evaluators on each node.
271   * If nodes are not available yet, schedule another request in CHECK_UP_INTERVAL.
272   * TODO: Ask for specific nodes. (This is not working in YARN... need to check again at some point.)
273   *
274   * @throws RuntimeException if any of the requests fails.
275   */
276  private synchronized void requestEvaluators() {
277    assert (this.state == State.INIT);
278    final int numNodes = totalEvaluators;
279    if (numNodes > 0) {
280      LOG.log(Level.INFO, "Schedule on {0} nodes.", numNodes);
281      this.evaluatorRequestor.submit(
282          EvaluatorRequest.newBuilder()
283              .setMemory(128)
284              .setNumberOfCores(1)
285              .setNumber(numNodes).build()
286      );
287      this.state = State.WAIT_EVALUATORS;
288      this.expectCount = numNodes;
289    } else {
290      // No nodes available yet - wait and ask again.
291      this.clock.scheduleAlarm(CHECK_UP_INTERVAL, new EventHandler<Alarm>() {
292        @Override
293        public void onNext(final Alarm time) {
294          synchronized (JobDriver.this) {
295            LOG.log(Level.INFO, "{0} Alarm: {1}", new Object[]{JobDriver.this.state, time});
296            if (JobDriver.this.state == State.INIT) {
297              JobDriver.this.requestEvaluators();
298            }
299          }
300        }
301      });
302    }
303  }
304
305  /**
306   * Possible states of the job driver. Can be one of:
307   * <dl>
308   * <du><code>INIT</code></du><dd>initial state, ready to request the evaluators.</dd>
309   * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to initialize.</dd>
310   * <du><code>READY</code></du><dd>Ready to submitTask a new task.</dd>
311   * <du><code>WAIT_TASKS</code></du><dd>Wait for tasks to complete.</dd>
312   * </dl>
313   */
314  private enum State {
315    INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
316  }
317
318  /**
319   * Handles AllocatedEvaluator: Submit an empty context
320   */
321  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
322    @Override
323    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
324      synchronized (JobDriver.this) {
325        if (JobDriver.this.nJVMEvaluator > 0) {
326          LOG.log(Level.INFO, "===== adding JVM evaluator =====");
327          JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.JVM);
328          JobDriver.this.nJVMEvaluator -= 1;
329        } else if (JobDriver.this.nCLREvaluator > 0) {
330          LOG.log(Level.INFO, "===== adding CLR evaluator =====");
331          JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.CLR);
332          JobDriver.this.nCLREvaluator -= 1;
333        }
334      }
335    }
336  }
337
338  /**
339   * Receive notification that a new Context is available.
340   * Submit a new Distributed Shell Task to that Context.
341   */
342  final class ActiveContextHandler implements EventHandler<ActiveContext> {
343    @Override
344    public void onNext(final ActiveContext context) {
345      synchronized (JobDriver.this) {
346        LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
347            new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state});
348        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
349        JobDriver.this.contexts.put(context.getId(), context);
350        if (--JobDriver.this.expectCount <= 0) {
351          JobDriver.this.state = State.READY;
352          if (JobDriver.this.cmd == null) {
353            LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
354                JobDriver.this.state);
355          } else {
356            JobDriver.this.submit(JobDriver.this.cmd);
357          }
358        }
359      }
360    }
361  }
362
363  /**
364   * Receive notification that the Context had completed.
365   * Remove context from the list of active context.
366   */
367  final class ClosedContextHandler implements EventHandler<ClosedContext> {
368    @Override
369    public void onNext(final ClosedContext context) {
370      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
371      synchronized (JobDriver.this) {
372        JobDriver.this.contexts.remove(context.getId());
373      }
374    }
375  }
376
377  /**
378   * Receive notification that the Context had failed.
379   * Remove context from the list of active context and notify the client.
380   */
381  final class FailedContextHandler implements EventHandler<FailedContext> {
382    @Override
383    public void onNext(final FailedContext context) {
384      LOG.log(Level.SEVERE, "FailedContext", context);
385      synchronized (JobDriver.this) {
386        JobDriver.this.contexts.remove(context.getId());
387      }
388      throw new RuntimeException("Failed context: ", context.asError());
389    }
390  }
391
392  /**
393   * Receive notification that the entire Evaluator had failed.
394   * Stop other jobs and pass this error to the job observer on the client.
395   */
396  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
397    @Override
398    public void onNext(final FailedEvaluator eval) {
399      synchronized (JobDriver.this) {
400        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
401        for (final FailedContext failedContext : eval.getFailedContextList()) {
402          JobDriver.this.contexts.remove(failedContext.getId());
403        }
404        throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
405      }
406    }
407  }
408
409  /**
410   * Receive notification that the Task has completed successfully.
411   */
412  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
413    @Override
414    public void onNext(final CompletedTask task) {
415      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
416      // Take the message returned by the task and add it to the running result.
417      String result = "default result";
418      try {
419        if (task.getId().contains(CLR_CONTEXT_SUFFIX)) {
420          result = new String(task.get());
421        } else {
422          result = JVM_CODEC.decode(task.get());
423        }
424      } catch (final Exception e) {
425        LOG.log(Level.WARNING, "failed to decode task outcome");
426      }
427      synchronized (JobDriver.this) {
428        JobDriver.this.results.add(task.getId() + " :: " + result);
429        LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
430            task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state});
431        if (--JobDriver.this.expectCount <= 0) {
432          JobDriver.this.returnResults();
433          JobDriver.this.state = State.READY;
434          if (JobDriver.this.cmd != null) {
435            JobDriver.this.submit(JobDriver.this.cmd);
436          }
437        }
438      }
439    }
440  }
441
442  /**
443   * Receive notification from the client.
444   */
445  final class ClientMessageHandler implements EventHandler<byte[]> {
446    @Override
447    public void onNext(final byte[] message) {
448      synchronized (JobDriver.this) {
449        final String command = JVM_CODEC.decode(message);
450        LOG.log(Level.INFO, "Client message: {0} state: {1}",
451            new Object[]{command, JobDriver.this.state});
452        assert (JobDriver.this.cmd == null);
453        if (JobDriver.this.state == State.READY) {
454          JobDriver.this.submit(command);
455        } else {
456          // not ready yet - save the command for better times.
457          assert (JobDriver.this.state == State.WAIT_EVALUATORS);
458          JobDriver.this.cmd = command;
459        }
460      }
461    }
462  }
463
464  /**
465   * Job Driver is ready and the clock is set up: request the evaluators.
466   */
467  final class StartHandler implements EventHandler<StartTime> {
468    @Override
469    public void onNext(final StartTime startTime) {
470      LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
471      assert (state == State.INIT);
472      requestEvaluators();
473    }
474  }
475
476  /**
477   * Shutting down the job driver: close the evaluators.
478   */
479  final class StopHandler implements EventHandler<StopTime> {
480    @Override
481    public void onNext(final StopTime time) {
482      LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
483      for (final ActiveContext context : contexts.values()) {
484        context.close();
485      }
486    }
487  }
488}
489