001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.scheduler.driver; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.EvaluatorRequestor; 025import org.apache.reef.driver.task.CompletedTask; 026import org.apache.reef.examples.scheduler.client.SchedulerREEF; 027import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException; 028import org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException; 029import org.apache.reef.tang.annotations.Parameter; 030import org.apache.reef.tang.annotations.Unit; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 033import org.apache.reef.wake.time.event.StartTime; 034 035import javax.annotation.concurrent.GuardedBy; 036import javax.inject.Inject; 037import java.util.List; 038import java.util.Map; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042/** 043 * Driver for TaskScheduler. It receives the commands by HttpRequest and 044 * execute them in a FIFO(First In First Out) order. 045 */ 046@Unit 047public final class SchedulerDriver { 048 049 public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 050 private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName()); 051 052 /** 053 * Possible states of the job driver. Can be one of: 054 * <dl> 055 * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator.</dd> 056 * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator allocated with no active evaluators.</dd> 057 * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a new Task arrives.</dd> 058 * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd> 059 * </dl> 060 */ 061 private enum State { 062 INIT, WAIT_EVALUATORS, READY, RUNNING 063 } 064 065 /** 066 * If true, it reuses evaluators when Tasks done. 067 */ 068 private boolean retainable; 069 070 @GuardedBy("SchedulerDriver.this") 071 private State state = State.INIT; 072 073 @GuardedBy("SchedulerDriver.this") 074 private Scheduler scheduler; 075 076 @GuardedBy("SchedulerDriver.this") 077 private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0; 078 079 private final EvaluatorRequestor requestor; 080 081 @Inject 082 private SchedulerDriver(final EvaluatorRequestor requestor, 083 @Parameter(SchedulerREEF.Retain.class) final boolean retainable, 084 final Scheduler scheduler) { 085 this.requestor = requestor; 086 this.scheduler = scheduler; 087 this.retainable = retainable; 088 } 089 090 /** 091 * The driver is ready to run. 092 */ 093 public final class StartHandler implements EventHandler<StartTime> { 094 @Override 095 public void onNext(final StartTime startTime) { 096 synchronized (SchedulerDriver.this) { 097 LOG.log(Level.INFO, "Driver started at {0}", startTime); 098 assert state == State.INIT; 099 state = State.WAIT_EVALUATORS; 100 101 requestEvaluator(1); // Allocate an initial evaluator to avoid idle state. 102 } 103 } 104 } 105 106 /** 107 * Evaluator is allocated. This occurs every time to run commands in Non-retainable version, 108 * while occurs only once in the Retainable version 109 */ 110 public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 111 @Override 112 public void onNext(final AllocatedEvaluator evaluator) { 113 LOG.log(Level.INFO, "Evaluator is ready"); 114 synchronized (SchedulerDriver.this) { 115 nActiveEval++; 116 nRequestedEval--; 117 } 118 119 evaluator.submitContext(ContextConfiguration.CONF 120 .set(ContextConfiguration.IDENTIFIER, "SchedulerContext") 121 .build()); 122 } 123 } 124 125 /** 126 * Now it is ready to schedule tasks. But if the queue is empty, 127 * wait until commands coming up. 128 * 129 * If there is no pending task, having more than 1 evaluators must be redundant. 130 * It may happen, for example, when tasks are canceled during allocation. 131 * In these cases, the new evaluator may be abandoned. 132 */ 133 public final class ActiveContextHandler implements EventHandler<ActiveContext> { 134 @Override 135 public void onNext(final ActiveContext context) { 136 synchronized (SchedulerDriver.this) { 137 LOG.log(Level.INFO, "Context available : {0}", context.getId()); 138 139 if (scheduler.hasPendingTasks()) { 140 state = State.RUNNING; 141 scheduler.submitTask(context); 142 } else if (nActiveEval > 1) { 143 nActiveEval--; 144 context.close(); 145 } else { 146 state = State.READY; 147 waitForCommands(context); 148 } 149 } 150 } 151 } 152 153 /** 154 * When a Task completes, the task is marked as finished. 155 * The evaluator is reused for the next Task if retainable is set to {@code true}. 156 * Otherwise the evaluator is released. 157 */ 158 public final class CompletedTaskHandler implements EventHandler<CompletedTask> { 159 @Override 160 public void onNext(final CompletedTask task) { 161 final int taskId = Integer.parseInt(task.getId()); 162 163 synchronized (SchedulerDriver.this) { 164 scheduler.setFinished(taskId); 165 166 LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable)); 167 final ActiveContext context = task.getActiveContext(); 168 169 if (retainable) { 170 retainEvaluator(context); 171 } else { 172 reallocateEvaluator(context); 173 } 174 } 175 } 176 } 177 178 179 /** 180 * Get the list of tasks in the scheduler. 181 */ 182 public synchronized Map<String, List<Integer>> getList() { 183 return scheduler.getList(); 184 } 185 186 /** 187 * Clear all the Tasks from the waiting queue. 188 */ 189 public synchronized int clearList() { 190 return scheduler.clear(); 191 } 192 193 /** 194 * Get the status of a task. 195 */ 196 public synchronized String getTaskStatus(final int taskId) throws NotFoundException { 197 return scheduler.getTaskStatus(taskId); 198 } 199 200 /** 201 * Cancel a Task waiting on the queue. A task cannot be canceled 202 * once it is running. 203 */ 204 public synchronized int cancelTask(final int taskId) throws NotFoundException, UnsuccessfulException { 205 return scheduler.cancelTask(taskId); 206 } 207 208 /** 209 * Submit a command to schedule. 210 */ 211 public synchronized int submitCommand(final String command) { 212 final Integer id = scheduler.assignTaskId(); 213 scheduler.addTask(new TaskEntity(id, command)); 214 215 if (state == State.READY) { 216 notify(); // Wake up at {waitForCommands} 217 } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) { 218 requestEvaluator(1); 219 } 220 return id; 221 } 222 223 /** 224 * Update the maximum number of evaluators to hold. 225 * Request more evaluators in case there are pending tasks 226 * in the queue and the number of evaluators is less than the limit. 227 */ 228 public synchronized int setMaxEvaluators(final int targetNum) throws UnsuccessfulException { 229 if (targetNum < nActiveEval + nRequestedEval) { 230 throw new UnsuccessfulException(nActiveEval + nRequestedEval + 231 " evaluators are used now. Should be larger than that."); 232 } 233 nMaxEval = targetNum; 234 235 if (scheduler.hasPendingTasks()) { 236 final int nToRequest = 237 Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval; 238 requestEvaluator(nToRequest); 239 } 240 return nMaxEval; 241 } 242 243 /** 244 * Request evaluators. Passing a non positive number is illegal, 245 * so it does not make a trial for that situation. 246 */ 247 private synchronized void requestEvaluator(final int numToRequest) { 248 if (numToRequest <= 0) { 249 throw new IllegalArgumentException("The number of evaluator request should be a positive integer"); 250 } 251 252 nRequestedEval += numToRequest; 253 requestor.newRequest() 254 .setMemory(32) 255 .setNumber(numToRequest) 256 .submit(); 257 } 258 259 /** 260 * Pick up a command from the queue and run it. Wait until 261 * any command coming up if no command exists. 262 */ 263 private synchronized void waitForCommands(final ActiveContext context) { 264 while (!scheduler.hasPendingTasks()) { 265 // Wait until any command enters in the queue 266 try { 267 wait(); 268 } catch (final InterruptedException e) { 269 LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e); 270 } 271 } 272 // When wakes up, run the first command from the queue. 273 state = State.RUNNING; 274 scheduler.submitTask(context); 275 } 276 277 /** 278 * Retain the complete evaluators submitting another task 279 * until there is no need to reuse them. 280 */ 281 private synchronized void retainEvaluator(final ActiveContext context) { 282 if (scheduler.hasPendingTasks()) { 283 scheduler.submitTask(context); 284 } else if (nActiveEval > 1) { 285 nActiveEval--; 286 context.close(); 287 } else { 288 state = State.READY; 289 waitForCommands(context); 290 } 291 } 292 293 /** 294 * Always close the complete evaluators and 295 * allocate a new evaluator if necessary. 296 */ 297 private synchronized void reallocateEvaluator(final ActiveContext context) { 298 nActiveEval--; 299 context.close(); 300 301 if (scheduler.hasPendingTasks()) { 302 requestEvaluator(1); 303 } else if (nActiveEval <= 0) { 304 state = State.WAIT_EVALUATORS; 305 requestEvaluator(1); 306 } 307 } 308}