This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.scheduler;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.CompletedTask;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.tang.annotations.Unit;
029import org.apache.reef.wake.EventHandler;
030import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
031import org.apache.reef.wake.time.event.StartTime;
032
033import javax.annotation.concurrent.GuardedBy;
034import javax.inject.Inject;
035import java.util.List;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Driver for TaskScheduler. It receives the commands by HttpRequest and
041 * execute them in a FIFO(First In First Out) order.
042 */
043@Unit
044public final class SchedulerDriver {
045
046  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
047  private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());
048
049  /**
050   * Possible states of the job driver. Can be one of:
051   * <dl>
052   * <du><code>INIT</code></du><dd>Initial state. Ready to request an evaluator.</dd>
053   * <du><code>WAIT_EVALUATORS</code></du><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
054   * <du><code>READY</code></du><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
055   * <du><code>RUNNING</code></du><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
056   * </dl>
057   */
058  private enum State {
059    INIT, WAIT_EVALUATORS, READY, RUNNING
060  }
061
062  /**
063   * If true, it reuses evaluators when Tasks done.
064   */
065  private boolean retainable;
066
067  @GuardedBy("SchedulerDriver.this")
068  private State state = State.INIT;
069
070  @GuardedBy("SchedulerDriver.this")
071  private Scheduler scheduler;
072
073  @GuardedBy("SchedulerDriver.this")
074  private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0;
075
076  private final EvaluatorRequestor requestor;
077
078  @Inject
079  public SchedulerDriver(final EvaluatorRequestor requestor,
080                         @Parameter(SchedulerREEF.Retain.class) boolean retainable,
081                         final Scheduler scheduler) {
082    this.requestor = requestor;
083    this.scheduler = scheduler;
084    this.retainable = retainable;
085  }
086
087  /**
088   * The driver is ready to run.
089   */
090  final class StartHandler implements EventHandler<StartTime> {
091    @Override
092    public void onNext(final StartTime startTime) {
093      LOG.log(Level.INFO, "Driver started at {0}", startTime);
094      assert (state == State.INIT);
095      state = State.WAIT_EVALUATORS;
096
097      requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
098    }
099  }
100
101  /**
102   * Evaluator is allocated. This occurs every time to run commands in Non-retainable version,
103   * while occurs only once in the Retainable version
104   */
105  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
106    @Override
107    public void onNext(final AllocatedEvaluator evaluator) {
108      LOG.log(Level.INFO, "Evaluator is ready");
109      synchronized (SchedulerDriver.this) {
110        nActiveEval++;
111        nRequestedEval--;
112      }
113
114      evaluator.submitContext(ContextConfiguration.CONF
115        .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
116        .build());
117    }
118  }
119
120  /**
121   * Now it is ready to schedule tasks. But if the queue is empty,
122   * wait until commands coming up.
123   *
124   * If there is no pending task, having more than 1 evaluators must be redundant.
125   * It may happen, for example, when tasks are canceled during allocation.
126   * In these cases, the new evaluator may be abandoned.
127   */
128  final class ActiveContextHandler implements EventHandler<ActiveContext> {
129    @Override
130    public void onNext(ActiveContext context) {
131      synchronized (SchedulerDriver.this) {
132        LOG.log(Level.INFO, "Context available : {0}", context.getId());
133
134        if (scheduler.hasPendingTasks()) {
135          state = State.RUNNING;
136          scheduler.submitTask(context);
137        } else if (nActiveEval > 1) {
138          nActiveEval--;
139          context.close();
140        } else {
141          state = State.READY;
142          waitForCommands(context);
143        }
144      }
145    }
146  }
147
148  /**
149   * Non-retainable version of CompletedTaskHandler.
150   * When Task completes, it closes the active context to deallocate the evaluator
151   * and if there is outstanding commands, allocate another evaluator.
152   */
153  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
154    @Override
155    public void onNext(final CompletedTask task) {
156      final int taskId = Integer.valueOf(task.getId());
157
158      synchronized (SchedulerDriver.this) {
159        scheduler.setFinished(taskId);
160
161        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
162        final ActiveContext context = task.getActiveContext();
163
164        if (retainable) {
165          retainEvaluator(context);
166        } else {
167          reallocateEvaluator(context);
168        }
169      }
170    }
171  }
172
173  /**
174   * Get the list of tasks in the scheduler.
175   */
176  public synchronized SchedulerResponse getList() {
177    return scheduler.getList();
178  }
179
180  /**
181   * Clear all the Tasks from the waiting queue.
182   */
183  public synchronized SchedulerResponse clearList() {
184    return scheduler.clear();
185  }
186
187  /**
188   * Get the status of a task.
189   */
190  public SchedulerResponse getTaskStatus(List<String> args) {
191    if (args.size() != 1) {
192      return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
193    }
194
195    final Integer taskId = Integer.valueOf(args.get(0));
196
197    synchronized (SchedulerDriver.this) {
198      return scheduler.getTaskStatus(taskId);
199    }
200  }
201
202  /**
203   * Cancel a Task waiting on the queue. A task cannot be canceled
204   * once it is running.
205   */
206  public SchedulerResponse cancelTask(final List<String> args) {
207    if (args.size() != 1) {
208      return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
209    }
210
211    final Integer taskId = Integer.valueOf(args.get(0));
212
213    synchronized (SchedulerDriver.this) {
214      return scheduler.cancelTask(taskId);
215    }
216  }
217
218  /**
219   * Submit a command to schedule.
220   */
221  public SchedulerResponse submitCommands(final List<String> args) {
222    if (args.size() != 1) {
223      return SchedulerResponse.BAD_REQUEST("Usage : only one command at a time");
224    }
225
226    final String command = args.get(0);
227    final Integer id;
228
229    synchronized (SchedulerDriver.this) {
230      id = scheduler.assignTaskId();
231      scheduler.addTask(new TaskEntity(id, command));
232
233      if (state == State.READY) {
234        SchedulerDriver.this.notify(); // Wake up at {waitForCommands}
235      } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
236        requestEvaluator(1);
237      }
238    }
239    return SchedulerResponse.OK("Task ID : " + id);
240  }
241
242  /**
243   * Update the maximum number of evaluators to hold.
244   * Request more evaluators in case there are pending tasks
245   * in the queue and the number of evaluators is less than the limit.
246   */
247  public SchedulerResponse setMaxEvaluators(final List<String> args) {
248    if (args.size() != 1) {
249      return SchedulerResponse.BAD_REQUEST("Usage : Only one value can be used");
250    }
251
252    final int nTarget = Integer.valueOf(args.get(0));
253
254    synchronized (SchedulerDriver.this) {
255      if (nTarget < nActiveEval + nRequestedEval) {
256        return SchedulerResponse.FORBIDDEN(nActiveEval + nRequestedEval +
257          " evaluators are used now. Should be larger than that.");
258      }
259      nMaxEval = nTarget;
260
261      if (scheduler.hasPendingTasks()) {
262        final int nToRequest =
263          Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
264        requestEvaluator(nToRequest);
265      }
266      return SchedulerResponse.OK("You can use evaluators up to " + nMaxEval + " evaluators.");
267    }
268  }
269
270  /**
271   * Request evaluators. Passing a non positive number is illegal,
272   * so it does not make a trial for that situation.
273   */
274  private void requestEvaluator(final int numToRequest) {
275    if (numToRequest <= 0) {
276      throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
277    }
278
279    synchronized (SchedulerDriver.this) {
280      nRequestedEval += numToRequest;
281      requestor.submit(EvaluatorRequest.newBuilder()
282        .setMemory(32)
283        .setNumber(numToRequest)
284        .build());
285    }
286  }
287
288  /**
289   * Pick up a command from the queue and run it. Wait until
290   * any command coming up if no command exists.
291   */
292  private void waitForCommands(final ActiveContext context) {
293    synchronized (SchedulerDriver.this) {
294      while (!scheduler.hasPendingTasks()) {
295        // Wait until any command enters in the queue
296        try {
297          SchedulerDriver.this.wait();
298        } catch (InterruptedException e) {
299          LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
300        }
301      }
302      // When wakes up, run the first command from the queue.
303      state = State.RUNNING;
304      scheduler.submitTask(context);
305    }
306  }
307
308  /**
309   * Retain the complete evaluators submitting another task
310   * until there is no need to reuse them.
311   */
312  private synchronized void retainEvaluator(final ActiveContext context) {
313    if (scheduler.hasPendingTasks()) {
314      scheduler.submitTask(context);
315    } else if (nActiveEval > 1) {
316      nActiveEval--;
317      context.close();
318    } else {
319      state = State.READY;
320      waitForCommands(context);
321    }
322  }
323
324  /**
325   * Always close the complete evaluators and
326   * allocate a new evaluator if necessary.
327   */
328  private synchronized void reallocateEvaluator(final ActiveContext context) {
329    nActiveEval--;
330    context.close();
331
332    if (scheduler.hasPendingTasks()) {
333      requestEvaluator(1);
334    } else if (nActiveEval <= 0) {
335      state = State.WAIT_EVALUATORS;
336      requestEvaluator(1);
337    }
338  }
339}