This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.scheduler.driver;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.CompletedTask;
027import org.apache.reef.examples.scheduler.client.SchedulerREEF;
028import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException;
029import org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException;
030import org.apache.reef.tang.annotations.Parameter;
031import org.apache.reef.tang.annotations.Unit;
032import org.apache.reef.wake.EventHandler;
033import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
034import org.apache.reef.wake.time.event.StartTime;
035
036import javax.annotation.concurrent.GuardedBy;
037import javax.inject.Inject;
038import java.util.List;
039import java.util.Map;
040import java.util.logging.Level;
041import java.util.logging.Logger;
042
043/**
044 * Driver for TaskScheduler. It receives the commands by HttpRequest and
045 * execute them in a FIFO(First In First Out) order.
046 */
047@Unit
048public final class SchedulerDriver {
049
050  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
051  private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());
052
053  /**
054   * Possible states of the job driver. Can be one of:
055   * <dl>
056   * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator.</dd>
057   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
058   * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
059   * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
060   * </dl>
061   */
062  private enum State {
063    INIT, WAIT_EVALUATORS, READY, RUNNING
064  }
065
066  /**
067   * If true, it reuses evaluators when Tasks done.
068   */
069  private boolean retainable;
070
071  @GuardedBy("SchedulerDriver.this")
072  private State state = State.INIT;
073
074  @GuardedBy("SchedulerDriver.this")
075  private Scheduler scheduler;
076
077  @GuardedBy("SchedulerDriver.this")
078  private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0;
079
080  private final EvaluatorRequestor requestor;
081
082  @Inject
083  private SchedulerDriver(final EvaluatorRequestor requestor,
084                          @Parameter(SchedulerREEF.Retain.class) final boolean retainable,
085                          final Scheduler scheduler) {
086    this.requestor = requestor;
087    this.scheduler = scheduler;
088    this.retainable = retainable;
089  }
090
091  /**
092   * The driver is ready to run.
093   */
094  public final class StartHandler implements EventHandler<StartTime> {
095    @Override
096    public void onNext(final StartTime startTime) {
097      synchronized (SchedulerDriver.this) {
098        LOG.log(Level.INFO, "Driver started at {0}", startTime);
099        assert state == State.INIT;
100        state = State.WAIT_EVALUATORS;
101
102        requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
103      }
104    }
105  }
106
107  /**
108   * Evaluator is allocated. This occurs every time to run commands in Non-retainable version,
109   * while occurs only once in the Retainable version
110   */
111  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
112    @Override
113    public void onNext(final AllocatedEvaluator evaluator) {
114      LOG.log(Level.INFO, "Evaluator is ready");
115      synchronized (SchedulerDriver.this) {
116        nActiveEval++;
117        nRequestedEval--;
118      }
119
120      evaluator.submitContext(ContextConfiguration.CONF
121          .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
122          .build());
123    }
124  }
125
126  /**
127   * Now it is ready to schedule tasks. But if the queue is empty,
128   * wait until commands coming up.
129   *
130   * If there is no pending task, having more than 1 evaluators must be redundant.
131   * It may happen, for example, when tasks are canceled during allocation.
132   * In these cases, the new evaluator may be abandoned.
133   */
134  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
135    @Override
136    public void onNext(final ActiveContext context) {
137      synchronized (SchedulerDriver.this) {
138        LOG.log(Level.INFO, "Context available : {0}", context.getId());
139
140        if (scheduler.hasPendingTasks()) {
141          state = State.RUNNING;
142          scheduler.submitTask(context);
143        } else if (nActiveEval > 1) {
144          nActiveEval--;
145          context.close();
146        } else {
147          state = State.READY;
148          waitForCommands(context);
149        }
150      }
151    }
152  }
153
154  /**
155   * When a Task completes, the task is marked as finished.
156   * The evaluator is reused for the next Task if retainable is set to {@code true}.
157   * Otherwise the evaluator is released.
158   */
159  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
160    @Override
161    public void onNext(final CompletedTask task) {
162      final int taskId = Integer.parseInt(task.getId());
163
164      synchronized (SchedulerDriver.this) {
165        scheduler.setFinished(taskId);
166
167        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
168        final ActiveContext context = task.getActiveContext();
169
170        if (retainable) {
171          retainEvaluator(context);
172        } else {
173          reallocateEvaluator(context);
174        }
175      }
176    }
177  }
178
179
180  /**
181   * Get the list of tasks in the scheduler.
182   */
183  public synchronized Map<String, List<Integer>> getList() {
184    return scheduler.getList();
185  }
186
187  /**
188   * Clear all the Tasks from the waiting queue.
189   */
190  public synchronized int clearList() {
191    return scheduler.clear();
192  }
193
194  /**
195   * Get the status of a task.
196   */
197  public synchronized String getTaskStatus(final int taskId) throws NotFoundException {
198    return scheduler.getTaskStatus(taskId);
199  }
200
201  /**
202   * Cancel a Task waiting on the queue. A task cannot be canceled
203   * once it is running.
204   */
205  public synchronized int cancelTask(final int taskId) throws NotFoundException, UnsuccessfulException {
206    return scheduler.cancelTask(taskId);
207  }
208
209  /**
210   * Submit a command to schedule.
211   */
212  public synchronized int submitCommand(final String command) {
213    final Integer id = scheduler.assignTaskId();
214    scheduler.addTask(new TaskEntity(id, command));
215
216    if (state == State.READY) {
217      notify(); // Wake up at {waitForCommands}
218    } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
219      requestEvaluator(1);
220    }
221    return id;
222  }
223
224  /**
225   * Update the maximum number of evaluators to hold.
226   * Request more evaluators in case there are pending tasks
227   * in the queue and the number of evaluators is less than the limit.
228   */
229  public synchronized int setMaxEvaluators(final int targetNum) throws UnsuccessfulException {
230    if (targetNum < nActiveEval + nRequestedEval) {
231      throw new UnsuccessfulException(nActiveEval + nRequestedEval +
232          " evaluators are used now. Should be larger than that.");
233    }
234    nMaxEval = targetNum;
235
236    if (scheduler.hasPendingTasks()) {
237      final int nToRequest =
238          Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
239      requestEvaluator(nToRequest);
240    }
241    return nMaxEval;
242  }
243
244  /**
245   * Request evaluators. Passing a non positive number is illegal,
246   * so it does not make a trial for that situation.
247   */
248  private synchronized void requestEvaluator(final int numToRequest) {
249    if (numToRequest <= 0) {
250      throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
251    }
252
253    nRequestedEval += numToRequest;
254    requestor.submit(EvaluatorRequest.newBuilder()
255        .setMemory(32)
256        .setNumber(numToRequest)
257        .build());
258  }
259
260  /**
261   * Pick up a command from the queue and run it. Wait until
262   * any command coming up if no command exists.
263   */
264  private synchronized void waitForCommands(final ActiveContext context) {
265    while (!scheduler.hasPendingTasks()) {
266      // Wait until any command enters in the queue
267      try {
268        wait();
269      } catch (final InterruptedException e) {
270        LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
271      }
272    }
273    // When wakes up, run the first command from the queue.
274    state = State.RUNNING;
275    scheduler.submitTask(context);
276  }
277
278  /**
279   * Retain the complete evaluators submitting another task
280   * until there is no need to reuse them.
281   */
282  private synchronized void retainEvaluator(final ActiveContext context) {
283    if (scheduler.hasPendingTasks()) {
284      scheduler.submitTask(context);
285    } else if (nActiveEval > 1) {
286      nActiveEval--;
287      context.close();
288    } else {
289      state = State.READY;
290      waitForCommands(context);
291    }
292  }
293
294  /**
295   * Always close the complete evaluators and
296   * allocate a new evaluator if necessary.
297   */
298  private synchronized void reallocateEvaluator(final ActiveContext context) {
299    nActiveEval--;
300    context.close();
301
302    if (scheduler.hasPendingTasks()) {
303      requestEvaluator(1);
304    } else if (nActiveEval <= 0) {
305      state = State.WAIT_EVALUATORS;
306      requestEvaluator(1);
307    }
308  }
309}