This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.hellohttp;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ClosedContext;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.context.FailedContext;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.evaluator.EvaluatorRequestor;
027import org.apache.reef.driver.evaluator.FailedEvaluator;
028import org.apache.reef.driver.task.CompletedTask;
029import org.apache.reef.driver.task.TaskConfiguration;
030import org.apache.reef.examples.library.Command;
031import org.apache.reef.examples.library.ShellTask;
032import org.apache.reef.tang.JavaConfigurationBuilder;
033import org.apache.reef.tang.Tang;
034import org.apache.reef.tang.annotations.Unit;
035import org.apache.reef.tang.exceptions.BindException;
036import org.apache.reef.wake.EventHandler;
037import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
038import org.apache.reef.wake.time.event.StartTime;
039import org.apache.reef.wake.time.event.StopTime;
040
041import javax.inject.Inject;
042import java.util.ArrayList;
043import java.util.HashMap;
044import java.util.List;
045import java.util.Map;
046import java.util.logging.Level;
047import java.util.logging.Logger;
048
049/**
050 * The Driver code for the Hello REEF Http Distributed Shell Application.
051 */
052@SuppressWarnings("checkstyle:hideutilityclassconstructor")
053@Unit
054public final class HttpShellJobDriver {
055  private static final Logger LOG = Logger.getLogger(HttpShellJobDriver.class.getName());
056
057  /**
058   * String codec is used to encode the results
059   * before passing them back to the client.
060   */
061  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
062  /**
063   * Evaluator Requester.
064   */
065  private final EvaluatorRequestor evaluatorRequestor;
066  /**
067   * Number of Evaluators to request (default is 2).
068   */
069  private final int numEvaluators = 2;
070  /**
071   * Shell execution results from each Evaluator.
072   */
073  private final List<String> results = new ArrayList<>();
074  /**
075   * Map from context ID to running evaluator context.
076   */
077  private final Map<String, ActiveContext> contexts = new HashMap<>();
078  /**
079   * Job driver state.
080   */
081  private State state = State.INIT;
082  /**
083   * First command to execute. Sometimes client can send us the first command
084   * before Evaluators are available; we need to store this command here.
085   */
086  private String cmd;
087  /**
088   * Number of evaluators/tasks to complete.
089   */
090  private int expectCount = 0;
091  /**
092   * Callback handler for http return message.
093   */
094  private HttpServerShellCmdHandler.ClientCallBackHandler httpCallbackHandler;
095
096  /**
097   * Job Driver Constructor.
098   *
099   * @param requestor
100   * @param clientCallBackHandler
101   */
102  @Inject
103  public HttpShellJobDriver(final EvaluatorRequestor requestor,
104                            final HttpServerShellCmdHandler.ClientCallBackHandler clientCallBackHandler) {
105    this.evaluatorRequestor = requestor;
106    this.httpCallbackHandler = clientCallBackHandler;
107    LOG.log(Level.FINE, "Instantiated 'HttpShellJobDriver'");
108  }
109
110  /**
111   * Construct the final result and forward it to the Client.
112   */
113  private void returnResults() {
114    final StringBuilder sb = new StringBuilder();
115    for (final String result : this.results) {
116      sb.append(result);
117    }
118    this.results.clear();
119    LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
120    httpCallbackHandler.onNext(CODEC.encode(sb.toString()));
121  }
122
123  /**
124   * Submit command to all available evaluators.
125   *
126   * @param command shell command to execute.
127   */
128  private synchronized void submit(final String command) {
129    LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
130        new Object[]{command, this.contexts.size(), this.state});
131    assert this.state == State.READY;
132    this.expectCount = this.contexts.size();
133    this.state = State.WAIT_TASKS;
134    this.cmd = null;
135    for (final ActiveContext context : this.contexts.values()) {
136      this.submit(context, command);
137    }
138  }
139
140  /**
141   * Submit a Task that execute the command to a single Evaluator.
142   * This method is called from <code>submitTask(cmd)</code>.
143   */
144  private void submit(final ActiveContext context, final String command) {
145    try {
146      LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
147      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
148      cb.addConfiguration(
149          TaskConfiguration.CONF
150              .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
151              .set(TaskConfiguration.TASK, ShellTask.class)
152              .build()
153      );
154      cb.bindNamedParameter(Command.class, command);
155      context.submitTask(cb.build());
156    } catch (final BindException ex) {
157      LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
158      context.close();
159      throw new RuntimeException(ex);
160    }
161  }
162
163  /**
164   * Request the evaluators.
165   */
166  private synchronized void requestEvaluators() {
167    assert this.state == State.INIT;
168    LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
169    this.evaluatorRequestor.newRequest()
170        .setMemory(128)
171        .setNumberOfCores(1)
172        .setNumber(this.numEvaluators)
173        .submit();
174    this.state = State.WAIT_EVALUATORS;
175    this.expectCount = this.numEvaluators;
176  }
177
178  /**
179   * Possible states of the job driver. Can be one of:
180   * <dl>
181   * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd>
182   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd>
183   * <dt><code>READY</code></dt><dd>Ready to submitTask a new task.</dd>
184   * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd>
185   * </dl>
186   */
187  private enum State {
188    INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
189  }
190
191  /**
192   * Receive notification that an Evaluator had been allocated,
193   * and submitTask a new Task in that Evaluator.
194   */
195  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
196    @Override
197    public void onNext(final AllocatedEvaluator eval) {
198      synchronized (HttpShellJobDriver.this) {
199        LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
200            new Object[]{eval.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.contexts.size()});
201        assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
202        try {
203          eval.submitContext(ContextConfiguration.CONF.set(
204              ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
205        } catch (final BindException ex) {
206          LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
207          throw new RuntimeException(ex);
208        }
209      }
210    }
211  }
212
213  /**
214   * Receive notification that the entire Evaluator had failed.
215   * Stop other jobs and pass this error to the job observer on the client.
216   */
217  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
218    @Override
219    public void onNext(final FailedEvaluator eval) {
220      synchronized (HttpShellJobDriver.this) {
221        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
222        for (final FailedContext failedContext : eval.getFailedContextList()) {
223          HttpShellJobDriver.this.contexts.remove(failedContext.getId());
224        }
225        throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
226      }
227    }
228  }
229
230  /**
231   * Receive notification that a new Context is available.
232   * Submit a new Distributed Shell Task to that Context.
233   */
234  final class ActiveContextHandler implements EventHandler<ActiveContext> {
235    @Override
236    public void onNext(final ActiveContext context) {
237      synchronized (HttpShellJobDriver.this) {
238        LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
239            new Object[]{context.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.state});
240        assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
241        HttpShellJobDriver.this.contexts.put(context.getId(), context);
242        if (--HttpShellJobDriver.this.expectCount <= 0) {
243          HttpShellJobDriver.this.state = State.READY;
244          if (HttpShellJobDriver.this.cmd == null) {
245            LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
246                HttpShellJobDriver.this.state);
247          } else {
248            HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd);
249          }
250        }
251      }
252    }
253  }
254
255  /**
256   * Receive notification that the Context had completed.
257   * Remove context from the list of active context.
258   */
259  final class ClosedContextHandler implements EventHandler<ClosedContext> {
260    @Override
261    public void onNext(final ClosedContext context) {
262      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
263      synchronized (HttpShellJobDriver.this) {
264        HttpShellJobDriver.this.contexts.remove(context.getId());
265      }
266    }
267  }
268
269  final class HttpClientCloseHandler implements EventHandler<Void> {
270    @Override
271    public void onNext(final Void aVoid) throws RuntimeException {
272      LOG.log(Level.INFO, "Received a close message from the client. " +
273          "You can put code here to properly close drivers and evaluators.");
274      for (final ActiveContext c : contexts.values()) {
275        c.close();
276      }
277    }
278  }
279
280  /**
281   * Receive notification that the Context had failed.
282   * Remove context from the list of active context and notify the client.
283   */
284  final class FailedContextHandler implements EventHandler<FailedContext> {
285    @Override
286    public void onNext(final FailedContext context) {
287      LOG.log(Level.SEVERE, "FailedContext", context);
288      synchronized (HttpShellJobDriver.this) {
289        HttpShellJobDriver.this.contexts.remove(context.getId());
290      }
291      throw new RuntimeException("Failed context: ", context.asError());
292    }
293  }
294
295  /**
296   * Receive notification that the Task has completed successfully.
297   */
298  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
299    @Override
300    public void onNext(final CompletedTask task) {
301      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
302      // Take the message returned by the task and add it to the running result.
303      final String result = CODEC.decode(task.get());
304      synchronized (HttpShellJobDriver.this) {
305        HttpShellJobDriver.this.results.add(task.getId() + " :: " + result);
306        LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
307            task.getId(), HttpShellJobDriver.this.results.size(), result, HttpShellJobDriver.this.state});
308        if (--HttpShellJobDriver.this.expectCount <= 0) {
309          HttpShellJobDriver.this.returnResults();
310          HttpShellJobDriver.this.state = State.READY;
311          if (HttpShellJobDriver.this.cmd != null) {
312            HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd);
313          }
314        }
315      }
316    }
317  }
318
319  /**
320   * Receive notification from the client.
321   */
322  final class ClientMessageHandler implements EventHandler<byte[]> {
323    @Override
324    public void onNext(final byte[] message) {
325      synchronized (HttpShellJobDriver.this) {
326        final String command = CODEC.decode(message);
327        LOG.log(Level.INFO, "Client message: {0} state: {1}",
328            new Object[]{command, HttpShellJobDriver.this.state});
329        assert HttpShellJobDriver.this.cmd == null;
330        if (HttpShellJobDriver.this.state == State.READY) {
331          HttpShellJobDriver.this.submit(command);
332        } else {
333          // not ready yet - save the command for better times.
334          assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
335          HttpShellJobDriver.this.cmd = command;
336        }
337      }
338    }
339  }
340
341  /**
342   * Job Driver is ready and the clock is set up: request the evaluators.
343   */
344  final class StartHandler implements EventHandler<StartTime> {
345    @Override
346    public void onNext(final StartTime startTime) {
347      synchronized (HttpShellJobDriver.this) {
348        LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
349        assert state == State.INIT;
350        requestEvaluators();
351      }
352    }
353  }
354
355  /**
356   * Shutting down the job driver: close the evaluators.
357   */
358  final class StopHandler implements EventHandler<StopTime> {
359    @Override
360    public void onNext(final StopTime time) {
361      synchronized (HttpShellJobDriver.this) {
362        LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
363        for (final ActiveContext context : contexts.values()) {
364          context.close();
365        }
366      }
367    }
368  }
369}