/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.scheduler.driver;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.examples.scheduler.client.SchedulerREEF;
import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException;
import org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.event.StartTime;

import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver for TaskScheduler. It receives commands via HttpRequest and
 * executes them in FIFO (First In First Out) order.
 *
 * <p>All mutable state ({@code state}, the evaluator counters and the
 * scheduler) is guarded by the monitor of the {@code SchedulerDriver}
 * instance; every method and handler below takes that monitor before
 * touching it.
 */
@Unit
public final class SchedulerDriver {

  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
  private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());

  /**
   * Possible states of the job driver. Can be one of:
   * <dl>
   * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator.</dd>
   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
   * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
   * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
   * </dl>
   */
  private enum State {
    INIT, WAIT_EVALUATORS, READY, RUNNING
  }

  /**
   * If true, evaluators are reused when Tasks are done.
   */
  private final boolean retainable;

  @GuardedBy("SchedulerDriver.this")
  private State state = State.INIT;

  @GuardedBy("SchedulerDriver.this")
  private final Scheduler scheduler;

  /** Upper bound on the number of evaluators (active plus requested). */
  @GuardedBy("SchedulerDriver.this")
  private int nMaxEval = 3;

  /** Number of evaluators that have been allocated and not yet closed. */
  @GuardedBy("SchedulerDriver.this")
  private int nActiveEval = 0;

  /** Number of evaluators that have been requested but not yet allocated. */
  @GuardedBy("SchedulerDriver.this")
  private int nRequestedEval = 0;

  private final EvaluatorRequestor requestor;

  @Inject
  private SchedulerDriver(final EvaluatorRequestor requestor,
                          @Parameter(SchedulerREEF.Retain.class) final boolean retainable,
                          final Scheduler scheduler) {
    this.requestor = requestor;
    this.scheduler = scheduler;
    this.retainable = retainable;
  }

  /**
   * The driver is ready to run.
   */
  public final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      synchronized (SchedulerDriver.this) {
        LOG.log(Level.INFO, "Driver started at {0}", startTime);
        assert state == State.INIT;
        state = State.WAIT_EVALUATORS;

        requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
      }
    }
  }

  /**
   * Evaluator is allocated. This occurs every time commands are run in the
   * Non-retainable version, while it occurs only once in the Retainable version.
   */
  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator evaluator) {
      LOG.log(Level.INFO, "Evaluator is ready");
      synchronized (SchedulerDriver.this) {
        // The request has been fulfilled: move it from "requested" to "active".
        nActiveEval++;
        nRequestedEval--;
      }

      evaluator.submitContext(ContextConfiguration.CONF
          .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
          .build());
    }
  }

  /**
   * Now it is ready to schedule tasks. But if the queue is empty,
   * wait until commands come in.
   *
   * If there is no pending task, having more than 1 evaluator must be redundant.
   * It may happen, for example, when tasks are canceled during allocation.
   * In these cases, the new evaluator may be abandoned.
   */
  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      synchronized (SchedulerDriver.this) {
        LOG.log(Level.INFO, "Context available : {0}", context.getId());

        if (scheduler.hasPendingTasks()) {
          state = State.RUNNING;
          scheduler.submitTask(context);
        } else if (nActiveEval > 1) {
          // Redundant evaluator: nothing to run and another one is already active.
          nActiveEval--;
          context.close();
        } else {
          state = State.READY;
          waitForCommands(context);
        }
      }
    }
  }

  /**
   * When a Task completes, the task is marked as finished.
   * The evaluator is reused for the next Task if retainable is set to {@code true}.
   * Otherwise the evaluator is released.
   */
  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {
      final int taskId = Integer.parseInt(task.getId());

      synchronized (SchedulerDriver.this) {
        scheduler.setFinished(taskId);

        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
        final ActiveContext context = task.getActiveContext();

        if (retainable) {
          retainEvaluator(context);
        } else {
          reallocateEvaluator(context);
        }
      }
    }
  }


  /**
   * Get the list of tasks in the scheduler.
   *
   * @return whatever {@code Scheduler.getList()} reports
   */
  public synchronized Map<String, List<Integer>> getList() {
    return scheduler.getList();
  }

  /**
   * Clear all the Tasks from the waiting queue.
   *
   * @return the value returned by {@code Scheduler.clear()}
   */
  public synchronized int clearList() {
    return scheduler.clear();
  }

  /**
   * Get the status of a task.
   *
   * @param taskId identifier of the task to look up
   * @return the status string reported by the scheduler
   * @throws NotFoundException propagated from the scheduler
   */
  public synchronized String getTaskStatus(final int taskId) throws NotFoundException {
    return scheduler.getTaskStatus(taskId);
  }

  /**
   * Cancel a Task waiting on the queue. A task cannot be canceled
   * once it is running.
   *
   * @param taskId identifier of the task to cancel
   * @return the value returned by {@code Scheduler.cancelTask(int)}
   * @throws NotFoundException     propagated from the scheduler
   * @throws UnsuccessfulException propagated from the scheduler
   */
  public synchronized int cancelTask(final int taskId) throws NotFoundException, UnsuccessfulException {
    return scheduler.cancelTask(taskId);
  }

  /**
   * Submit a command to schedule.
   *
   * @param command the command to enqueue as a new task
   * @return the task id assigned to the command
   */
  public synchronized int submitCommand(final String command) {
    final Integer id = scheduler.assignTaskId();
    scheduler.addTask(new TaskEntity(id, command));

    if (state == State.READY) {
      notify(); // Wake up at {waitForCommands}
    } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
      // Still below the limit while busy: ask for one more evaluator.
      requestEvaluator(1);
    }
    return id;
  }

  /**
   * Update the maximum number of evaluators to hold.
   * Request more evaluators in case there are pending tasks
   * in the queue and the number of evaluators is less than the limit.
   *
   * @param targetNum the new maximum number of evaluators
   * @return the maximum number of evaluators after the update
   * @throws UnsuccessfulException if {@code targetNum} is smaller than the number
   *                               of evaluators currently active or requested
   */
  public synchronized int setMaxEvaluators(final int targetNum) throws UnsuccessfulException {
    if (targetNum < nActiveEval + nRequestedEval) {
      throw new UnsuccessfulException(nActiveEval + nRequestedEval +
          " evaluators are used now. Should be larger than that.");
    }
    nMaxEval = targetNum;

    if (scheduler.hasPendingTasks()) {
      final int nToRequest =
          Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
      // The new limit may leave no room for additional evaluators (for example
      // when it equals the number already in use). requestEvaluator rejects
      // non-positive counts, so only call it when there is something to request.
      if (nToRequest > 0) {
        requestEvaluator(nToRequest);
      }
    }
    return nMaxEval;
  }

  /**
   * Request evaluators. Passing a non-positive number is illegal,
   * so no request is attempted in that situation.
   *
   * @param numToRequest number of evaluators to request; must be positive
   * @throws IllegalArgumentException if {@code numToRequest} is not positive
   */
  private synchronized void requestEvaluator(final int numToRequest) {
    if (numToRequest <= 0) {
      throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
    }

    nRequestedEval += numToRequest;
    requestor.submit(EvaluatorRequest.newBuilder()
        .setMemory(32)
        .setNumber(numToRequest)
        .build());
  }

  /**
   * Pick up a command from the queue and run it. Wait until
   * a command comes in if no command exists.
   *
   * <p>An interrupt does not abort the wait; it is logged, the wait resumes,
   * and the thread's interrupt status is restored before this method returns.
   *
   * @param context the context on which the next task is submitted
   */
  private synchronized void waitForCommands(final ActiveContext context) {
    boolean interrupted = false;
    try {
      while (!scheduler.hasPendingTasks()) {
        // Wait until any command enters in the queue
        try {
          wait();
        } catch (final InterruptedException e) {
          LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
          // Remember the interrupt; re-asserting it right here would make the
          // next wait() throw immediately and spin.
          interrupted = true;
        }
      }
      // When wakes up, run the first command from the queue.
      state = State.RUNNING;
      scheduler.submitTask(context);
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Retain the completed evaluator by submitting another task
   * until there is no need to reuse it.
   *
   * @param context the context of the evaluator whose task has completed
   */
  private synchronized void retainEvaluator(final ActiveContext context) {
    if (scheduler.hasPendingTasks()) {
      scheduler.submitTask(context);
    } else if (nActiveEval > 1) {
      // Nothing left to run and other evaluators remain: release this one.
      nActiveEval--;
      context.close();
    } else {
      state = State.READY;
      waitForCommands(context);
    }
  }

  /**
   * Always close the completed evaluator and
   * allocate a new evaluator if necessary.
   *
   * @param context the context of the evaluator whose task has completed
   */
  private synchronized void reallocateEvaluator(final ActiveContext context) {
    nActiveEval--;
    context.close();

    if (scheduler.hasPendingTasks()) {
      requestEvaluator(1);
    } else if (nActiveEval <= 0) {
      // No evaluator left at all: go back to waiting for a fresh one.
      state = State.WAIT_EVALUATORS;
      requestEvaluator(1);
    }
  }
}