This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.pool;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.CompletedEvaluator;
025import org.apache.reef.driver.evaluator.EvaluatorRequest;
026import org.apache.reef.driver.evaluator.EvaluatorRequestor;
027import org.apache.reef.driver.task.CompletedTask;
028import org.apache.reef.driver.task.RunningTask;
029import org.apache.reef.driver.task.TaskConfiguration;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.JavaConfigurationBuilder;
032import org.apache.reef.tang.Tang;
033import org.apache.reef.tang.annotations.Parameter;
034import org.apache.reef.tang.annotations.Unit;
035import org.apache.reef.tang.exceptions.BindException;
036import org.apache.reef.wake.EventHandler;
037import org.apache.reef.wake.time.event.StartTime;
038import org.apache.reef.wake.time.event.StopTime;
039
040import javax.inject.Inject;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * Allocate N evaluators, submit M tasks to them, and measure the time.
046 * Each task does nothing but sleeps for D seconds.
047 */
048@Unit
049public final class JobDriver {
050
051  /**
052   * Standard Java logger.
053   */
054  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
055
056  /**
057   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
058   */
059  private final EvaluatorRequestor evaluatorRequestor;
060
061  /**
062   * If true, submit context and task in one request.
063   */
064  private final boolean isPiggyback;
065
066  /**
067   * Number of Evaluators to request.
068   */
069  private final int numEvaluators;
070
071  /**
072   * Number of Tasks to run.
073   */
074  private final int numTasks;
075  /**
076   * Number of seconds to sleep in each Task.
077   * (has to be a String to pass it into Task config).
078   */
079  private final String delayStr;
080  /**
081   * Number of Evaluators started.
082   */
083  private int numEvaluatorsStarted = 0;
084  /**
085   * Number of Tasks launched.
086   */
087  private int numTasksStarted = 0;
088
089  /**
090   * Job driver constructor.
091   * All parameters are injected from TANG automatically.
092   *
093   * @param evaluatorRequestor is used to request Evaluators.
094   */
095  @Inject
096  JobDriver(final EvaluatorRequestor evaluatorRequestor,
097            final @Parameter(Launch.Piggyback.class) Boolean isPiggyback,
098            final @Parameter(Launch.NumEvaluators.class) Integer numEvaluators,
099            final @Parameter(Launch.NumTasks.class) Integer numTasks,
100            final @Parameter(Launch.Delay.class) Integer delay) {
101    this.evaluatorRequestor = evaluatorRequestor;
102    this.isPiggyback = isPiggyback;
103    this.numEvaluators = numEvaluators;
104    this.numTasks = numTasks;
105    this.delayStr = "" + delay;
106  }
107
108  /**
109   * Build a new Task configuration for a given task ID.
110   *
111   * @param taskId Unique string ID of the task
112   * @return Immutable task configuration object, ready to be submitted to REEF.
113   * @throws RuntimeException that wraps BindException if unable to build the configuration.
114   */
115  private Configuration getTaskConfiguration(final String taskId) {
116    try {
117      return TaskConfiguration.CONF
118          .set(TaskConfiguration.IDENTIFIER, taskId)
119          .set(TaskConfiguration.TASK, SleepTask.class)
120          .build();
121    } catch (final BindException ex) {
122      LOG.log(Level.SEVERE, "Failed to create  Task Configuration: " + taskId, ex);
123      throw new RuntimeException(ex);
124    }
125  }
126
127  /**
128   * Job Driver is ready and the clock is set up: request the evaluators.
129   */
130  final class StartHandler implements EventHandler<StartTime> {
131    @Override
132    public void onNext(final StartTime startTime) {
133      LOG.log(Level.INFO, "TIME: Start Driver with {0} Evaluators", numEvaluators);
134      evaluatorRequestor.submit(
135          EvaluatorRequest.newBuilder()
136              .setMemory(128)
137              .setNumberOfCores(1)
138              .setNumber(numEvaluators).build()
139      );
140    }
141  }
142
143  /**
144   * Job Driver is is shutting down: write to the log.
145   */
146  final class StopHandler implements EventHandler<StopTime> {
147    @Override
148    public void onNext(final StopTime stopTime) {
149      LOG.log(Level.INFO, "TIME: Stop Driver");
150    }
151  }
152
153  /**
154   * Receive notification that an Evaluator had been allocated,
155   * and submitTask a new Task in that Evaluator.
156   */
157  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
158    @Override
159    public void onNext(final AllocatedEvaluator eval) {
160
161      LOG.log(Level.INFO, "TIME: Allocated Evaluator {0}", eval.getId());
162
163      final boolean runTask;
164      final int nEval;
165      final int nTask;
166
167      synchronized (JobDriver.this) {
168        runTask = numTasksStarted < numTasks;
169        if (runTask) {
170          ++numEvaluatorsStarted;
171          if (isPiggyback) {
172            ++numTasksStarted;
173          }
174        }
175        nEval = numEvaluatorsStarted;
176        nTask = numTasksStarted;
177      }
178
179      if (runTask) {
180
181        final String contextId = String.format("Context_%06d", nEval);
182        LOG.log(Level.INFO, "TIME: Submit Context {0} to Evaluator {1}",
183            new Object[]{contextId, eval.getId()});
184
185        try {
186
187          final JavaConfigurationBuilder contextConfigBuilder =
188              Tang.Factory.getTang().newConfigurationBuilder();
189
190          contextConfigBuilder.addConfiguration(ContextConfiguration.CONF
191              .set(ContextConfiguration.IDENTIFIER, contextId)
192              .build());
193
194          contextConfigBuilder.bindNamedParameter(Launch.Delay.class, delayStr);
195
196          if (isPiggyback) {
197
198            final String taskId = String.format("StartTask_%08d", nTask);
199            final Configuration taskConfig = getTaskConfiguration(taskId);
200
201            LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
202                new Object[]{taskId, eval.getId()});
203
204            eval.submitContextAndTask(contextConfigBuilder.build(), taskConfig);
205
206          } else {
207            eval.submitContext(contextConfigBuilder.build());
208          }
209
210        } catch (final BindException ex) {
211          LOG.log(Level.SEVERE, "Failed to submit Context to Evaluator: " + eval.getId(), ex);
212          throw new RuntimeException(ex);
213        }
214      } else {
215        LOG.log(Level.INFO, "TIME: Close Evaluator {0}", eval.getId());
216        eval.close();
217      }
218    }
219  }
220
221  /**
222   * Receive notification that the Context is active.
223   */
224  final class ActiveContextHandler implements EventHandler<ActiveContext> {
225    @Override
226    public void onNext(final ActiveContext context) {
227
228      LOG.log(Level.INFO, "TIME: Active Context {0}", context.getId());
229
230      if (isPiggyback) return; // Task already submitted
231
232      final boolean runTask;
233      final int nTask;
234
235      synchronized (JobDriver.this) {
236        runTask = numTasksStarted < numTasks;
237        if (runTask) {
238          ++numTasksStarted;
239        }
240        nTask = numTasksStarted;
241      }
242
243      if (runTask) {
244        final String taskId = String.format("StartTask_%08d", nTask);
245        LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
246            new Object[]{taskId, context.getEvaluatorId()});
247        context.submitTask(getTaskConfiguration(taskId));
248      } else {
249        context.close();
250      }
251    }
252  }
253
254  /**
255   * Receive notification that the Task is running.
256   */
257  final class RunningTaskHandler implements EventHandler<RunningTask> {
258    @Override
259    public void onNext(final RunningTask task) {
260      LOG.log(Level.INFO, "TIME: Running Task {0}", task.getId());
261    }
262  }
263
264  /**
265   * Receive notification that the Task has completed successfully.
266   */
267  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
268    @Override
269    public void onNext(final CompletedTask task) {
270
271      final ActiveContext context = task.getActiveContext();
272      LOG.log(Level.INFO, "TIME: Completed Task {0} on Evaluator {1}",
273          new Object[]{task.getId(), context.getEvaluatorId()});
274
275      final boolean runTask;
276      final int nTask;
277      synchronized (JobDriver.this) {
278        runTask = numTasksStarted < numTasks;
279        if (runTask) {
280          ++numTasksStarted;
281        }
282        nTask = numTasksStarted;
283      }
284
285      if (runTask) {
286        final String taskId = String.format("Task_%08d", nTask);
287        LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
288            new Object[]{taskId, context.getEvaluatorId()});
289        context.submitTask(getTaskConfiguration(taskId));
290      } else {
291        LOG.log(Level.INFO, "TIME: Close Evaluator {0}", context.getEvaluatorId());
292        context.close();
293      }
294    }
295  }
296
297  /**
298   * Receive notification that the Evaluator has been shut down.
299   */
300  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
301    @Override
302    public void onNext(final CompletedEvaluator eval) {
303      LOG.log(Level.INFO, "TIME: Completed Evaluator {0}", eval.getId());
304    }
305  }
306}