This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.pool;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.CompletedEvaluator;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.CompletedTask;
027import org.apache.reef.driver.task.RunningTask;
028import org.apache.reef.driver.task.TaskConfiguration;
029import org.apache.reef.tang.Configuration;
030import org.apache.reef.tang.JavaConfigurationBuilder;
031import org.apache.reef.tang.Tang;
032import org.apache.reef.tang.annotations.Parameter;
033import org.apache.reef.tang.annotations.Unit;
034import org.apache.reef.tang.exceptions.BindException;
035import org.apache.reef.wake.EventHandler;
036import org.apache.reef.wake.time.event.StartTime;
037import org.apache.reef.wake.time.event.StopTime;
038
039import javax.inject.Inject;
040import java.util.logging.Level;
041import java.util.logging.Logger;
042
043/**
044 * Allocate N evaluators, submit M tasks to them, and measure the time.
045 * Each task does nothing but sleeps for D seconds.
046 */
047@Unit
048public final class JobDriver {
049
050  /**
051   * Standard Java logger.
052   */
053  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
054
055  /**
056   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
057   */
058  private final EvaluatorRequestor evaluatorRequestor;
059
060  /**
061   * If true, submit context and task in one request.
062   */
063  private final boolean isPiggyback;
064
065  /**
066   * Number of Evaluators to request.
067   */
068  private final int numEvaluators;
069
070  /**
071   * Number of Tasks to run.
072   */
073  private final int numTasks;
074  /**
075   * Number of seconds to sleep in each Task.
076   * (has to be a String to pass it into Task config).
077   */
078  private final String delayStr;
079  /**
080   * Number of Evaluators started.
081   */
082  private int numEvaluatorsStarted = 0;
083  /**
084   * Number of Tasks launched.
085   */
086  private int numTasksStarted = 0;
087
088  /**
089   * Job driver constructor.
090   * All parameters are injected from TANG automatically.
091   *
092   * @param evaluatorRequestor is used to request Evaluators.
093   */
094  @Inject
095  JobDriver(final EvaluatorRequestor evaluatorRequestor,
096            @Parameter(Launch.Piggyback.class) final Boolean isPiggyback,
097            @Parameter(Launch.NumEvaluators.class) final Integer numEvaluators,
098            @Parameter(Launch.NumTasks.class) final Integer numTasks,
099            @Parameter(Launch.Delay.class) final Integer delay) {
100    this.evaluatorRequestor = evaluatorRequestor;
101    this.isPiggyback = isPiggyback;
102    this.numEvaluators = numEvaluators;
103    this.numTasks = numTasks;
104    this.delayStr = "" + delay;
105  }
106
107  /**
108   * Build a new Task configuration for a given task ID.
109   *
110   * @param taskId Unique string ID of the task
111   * @return Immutable task configuration object, ready to be submitted to REEF.
112   * @throws RuntimeException that wraps BindException if unable to build the configuration.
113   */
114  private Configuration getTaskConfiguration(final String taskId) {
115    try {
116      return TaskConfiguration.CONF
117          .set(TaskConfiguration.IDENTIFIER, taskId)
118          .set(TaskConfiguration.TASK, SleepTask.class)
119          .build();
120    } catch (final BindException ex) {
121      LOG.log(Level.SEVERE, "Failed to create  Task Configuration: " + taskId, ex);
122      throw new RuntimeException(ex);
123    }
124  }
125
126  /**
127   * Job Driver is ready and the clock is set up: request the evaluators.
128   */
129  final class StartHandler implements EventHandler<StartTime> {
130    @Override
131    public void onNext(final StartTime startTime) {
132      LOG.log(Level.INFO, "TIME: Start Driver with {0} Evaluators", numEvaluators);
133      evaluatorRequestor.newRequest()
134          .setMemory(128)
135          .setNumberOfCores(1)
136          .setNumber(numEvaluators)
137          .submit();
138    }
139  }
140
141  /**
142   * Job Driver is is shutting down: write to the log.
143   */
144  final class StopHandler implements EventHandler<StopTime> {
145    @Override
146    public void onNext(final StopTime stopTime) {
147      LOG.log(Level.INFO, "TIME: Stop Driver");
148    }
149  }
150
151  /**
152   * Receive notification that an Evaluator had been allocated,
153   * and submitTask a new Task in that Evaluator.
154   */
155  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
156    @Override
157    public void onNext(final AllocatedEvaluator eval) {
158
159      LOG.log(Level.INFO, "TIME: Allocated Evaluator {0}", eval.getId());
160
161      final boolean runTask;
162      final int nEval;
163      final int nTask;
164
165      synchronized (JobDriver.this) {
166        runTask = numTasksStarted < numTasks;
167        if (runTask) {
168          ++numEvaluatorsStarted;
169          if (isPiggyback) {
170            ++numTasksStarted;
171          }
172        }
173        nEval = numEvaluatorsStarted;
174        nTask = numTasksStarted;
175      }
176
177      if (runTask) {
178
179        final String contextId = String.format("Context_%06d", nEval);
180        LOG.log(Level.INFO, "TIME: Submit Context {0} to Evaluator {1}",
181            new Object[]{contextId, eval.getId()});
182
183        try {
184
185          final JavaConfigurationBuilder contextConfigBuilder =
186              Tang.Factory.getTang().newConfigurationBuilder();
187
188          contextConfigBuilder.addConfiguration(ContextConfiguration.CONF
189              .set(ContextConfiguration.IDENTIFIER, contextId)
190              .build());
191
192          contextConfigBuilder.bindNamedParameter(Launch.Delay.class, delayStr);
193
194          if (isPiggyback) {
195
196            final String taskId = String.format("StartTask_%08d", nTask);
197            final Configuration taskConfig = getTaskConfiguration(taskId);
198
199            LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
200                new Object[]{taskId, eval.getId()});
201
202            eval.submitContextAndTask(contextConfigBuilder.build(), taskConfig);
203
204          } else {
205            eval.submitContext(contextConfigBuilder.build());
206          }
207
208        } catch (final BindException ex) {
209          LOG.log(Level.SEVERE, "Failed to submit Context to Evaluator: " + eval.getId(), ex);
210          throw new RuntimeException(ex);
211        }
212      } else {
213        LOG.log(Level.INFO, "TIME: Close Evaluator {0}", eval.getId());
214        eval.close();
215      }
216    }
217  }
218
219  /**
220   * Receive notification that the Context is active.
221   */
222  final class ActiveContextHandler implements EventHandler<ActiveContext> {
223    @Override
224    public void onNext(final ActiveContext context) {
225
226      LOG.log(Level.INFO, "TIME: Active Context {0}", context.getId());
227
228      if (isPiggyback) {
229        return; // Task already submitted
230      }
231
232      final boolean runTask;
233      final int nTask;
234
235      synchronized (JobDriver.this) {
236        runTask = numTasksStarted < numTasks;
237        if (runTask) {
238          ++numTasksStarted;
239        }
240        nTask = numTasksStarted;
241      }
242
243      if (runTask) {
244        final String taskId = String.format("StartTask_%08d", nTask);
245        LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
246            new Object[]{taskId, context.getEvaluatorId()});
247        context.submitTask(getTaskConfiguration(taskId));
248      } else {
249        context.close();
250      }
251    }
252  }
253
254  /**
255   * Receive notification that the Task is running.
256   */
257  final class RunningTaskHandler implements EventHandler<RunningTask> {
258    @Override
259    public void onNext(final RunningTask task) {
260      LOG.log(Level.INFO, "TIME: Running Task {0}", task.getId());
261    }
262  }
263
264  /**
265   * Receive notification that the Task has completed successfully.
266   */
267  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
268    @Override
269    public void onNext(final CompletedTask task) {
270
271      final ActiveContext context = task.getActiveContext();
272      LOG.log(Level.INFO, "TIME: Completed Task {0} on Evaluator {1}",
273          new Object[]{task.getId(), context.getEvaluatorId()});
274
275      final boolean runTask;
276      final int nTask;
277      synchronized (JobDriver.this) {
278        runTask = numTasksStarted < numTasks;
279        if (runTask) {
280          ++numTasksStarted;
281        }
282        nTask = numTasksStarted;
283      }
284
285      if (runTask) {
286        final String taskId = String.format("Task_%08d", nTask);
287        LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
288            new Object[]{taskId, context.getEvaluatorId()});
289        context.submitTask(getTaskConfiguration(taskId));
290      } else {
291        LOG.log(Level.INFO, "TIME: Close Evaluator {0}", context.getEvaluatorId());
292        context.close();
293      }
294    }
295  }
296
297  /**
298   * Receive notification that the Evaluator has been shut down.
299   */
300  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
301    @Override
302    public void onNext(final CompletedEvaluator eval) {
303      LOG.log(Level.INFO, "TIME: Completed Evaluator {0}", eval.getId());
304    }
305  }
306}