/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.scheduler;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.event.StartTime;

import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver for TaskScheduler. It receives the commands by HttpRequest and
 * executes them in a FIFO (First In First Out) order.
 *
 * <p>All mutable state ({@code state}, the scheduler and the evaluator counters)
 * is guarded by the monitor of the enclosing {@code SchedulerDriver} instance.
 */
@Unit
public final class SchedulerDriver {

  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
  private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());

  /**
   * Possible states of the job driver. Can be one of:
   * <dl>
   * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator.</dd>
   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
   * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
   * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
   * </dl>
   */
  private enum State {
    INIT, WAIT_EVALUATORS, READY, RUNNING
  }

  /**
   * If true, it reuses evaluators when Tasks are done.
   */
  private final boolean retainable;

  @GuardedBy("SchedulerDriver.this")
  private State state = State.INIT;

  @GuardedBy("SchedulerDriver.this")
  private final Scheduler scheduler;

  /** Upper bound on (active + requested) evaluators; changed by {@link #setMaxEvaluators(List)}. */
  @GuardedBy("SchedulerDriver.this")
  private int nMaxEval = 3;

  /** Number of evaluators that have been allocated and not yet closed by this driver. */
  @GuardedBy("SchedulerDriver.this")
  private int nActiveEval = 0;

  /** Number of evaluators that have been requested but not yet allocated. */
  @GuardedBy("SchedulerDriver.this")
  private int nRequestedEval = 0;

  private final EvaluatorRequestor requestor;

  @Inject
  public SchedulerDriver(final EvaluatorRequestor requestor,
                         @Parameter(SchedulerREEF.Retain.class) final boolean retainable,
                         final Scheduler scheduler) {
    this.requestor = requestor;
    this.scheduler = scheduler;
    this.retainable = retainable;
  }

  /**
   * The driver is ready to run.
   */
  final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      LOG.log(Level.INFO, "Driver started at {0}", startTime);

      // `state` is guarded by the SchedulerDriver monitor, so the transition
      // out of INIT must happen while holding it.
      synchronized (SchedulerDriver.this) {
        assert (state == State.INIT);
        state = State.WAIT_EVALUATORS;

        requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
      }
    }
  }

  /**
   * Evaluator is allocated. This occurs every time to run commands in Non-retainable version,
   * while occurs only once in the Retainable version.
   */
  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator evaluator) {
      LOG.log(Level.INFO, "Evaluator is ready");
      synchronized (SchedulerDriver.this) {
        // One outstanding request has turned into an active evaluator.
        nActiveEval++;
        nRequestedEval--;
      }

      evaluator.submitContext(ContextConfiguration.CONF
          .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
          .build());
    }
  }

  /**
   * Now it is ready to schedule tasks. But if the queue is empty,
   * wait until commands come up.
   *
   * If there is no pending task, having more than 1 evaluator must be redundant.
   * It may happen, for example, when tasks are canceled during allocation.
   * In these cases, the new evaluator may be abandoned.
   */
  final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      synchronized (SchedulerDriver.this) {
        LOG.log(Level.INFO, "Context available : {0}", context.getId());

        if (scheduler.hasPendingTasks()) {
          state = State.RUNNING;
          scheduler.submitTask(context);
        } else if (nActiveEval > 1) {
          // Nothing to run and another evaluator is still active: drop this one.
          nActiveEval--;
          context.close();
        } else {
          state = State.READY;
          waitForCommands(context);
        }
      }
    }
  }

  /**
   * Handles a completed Task: marks it as finished in the scheduler and then either
   * reuses the evaluator (retainable mode) or closes its context and, if needed,
   * requests a new evaluator (non-retainable mode).
   */
  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {
      final int taskId = Integer.parseInt(task.getId());

      synchronized (SchedulerDriver.this) {
        scheduler.setFinished(taskId);

        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
        final ActiveContext context = task.getActiveContext();

        if (retainable) {
          retainEvaluator(context);
        } else {
          reallocateEvaluator(context);
        }
      }
    }
  }

  /**
   * Get the list of tasks in the scheduler.
   */
  public synchronized SchedulerResponse getList() {
    return scheduler.getList();
  }

  /**
   * Clear all the Tasks from the waiting queue.
   */
  public synchronized SchedulerResponse clearList() {
    return scheduler.clear();
  }

  /**
   * Get the status of a task.
   *
   * @param args must contain exactly one element, the integer ID of the task
   * @return the scheduler's response for that ID, or a BAD_REQUEST response
   *         when the argument count is wrong or the ID is not an integer
   */
  public SchedulerResponse getTaskStatus(final List<String> args) {
    if (args.size() != 1) {
      return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
    }

    final Integer taskId;
    try {
      taskId = Integer.valueOf(args.get(0));
    } catch (final NumberFormatException e) {
      // The ID comes from an external request; report it instead of failing the handler.
      return SchedulerResponse.BAD_REQUEST("Usage : ID must be an integer");
    }

    synchronized (SchedulerDriver.this) {
      return scheduler.getTaskStatus(taskId);
    }
  }

  /**
   * Cancel a Task waiting on the queue. A task cannot be canceled
   * once it is running.
   *
   * @param args must contain exactly one element, the integer ID of the task
   * @return the scheduler's response for that ID, or a BAD_REQUEST response
   *         when the argument count is wrong or the ID is not an integer
   */
  public SchedulerResponse cancelTask(final List<String> args) {
    if (args.size() != 1) {
      return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
    }

    final Integer taskId;
    try {
      taskId = Integer.valueOf(args.get(0));
    } catch (final NumberFormatException e) {
      // The ID comes from an external request; report it instead of failing the handler.
      return SchedulerResponse.BAD_REQUEST("Usage : ID must be an integer");
    }

    synchronized (SchedulerDriver.this) {
      return scheduler.cancelTask(taskId);
    }
  }

  /**
   * Submit a command to schedule.
   *
   * @param args must contain exactly one element, the command to run
   * @return an OK response carrying the assigned task ID, or a BAD_REQUEST
   *         response when the argument count is wrong
   */
  public SchedulerResponse submitCommands(final List<String> args) {
    if (args.size() != 1) {
      return SchedulerResponse.BAD_REQUEST("Usage : only one command at a time");
    }

    final String command = args.get(0);
    final Integer id;

    synchronized (SchedulerDriver.this) {
      id = scheduler.assignTaskId();
      scheduler.addTask(new TaskEntity(id, command));

      if (state == State.READY) {
        SchedulerDriver.this.notify(); // Wake up at {waitForCommands}
      } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
        // Already busy but still below the limit: ask for one more evaluator.
        requestEvaluator(1);
      }
    }
    return SchedulerResponse.OK("Task ID : " + id);
  }

  /**
   * Update the maximum number of evaluators to hold.
   * Request more evaluators in case there are pending tasks
   * in the queue and the number of evaluators is less than the limit.
   *
   * @param args must contain exactly one element, the new integer limit
   * @return an OK response when the limit was updated; BAD_REQUEST when the
   *         argument count is wrong or the value is not an integer; FORBIDDEN
   *         when the value is smaller than the number of evaluators already
   *         active or requested
   */
  public SchedulerResponse setMaxEvaluators(final List<String> args) {
    if (args.size() != 1) {
      return SchedulerResponse.BAD_REQUEST("Usage : Only one value can be used");
    }

    final int nTarget;
    try {
      nTarget = Integer.parseInt(args.get(0));
    } catch (final NumberFormatException e) {
      // The value comes from an external request; report it instead of failing the handler.
      return SchedulerResponse.BAD_REQUEST("Usage : The value must be an integer");
    }

    synchronized (SchedulerDriver.this) {
      if (nTarget < nActiveEval + nRequestedEval) {
        return SchedulerResponse.FORBIDDEN(nActiveEval + nRequestedEval +
            " evaluators are used now. Should be larger than that.");
      }
      nMaxEval = nTarget;

      if (scheduler.hasPendingTasks()) {
        final int nToRequest =
            Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
        // Outstanding requests may already cover the pending tasks, in which case
        // nToRequest is zero or negative and requestEvaluator() would reject it.
        if (nToRequest > 0) {
          requestEvaluator(nToRequest);
        }
      }
      return SchedulerResponse.OK("You can use evaluators up to " + nMaxEval + " evaluators.");
    }
  }

  /**
   * Request evaluators. Passing a non positive number is illegal,
   * so it does not make a trial for that situation.
   *
   * @param numToRequest number of evaluators to request; must be positive
   * @throws IllegalArgumentException if {@code numToRequest} is zero or negative
   */
  private void requestEvaluator(final int numToRequest) {
    if (numToRequest <= 0) {
      throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
    }

    synchronized (SchedulerDriver.this) {
      nRequestedEval += numToRequest;
      requestor.submit(EvaluatorRequest.newBuilder()
          .setMemory(32)
          .setNumber(numToRequest)
          .build());
    }
  }

  /**
   * Pick up a command from the queue and run it. Wait until
   * any command comes up if no command exists.
   *
   * <p>An interrupt does not end the wait; it is logged, waiting continues, and the
   * thread's interrupt status is restored once a task has been submitted.
   *
   * @param context the context on which the next pending task is submitted
   */
  private void waitForCommands(final ActiveContext context) {
    synchronized (SchedulerDriver.this) {
      boolean interrupted = false;
      try {
        while (!scheduler.hasPendingTasks()) {
          // Wait until any command enters in the queue
          try {
            SchedulerDriver.this.wait();
          } catch (final InterruptedException e) {
            // Re-interrupting here would make the next wait() throw immediately and
            // spin; remember the interrupt and restore it after the loop instead.
            interrupted = true;
            LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
          }
        }
        // When wakes up, run the first command from the queue.
        state = State.RUNNING;
        scheduler.submitTask(context);
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  /**
   * Retain the complete evaluators submitting another task
   * until there is no need to reuse them.
   *
   * @param context the context of the evaluator whose task has just completed
   */
  private synchronized void retainEvaluator(final ActiveContext context) {
    if (scheduler.hasPendingTasks()) {
      scheduler.submitTask(context);
    } else if (nActiveEval > 1) {
      // Nothing to run and another evaluator is still active: release this one.
      nActiveEval--;
      context.close();
    } else {
      state = State.READY;
      waitForCommands(context);
    }
  }

  /**
   * Always close the complete evaluators and
   * allocate a new evaluator if necessary.
   *
   * @param context the context of the evaluator whose task has just completed
   */
  private synchronized void reallocateEvaluator(final ActiveContext context) {
    nActiveEval--;
    context.close();

    if (scheduler.hasPendingTasks()) {
      requestEvaluator(1);
    } else if (nActiveEval <= 0) {
      // No evaluator left: go back to waiting for one so the driver does not sit idle.
      state = State.WAIT_EVALUATORS;
      requestEvaluator(1);
    }
  }
}