This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.hellohttp;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ClosedContext;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.context.FailedContext;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.evaluator.EvaluatorRequest;
027import org.apache.reef.driver.evaluator.EvaluatorRequestor;
028import org.apache.reef.driver.evaluator.FailedEvaluator;
029import org.apache.reef.driver.task.CompletedTask;
030import org.apache.reef.driver.task.TaskConfiguration;
031import org.apache.reef.examples.library.Command;
032import org.apache.reef.examples.library.ShellTask;
033import org.apache.reef.tang.JavaConfigurationBuilder;
034import org.apache.reef.tang.Tang;
035import org.apache.reef.tang.annotations.Unit;
036import org.apache.reef.tang.exceptions.BindException;
037import org.apache.reef.wake.EventHandler;
038import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
039import org.apache.reef.wake.time.event.StartTime;
040import org.apache.reef.wake.time.event.StopTime;
041
042import javax.inject.Inject;
043import java.util.ArrayList;
044import java.util.HashMap;
045import java.util.List;
046import java.util.Map;
047import java.util.logging.Level;
048import java.util.logging.Logger;
049
050/**
051 * The Driver code for the Hello REEF Http Distributed Shell Application.
052 */
053@SuppressWarnings("checkstyle:hideutilityclassconstructor")
054@Unit
055public final class HttpShellJobDriver {
056  private static final Logger LOG = Logger.getLogger(HttpShellJobDriver.class.getName());
057
058  /**
059   * String codec is used to encode the results
060   * before passing them back to the client.
061   */
062  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
063  /**
064   * Evaluator Requester.
065   */
066  private final EvaluatorRequestor evaluatorRequestor;
067  /**
068   * Number of Evaluators to request (default is 2).
069   */
070  private final int numEvaluators = 2;
071  /**
072   * Shell execution results from each Evaluator.
073   */
074  private final List<String> results = new ArrayList<>();
075  /**
076   * Map from context ID to running evaluator context.
077   */
078  private final Map<String, ActiveContext> contexts = new HashMap<>();
079  /**
080   * Job driver state.
081   */
082  private State state = State.INIT;
083  /**
084   * First command to execute. Sometimes client can send us the first command
085   * before Evaluators are available; we need to store this command here.
086   */
087  private String cmd;
088  /**
089   * Number of evaluators/tasks to complete.
090   */
091  private int expectCount = 0;
092  /**
093   * Callback handler for http return message.
094   */
095  private HttpServerShellCmdHandler.ClientCallBackHandler httpCallbackHandler;
096
097  /**
098   * Job Driver Constructor.
099   *
100   * @param requestor
101   * @param clientCallBackHandler
102   */
103  @Inject
104  public HttpShellJobDriver(final EvaluatorRequestor requestor,
105                            final HttpServerShellCmdHandler.ClientCallBackHandler clientCallBackHandler) {
106    this.evaluatorRequestor = requestor;
107    this.httpCallbackHandler = clientCallBackHandler;
108    LOG.log(Level.FINE, "Instantiated 'HttpShellJobDriver'");
109  }
110
111  /**
112   * Construct the final result and forward it to the Client.
113   */
114  private void returnResults() {
115    final StringBuilder sb = new StringBuilder();
116    for (final String result : this.results) {
117      sb.append(result);
118    }
119    this.results.clear();
120    LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
121    httpCallbackHandler.onNext(CODEC.encode(sb.toString()));
122  }
123
124  /**
125   * Submit command to all available evaluators.
126   *
127   * @param command shell command to execute.
128   */
129  private synchronized void submit(final String command) {
130    LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
131        new Object[]{command, this.contexts.size(), this.state});
132    assert this.state == State.READY;
133    this.expectCount = this.contexts.size();
134    this.state = State.WAIT_TASKS;
135    this.cmd = null;
136    for (final ActiveContext context : this.contexts.values()) {
137      this.submit(context, command);
138    }
139  }
140
141  /**
142   * Submit a Task that execute the command to a single Evaluator.
143   * This method is called from <code>submitTask(cmd)</code>.
144   */
145  private void submit(final ActiveContext context, final String command) {
146    try {
147      LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
148      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
149      cb.addConfiguration(
150          TaskConfiguration.CONF
151              .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
152              .set(TaskConfiguration.TASK, ShellTask.class)
153              .build()
154      );
155      cb.bindNamedParameter(Command.class, command);
156      context.submitTask(cb.build());
157    } catch (final BindException ex) {
158      LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
159      context.close();
160      throw new RuntimeException(ex);
161    }
162  }
163
164  /**
165   * Request the evaluators.
166   */
167  private synchronized void requestEvaluators() {
168    assert this.state == State.INIT;
169    LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
170    this.evaluatorRequestor.submit(
171        EvaluatorRequest.newBuilder()
172            .setMemory(128)
173            .setNumberOfCores(1)
174            .setNumber(this.numEvaluators).build()
175    );
176    this.state = State.WAIT_EVALUATORS;
177    this.expectCount = this.numEvaluators;
178  }
179
180  /**
181   * Possible states of the job driver. Can be one of:
182   * <dl>
183   * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd>
184   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd>
185   * <dt><code>READY</code></dt><dd>Ready to submitTask a new task.</dd>
186   * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd>
187   * </dl>
188   */
189  private enum State {
190    INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
191  }
192
193  /**
194   * Receive notification that an Evaluator had been allocated,
195   * and submitTask a new Task in that Evaluator.
196   */
197  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
198    @Override
199    public void onNext(final AllocatedEvaluator eval) {
200      synchronized (HttpShellJobDriver.this) {
201        LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
202            new Object[]{eval.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.contexts.size()});
203        assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
204        try {
205          eval.submitContext(ContextConfiguration.CONF.set(
206              ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
207        } catch (final BindException ex) {
208          LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
209          throw new RuntimeException(ex);
210        }
211      }
212    }
213  }
214
215  /**
216   * Receive notification that the entire Evaluator had failed.
217   * Stop other jobs and pass this error to the job observer on the client.
218   */
219  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
220    @Override
221    public void onNext(final FailedEvaluator eval) {
222      synchronized (HttpShellJobDriver.this) {
223        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
224        for (final FailedContext failedContext : eval.getFailedContextList()) {
225          HttpShellJobDriver.this.contexts.remove(failedContext.getId());
226        }
227        throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
228      }
229    }
230  }
231
232  /**
233   * Receive notification that a new Context is available.
234   * Submit a new Distributed Shell Task to that Context.
235   */
236  final class ActiveContextHandler implements EventHandler<ActiveContext> {
237    @Override
238    public void onNext(final ActiveContext context) {
239      synchronized (HttpShellJobDriver.this) {
240        LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
241            new Object[]{context.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.state});
242        assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
243        HttpShellJobDriver.this.contexts.put(context.getId(), context);
244        if (--HttpShellJobDriver.this.expectCount <= 0) {
245          HttpShellJobDriver.this.state = State.READY;
246          if (HttpShellJobDriver.this.cmd == null) {
247            LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
248                HttpShellJobDriver.this.state);
249          } else {
250            HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd);
251          }
252        }
253      }
254    }
255  }
256
257  /**
258   * Receive notification that the Context had completed.
259   * Remove context from the list of active context.
260   */
261  final class ClosedContextHandler implements EventHandler<ClosedContext> {
262    @Override
263    public void onNext(final ClosedContext context) {
264      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
265      synchronized (HttpShellJobDriver.this) {
266        HttpShellJobDriver.this.contexts.remove(context.getId());
267      }
268    }
269  }
270
271  final class HttpClientCloseHandler implements EventHandler<Void> {
272    @Override
273    public void onNext(final Void aVoid) throws RuntimeException {
274      LOG.log(Level.INFO, "Received a close message from the client. " +
275          "You can put code here to properly close drivers and evaluators.");
276      for (final ActiveContext c : contexts.values()) {
277        c.close();
278      }
279    }
280  }
281
282  /**
283   * Receive notification that the Context had failed.
284   * Remove context from the list of active context and notify the client.
285   */
286  final class FailedContextHandler implements EventHandler<FailedContext> {
287    @Override
288    public void onNext(final FailedContext context) {
289      LOG.log(Level.SEVERE, "FailedContext", context);
290      synchronized (HttpShellJobDriver.this) {
291        HttpShellJobDriver.this.contexts.remove(context.getId());
292      }
293      throw new RuntimeException("Failed context: ", context.asError());
294    }
295  }
296
297  /**
298   * Receive notification that the Task has completed successfully.
299   */
300  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
301    @Override
302    public void onNext(final CompletedTask task) {
303      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
304      // Take the message returned by the task and add it to the running result.
305      final String result = CODEC.decode(task.get());
306      synchronized (HttpShellJobDriver.this) {
307        HttpShellJobDriver.this.results.add(task.getId() + " :: " + result);
308        LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
309            task.getId(), HttpShellJobDriver.this.results.size(), result, HttpShellJobDriver.this.state});
310        if (--HttpShellJobDriver.this.expectCount <= 0) {
311          HttpShellJobDriver.this.returnResults();
312          HttpShellJobDriver.this.state = State.READY;
313          if (HttpShellJobDriver.this.cmd != null) {
314            HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd);
315          }
316        }
317      }
318    }
319  }
320
321  /**
322   * Receive notification from the client.
323   */
324  final class ClientMessageHandler implements EventHandler<byte[]> {
325    @Override
326    public void onNext(final byte[] message) {
327      synchronized (HttpShellJobDriver.this) {
328        final String command = CODEC.decode(message);
329        LOG.log(Level.INFO, "Client message: {0} state: {1}",
330            new Object[]{command, HttpShellJobDriver.this.state});
331        assert HttpShellJobDriver.this.cmd == null;
332        if (HttpShellJobDriver.this.state == State.READY) {
333          HttpShellJobDriver.this.submit(command);
334        } else {
335          // not ready yet - save the command for better times.
336          assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
337          HttpShellJobDriver.this.cmd = command;
338        }
339      }
340    }
341  }
342
343  /**
344   * Job Driver is ready and the clock is set up: request the evaluators.
345   */
346  final class StartHandler implements EventHandler<StartTime> {
347    @Override
348    public void onNext(final StartTime startTime) {
349      synchronized (HttpShellJobDriver.this) {
350        LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
351        assert state == State.INIT;
352        requestEvaluators();
353      }
354    }
355  }
356
357  /**
358   * Shutting down the job driver: close the evaluators.
359   */
360  final class StopHandler implements EventHandler<StopTime> {
361    @Override
362    public void onNext(final StopTime time) {
363      synchronized (HttpShellJobDriver.this) {
364        LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
365        for (final ActiveContext context : contexts.values()) {
366          context.close();
367        }
368      }
369    }
370  }
371}