This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.scheduler.driver;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequestor;
025import org.apache.reef.driver.task.CompletedTask;
026import org.apache.reef.examples.scheduler.client.SchedulerREEF;
027import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException;
028import org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException;
029import org.apache.reef.tang.annotations.Parameter;
030import org.apache.reef.tang.annotations.Unit;
031import org.apache.reef.wake.EventHandler;
032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
033import org.apache.reef.wake.time.event.StartTime;
034
035import javax.annotation.concurrent.GuardedBy;
036import javax.inject.Inject;
037import java.util.List;
038import java.util.Map;
039import java.util.logging.Level;
040import java.util.logging.Logger;
041
042/**
043 * Driver for TaskScheduler. It receives the commands by HttpRequest and
044 * execute them in a FIFO(First In First Out) order.
045 */
046@Unit
047public final class SchedulerDriver {
048
049  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
050  private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());
051
052  /**
053   * Possible states of the job driver. Can be one of:
054   * <dl>
055   * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator.</dd>
056   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
057   * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
058   * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
059   * </dl>
060   */
061  private enum State {
062    INIT, WAIT_EVALUATORS, READY, RUNNING
063  }
064
065  /**
066   * If true, it reuses evaluators when Tasks done.
067   */
068  private boolean retainable;
069
070  @GuardedBy("SchedulerDriver.this")
071  private State state = State.INIT;
072
073  @GuardedBy("SchedulerDriver.this")
074  private Scheduler scheduler;
075
076  @GuardedBy("SchedulerDriver.this")
077  private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0;
078
079  private final EvaluatorRequestor requestor;
080
081  @Inject
082  private SchedulerDriver(final EvaluatorRequestor requestor,
083                          @Parameter(SchedulerREEF.Retain.class) final boolean retainable,
084                          final Scheduler scheduler) {
085    this.requestor = requestor;
086    this.scheduler = scheduler;
087    this.retainable = retainable;
088  }
089
090  /**
091   * The driver is ready to run.
092   */
093  public final class StartHandler implements EventHandler<StartTime> {
094    @Override
095    public void onNext(final StartTime startTime) {
096      synchronized (SchedulerDriver.this) {
097        LOG.log(Level.INFO, "Driver started at {0}", startTime);
098        assert state == State.INIT;
099        state = State.WAIT_EVALUATORS;
100
101        requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
102      }
103    }
104  }
105
106  /**
107   * Evaluator is allocated. This occurs every time to run commands in Non-retainable version,
108   * while occurs only once in the Retainable version
109   */
110  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
111    @Override
112    public void onNext(final AllocatedEvaluator evaluator) {
113      LOG.log(Level.INFO, "Evaluator is ready");
114      synchronized (SchedulerDriver.this) {
115        nActiveEval++;
116        nRequestedEval--;
117      }
118
119      evaluator.submitContext(ContextConfiguration.CONF
120          .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
121          .build());
122    }
123  }
124
125  /**
126   * Now it is ready to schedule tasks. But if the queue is empty,
127   * wait until commands coming up.
128   *
129   * If there is no pending task, having more than 1 evaluators must be redundant.
130   * It may happen, for example, when tasks are canceled during allocation.
131   * In these cases, the new evaluator may be abandoned.
132   */
133  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
134    @Override
135    public void onNext(final ActiveContext context) {
136      synchronized (SchedulerDriver.this) {
137        LOG.log(Level.INFO, "Context available : {0}", context.getId());
138
139        if (scheduler.hasPendingTasks()) {
140          state = State.RUNNING;
141          scheduler.submitTask(context);
142        } else if (nActiveEval > 1) {
143          nActiveEval--;
144          context.close();
145        } else {
146          state = State.READY;
147          waitForCommands(context);
148        }
149      }
150    }
151  }
152
153  /**
154   * When a Task completes, the task is marked as finished.
155   * The evaluator is reused for the next Task if retainable is set to {@code true}.
156   * Otherwise the evaluator is released.
157   */
158  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
159    @Override
160    public void onNext(final CompletedTask task) {
161      final int taskId = Integer.parseInt(task.getId());
162
163      synchronized (SchedulerDriver.this) {
164        scheduler.setFinished(taskId);
165
166        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
167        final ActiveContext context = task.getActiveContext();
168
169        if (retainable) {
170          retainEvaluator(context);
171        } else {
172          reallocateEvaluator(context);
173        }
174      }
175    }
176  }
177
178
179  /**
180   * Get the list of tasks in the scheduler.
181   */
182  public synchronized Map<String, List<Integer>> getList() {
183    return scheduler.getList();
184  }
185
186  /**
187   * Clear all the Tasks from the waiting queue.
188   */
189  public synchronized int clearList() {
190    return scheduler.clear();
191  }
192
193  /**
194   * Get the status of a task.
195   */
196  public synchronized String getTaskStatus(final int taskId) throws NotFoundException {
197    return scheduler.getTaskStatus(taskId);
198  }
199
200  /**
201   * Cancel a Task waiting on the queue. A task cannot be canceled
202   * once it is running.
203   */
204  public synchronized int cancelTask(final int taskId) throws NotFoundException, UnsuccessfulException {
205    return scheduler.cancelTask(taskId);
206  }
207
208  /**
209   * Submit a command to schedule.
210   */
211  public synchronized int submitCommand(final String command) {
212    final Integer id = scheduler.assignTaskId();
213    scheduler.addTask(new TaskEntity(id, command));
214
215    if (state == State.READY) {
216      notify(); // Wake up at {waitForCommands}
217    } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
218      requestEvaluator(1);
219    }
220    return id;
221  }
222
223  /**
224   * Update the maximum number of evaluators to hold.
225   * Request more evaluators in case there are pending tasks
226   * in the queue and the number of evaluators is less than the limit.
227   */
228  public synchronized int setMaxEvaluators(final int targetNum) throws UnsuccessfulException {
229    if (targetNum < nActiveEval + nRequestedEval) {
230      throw new UnsuccessfulException(nActiveEval + nRequestedEval +
231          " evaluators are used now. Should be larger than that.");
232    }
233    nMaxEval = targetNum;
234
235    if (scheduler.hasPendingTasks()) {
236      final int nToRequest =
237          Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
238      requestEvaluator(nToRequest);
239    }
240    return nMaxEval;
241  }
242
243  /**
244   * Request evaluators. Passing a non positive number is illegal,
245   * so it does not make a trial for that situation.
246   */
247  private synchronized void requestEvaluator(final int numToRequest) {
248    if (numToRequest <= 0) {
249      throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
250    }
251
252    nRequestedEval += numToRequest;
253    requestor.newRequest()
254        .setMemory(32)
255        .setNumber(numToRequest)
256        .submit();
257  }
258
259  /**
260   * Pick up a command from the queue and run it. Wait until
261   * any command coming up if no command exists.
262   */
263  private synchronized void waitForCommands(final ActiveContext context) {
264    while (!scheduler.hasPendingTasks()) {
265      // Wait until any command enters in the queue
266      try {
267        wait();
268      } catch (final InterruptedException e) {
269        LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
270      }
271    }
272    // When wakes up, run the first command from the queue.
273    state = State.RUNNING;
274    scheduler.submitTask(context);
275  }
276
277  /**
278   * Retain the complete evaluators submitting another task
279   * until there is no need to reuse them.
280   */
281  private synchronized void retainEvaluator(final ActiveContext context) {
282    if (scheduler.hasPendingTasks()) {
283      scheduler.submitTask(context);
284    } else if (nActiveEval > 1) {
285      nActiveEval--;
286      context.close();
287    } else {
288      state = State.READY;
289      waitForCommands(context);
290    }
291  }
292
293  /**
294   * Always close the complete evaluators and
295   * allocate a new evaluator if necessary.
296   */
297  private synchronized void reallocateEvaluator(final ActiveContext context) {
298    nActiveEval--;
299    context.close();
300
301    if (scheduler.hasPendingTasks()) {
302      requestEvaluator(1);
303    } else if (nActiveEval <= 0) {
304      state = State.WAIT_EVALUATORS;
305      requestEvaluator(1);
306    }
307  }
308}