This project has retired. For details please refer to its Attic page.
Source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.reef.examples.pool;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.CompletedEvaluator;
025import org.apache.reef.driver.evaluator.EvaluatorRequest;
026import org.apache.reef.driver.evaluator.EvaluatorRequestor;
027import org.apache.reef.driver.task.CompletedTask;
028import org.apache.reef.driver.task.RunningTask;
029import org.apache.reef.driver.task.TaskConfiguration;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.JavaConfigurationBuilder;
032import org.apache.reef.tang.Tang;
033import org.apache.reef.tang.annotations.Parameter;
034import org.apache.reef.tang.annotations.Unit;
035import org.apache.reef.tang.exceptions.BindException;
036import org.apache.reef.wake.EventHandler;
037import org.apache.reef.wake.time.event.StartTime;
038import org.apache.reef.wake.time.event.StopTime;
039
040import javax.inject.Inject;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * Allocate N evaluators, submit M tasks to them, and measure the time.
046 * Each task does nothing but sleeps for D seconds.
047 */
048@Unit
049public final class JobDriver {
050
051  /**
052   * Standard Java logger.
053   */
054  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
055
056  /**
057   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
058   */
059  private final EvaluatorRequestor evaluatorRequestor;
060
061  /**
062   * If true, submit context and task in one request.
063   */
064  private final boolean isPiggyback;
065
066  /**
067   * Number of Evaluators to request.
068   */
069  private final int numEvaluators;
070
071  /**
072   * Number of Tasks to run.
073   */
074  private final int numTasks;
075  /**
076   * Number of seconds to sleep in each Task.
077   * (has to be a String to pass it into Task config).
078   */
079  private final String delayStr;
080  /**
081   * Number of Evaluators started.
082   */
083  private int numEvaluatorsStarted = 0;
084  /**
085   * Number of Tasks launched.
086   */
087  private int numTasksStarted = 0;
088
089  /**
090   * Job driver constructor.
091   * All parameters are injected from TANG automatically.
092   *
093   * @param evaluatorRequestor is used to request Evaluators.
094   */
095  @Inject
096  JobDriver(final EvaluatorRequestor evaluatorRequestor,
097            @Parameter(Launch.Piggyback.class) final Boolean isPiggyback,
098            @Parameter(Launch.NumEvaluators.class) final Integer numEvaluators,
099            @Parameter(Launch.NumTasks.class) final Integer numTasks,
100            @Parameter(Launch.Delay.class) final Integer delay) {
101    this.evaluatorRequestor = evaluatorRequestor;
102    this.isPiggyback = isPiggyback;
103    this.numEvaluators = numEvaluators;
104    this.numTasks = numTasks;
105    this.delayStr = "" + delay;
106  }
107
108  /**
109   * Build a new Task configuration for a given task ID.
110   *
111   * @param taskId Unique string ID of the task
112   * @return Immutable task configuration object, ready to be submitted to REEF.
113   * @throws RuntimeException that wraps BindException if unable to build the configuration.
114   */
115  private Configuration getTaskConfiguration(final String taskId) {
116    try {
117      return TaskConfiguration.CONF
118          .set(TaskConfiguration.IDENTIFIER, taskId)
119          .set(TaskConfiguration.TASK, SleepTask.class)
120          .build();
121    } catch (final BindException ex) {
122      LOG.log(Level.SEVERE, "Failed to create  Task Configuration: " + taskId, ex);
123      throw new RuntimeException(ex);
124    }
125  }
126
127  /**
128   * Job Driver is ready and the clock is set up: request the evaluators.
129   */
130  final class StartHandler implements EventHandler<StartTime> {
131    @Override
132    public void onNext(final StartTime startTime) {
133      LOG.log(Level.INFO, "TIME: Start Driver with {0} Evaluators", numEvaluators);
134      evaluatorRequestor.submit(
135          EvaluatorRequest.newBuilder()
136              .setMemory(128)
137              .setNumberOfCores(1)
138              .setNumber(numEvaluators).build()
139      );
140    }
141  }
142
143  /**
144   * Job Driver is is shutting down: write to the log.
145   */
146  final class StopHandler implements EventHandler<StopTime> {
147    @Override
148    public void onNext(final StopTime stopTime) {
149      LOG.log(Level.INFO, "TIME: Stop Driver");
150    }
151  }
152
153  /**
154   * Receive notification that an Evaluator had been allocated,
155   * and submitTask a new Task in that Evaluator.
156   */
157  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
158    @Override
159    public void onNext(final AllocatedEvaluator eval) {
160
161      LOG.log(Level.INFO, "TIME: Allocated Evaluator {0}", eval.getId());
162
163      final boolean runTask;
164      final int nEval;
165      final int nTask;
166
167      synchronized (JobDriver.this) {
168        runTask = numTasksStarted < numTasks;
169        if (runTask) {
170          ++numEvaluatorsStarted;
171          if (isPiggyback) {
172            ++numTasksStarted;
173          }
174        }
175        nEval = numEvaluatorsStarted;
176        nTask = numTasksStarted;
177      }
178
179      if (runTask) {
180
181        final String contextId = String.format("Context_%06d", nEval);
182        LOG.log(Level.INFO, "TIME: Submit Context {0} to Evaluator {1}",
183            new Object[]{contextId, eval.getId()});
184
185        try {
186
187          final JavaConfigurationBuilder contextConfigBuilder =
188              Tang.Factory.getTang().newConfigurationBuilder();
189
190          contextConfigBuilder.addConfiguration(ContextConfiguration.CONF
191              .set(ContextConfiguration.IDENTIFIER, contextId)
192              .build());
193
194          contextConfigBuilder.bindNamedParameter(Launch.Delay.class, delayStr);
195
196          if (isPiggyback) {
197
198            final String taskId = String.format("StartTask_%08d", nTask);
199            final Configuration taskConfig = getTaskConfiguration(taskId);
200
201            LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
202                new Object[]{taskId, eval.getId()});
203
204            eval.submitContextAndTask(contextConfigBuilder.build(), taskConfig);
205
206          } else {
207            eval.submitContext(contextConfigBuilder.build());
208          }
209
210        } catch (final BindException ex) {
211          LOG.log(Level.SEVERE, "Failed to submit Context to Evaluator: " + eval.getId(), ex);
212          throw new RuntimeException(ex);
213        }
214      } else {
215        LOG.log(Level.INFO, "TIME: Close Evaluator {0}", eval.getId());
216        eval.close();
217      }
218    }
219  }
220
221  /**
222   * Receive notification that the Context is active.
223   */
224  final class ActiveContextHandler implements EventHandler<ActiveContext> {
225    @Override
226    public void onNext(final ActiveContext context) {
227
228      LOG.log(Level.INFO, "TIME: Active Context {0}", context.getId());
229
230      if (isPiggyback) {
231        return; // Task already submitted
232      }
233
234      final boolean runTask;
235      final int nTask;
236
237      synchronized (JobDriver.this) {
238        runTask = numTasksStarted < numTasks;
239        if (runTask) {
240          ++numTasksStarted;
241        }
242        nTask = numTasksStarted;
243      }
244
245      if (runTask) {
246        final String taskId = String.format("StartTask_%08d", nTask);
247        LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
248            new Object[]{taskId, context.getEvaluatorId()});
249        context.submitTask(getTaskConfiguration(taskId));
250      } else {
251        context.close();
252      }
253    }
254  }
255
256  /**
257   * Receive notification that the Task is running.
258   */
259  final class RunningTaskHandler implements EventHandler<RunningTask> {
260    @Override
261    public void onNext(final RunningTask task) {
262      LOG.log(Level.INFO, "TIME: Running Task {0}", task.getId());
263    }
264  }
265
266  /**
267   * Receive notification that the Task has completed successfully.
268   */
269  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
270    @Override
271    public void onNext(final CompletedTask task) {
272
273      final ActiveContext context = task.getActiveContext();
274      LOG.log(Level.INFO, "TIME: Completed Task {0} on Evaluator {1}",
275          new Object[]{task.getId(), context.getEvaluatorId()});
276
277      final boolean runTask;
278      final int nTask;
279      synchronized (JobDriver.this) {
280        runTask = numTasksStarted < numTasks;
281        if (runTask) {
282          ++numTasksStarted;
283        }
284        nTask = numTasksStarted;
285      }
286
287      if (runTask) {
288        final String taskId = String.format("Task_%08d", nTask);
289        LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
290            new Object[]{taskId, context.getEvaluatorId()});
291        context.submitTask(getTaskConfiguration(taskId));
292      } else {
293        LOG.log(Level.INFO, "TIME: Close Evaluator {0}", context.getEvaluatorId());
294        context.close();
295      }
296    }
297  }
298
299  /**
300   * Receive notification that the Evaluator has been shut down.
301   */
302  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
303    @Override
304    public void onNext(final CompletedEvaluator eval) {
305      LOG.log(Level.INFO, "TIME: Completed Evaluator {0}", eval.getId());
306    }
307  }
308}