001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.pool; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.CompletedEvaluator; 025import org.apache.reef.driver.evaluator.EvaluatorRequest; 026import org.apache.reef.driver.evaluator.EvaluatorRequestor; 027import org.apache.reef.driver.task.CompletedTask; 028import org.apache.reef.driver.task.RunningTask; 029import org.apache.reef.driver.task.TaskConfiguration; 030import org.apache.reef.tang.Configuration; 031import org.apache.reef.tang.JavaConfigurationBuilder; 032import org.apache.reef.tang.Tang; 033import org.apache.reef.tang.annotations.Parameter; 034import org.apache.reef.tang.annotations.Unit; 035import org.apache.reef.tang.exceptions.BindException; 036import org.apache.reef.wake.EventHandler; 037import org.apache.reef.wake.time.event.StartTime; 038import org.apache.reef.wake.time.event.StopTime; 039 040import javax.inject.Inject; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Allocate N evaluators, submit M tasks to them, and measure the time. 046 * Each task does nothing but sleeps for D seconds. 047 */ 048@Unit 049public final class JobDriver { 050 051 /** 052 * Standard Java logger. 053 */ 054 private static final Logger LOG = Logger.getLogger(JobDriver.class.getName()); 055 056 /** 057 * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks. 058 */ 059 private final EvaluatorRequestor evaluatorRequestor; 060 061 /** 062 * If true, submit context and task in one request. 063 */ 064 private final boolean isPiggyback; 065 066 /** 067 * Number of Evaluators to request. 068 */ 069 private final int numEvaluators; 070 071 /** 072 * Number of Tasks to run. 073 */ 074 private final int numTasks; 075 /** 076 * Number of seconds to sleep in each Task. 077 * (has to be a String to pass it into Task config). 078 */ 079 private final String delayStr; 080 /** 081 * Number of Evaluators started. 082 */ 083 private int numEvaluatorsStarted = 0; 084 /** 085 * Number of Tasks launched. 086 */ 087 private int numTasksStarted = 0; 088 089 /** 090 * Job driver constructor. 091 * All parameters are injected from TANG automatically. 092 * 093 * @param evaluatorRequestor is used to request Evaluators. 094 */ 095 @Inject 096 JobDriver(final EvaluatorRequestor evaluatorRequestor, 097 @Parameter(Launch.Piggyback.class) final Boolean isPiggyback, 098 @Parameter(Launch.NumEvaluators.class) final Integer numEvaluators, 099 @Parameter(Launch.NumTasks.class) final Integer numTasks, 100 @Parameter(Launch.Delay.class) final Integer delay) { 101 this.evaluatorRequestor = evaluatorRequestor; 102 this.isPiggyback = isPiggyback; 103 this.numEvaluators = numEvaluators; 104 this.numTasks = numTasks; 105 this.delayStr = "" + delay; 106 } 107 108 /** 109 * Build a new Task configuration for a given task ID. 110 * 111 * @param taskId Unique string ID of the task 112 * @return Immutable task configuration object, ready to be submitted to REEF. 113 * @throws RuntimeException that wraps BindException if unable to build the configuration. 114 */ 115 private Configuration getTaskConfiguration(final String taskId) { 116 try { 117 return TaskConfiguration.CONF 118 .set(TaskConfiguration.IDENTIFIER, taskId) 119 .set(TaskConfiguration.TASK, SleepTask.class) 120 .build(); 121 } catch (final BindException ex) { 122 LOG.log(Level.SEVERE, "Failed to create Task Configuration: " + taskId, ex); 123 throw new RuntimeException(ex); 124 } 125 } 126 127 /** 128 * Job Driver is ready and the clock is set up: request the evaluators. 129 */ 130 final class StartHandler implements EventHandler<StartTime> { 131 @Override 132 public void onNext(final StartTime startTime) { 133 LOG.log(Level.INFO, "TIME: Start Driver with {0} Evaluators", numEvaluators); 134 evaluatorRequestor.submit( 135 EvaluatorRequest.newBuilder() 136 .setMemory(128) 137 .setNumberOfCores(1) 138 .setNumber(numEvaluators).build() 139 ); 140 } 141 } 142 143 /** 144 * Job Driver is is shutting down: write to the log. 145 */ 146 final class StopHandler implements EventHandler<StopTime> { 147 @Override 148 public void onNext(final StopTime stopTime) { 149 LOG.log(Level.INFO, "TIME: Stop Driver"); 150 } 151 } 152 153 /** 154 * Receive notification that an Evaluator had been allocated, 155 * and submitTask a new Task in that Evaluator. 156 */ 157 final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 158 @Override 159 public void onNext(final AllocatedEvaluator eval) { 160 161 LOG.log(Level.INFO, "TIME: Allocated Evaluator {0}", eval.getId()); 162 163 final boolean runTask; 164 final int nEval; 165 final int nTask; 166 167 synchronized (JobDriver.this) { 168 runTask = numTasksStarted < numTasks; 169 if (runTask) { 170 ++numEvaluatorsStarted; 171 if (isPiggyback) { 172 ++numTasksStarted; 173 } 174 } 175 nEval = numEvaluatorsStarted; 176 nTask = numTasksStarted; 177 } 178 179 if (runTask) { 180 181 final String contextId = String.format("Context_%06d", nEval); 182 LOG.log(Level.INFO, "TIME: Submit Context {0} to Evaluator {1}", 183 new Object[]{contextId, eval.getId()}); 184 185 try { 186 187 final JavaConfigurationBuilder contextConfigBuilder = 188 Tang.Factory.getTang().newConfigurationBuilder(); 189 190 contextConfigBuilder.addConfiguration(ContextConfiguration.CONF 191 .set(ContextConfiguration.IDENTIFIER, contextId) 192 .build()); 193 194 contextConfigBuilder.bindNamedParameter(Launch.Delay.class, delayStr); 195 196 if (isPiggyback) { 197 198 final String taskId = String.format("StartTask_%08d", nTask); 199 final Configuration taskConfig = getTaskConfiguration(taskId); 200 201 LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}", 202 new Object[]{taskId, eval.getId()}); 203 204 eval.submitContextAndTask(contextConfigBuilder.build(), taskConfig); 205 206 } else { 207 eval.submitContext(contextConfigBuilder.build()); 208 } 209 210 } catch (final BindException ex) { 211 LOG.log(Level.SEVERE, "Failed to submit Context to Evaluator: " + eval.getId(), ex); 212 throw new RuntimeException(ex); 213 } 214 } else { 215 LOG.log(Level.INFO, "TIME: Close Evaluator {0}", eval.getId()); 216 eval.close(); 217 } 218 } 219 } 220 221 /** 222 * Receive notification that the Context is active. 223 */ 224 final class ActiveContextHandler implements EventHandler<ActiveContext> { 225 @Override 226 public void onNext(final ActiveContext context) { 227 228 LOG.log(Level.INFO, "TIME: Active Context {0}", context.getId()); 229 230 if (isPiggyback) { 231 return; // Task already submitted 232 } 233 234 final boolean runTask; 235 final int nTask; 236 237 synchronized (JobDriver.this) { 238 runTask = numTasksStarted < numTasks; 239 if (runTask) { 240 ++numTasksStarted; 241 } 242 nTask = numTasksStarted; 243 } 244 245 if (runTask) { 246 final String taskId = String.format("StartTask_%08d", nTask); 247 LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}", 248 new Object[]{taskId, context.getEvaluatorId()}); 249 context.submitTask(getTaskConfiguration(taskId)); 250 } else { 251 context.close(); 252 } 253 } 254 } 255 256 /** 257 * Receive notification that the Task is running. 258 */ 259 final class RunningTaskHandler implements EventHandler<RunningTask> { 260 @Override 261 public void onNext(final RunningTask task) { 262 LOG.log(Level.INFO, "TIME: Running Task {0}", task.getId()); 263 } 264 } 265 266 /** 267 * Receive notification that the Task has completed successfully. 268 */ 269 final class CompletedTaskHandler implements EventHandler<CompletedTask> { 270 @Override 271 public void onNext(final CompletedTask task) { 272 273 final ActiveContext context = task.getActiveContext(); 274 LOG.log(Level.INFO, "TIME: Completed Task {0} on Evaluator {1}", 275 new Object[]{task.getId(), context.getEvaluatorId()}); 276 277 final boolean runTask; 278 final int nTask; 279 synchronized (JobDriver.this) { 280 runTask = numTasksStarted < numTasks; 281 if (runTask) { 282 ++numTasksStarted; 283 } 284 nTask = numTasksStarted; 285 } 286 287 if (runTask) { 288 final String taskId = String.format("Task_%08d", nTask); 289 LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}", 290 new Object[]{taskId, context.getEvaluatorId()}); 291 context.submitTask(getTaskConfiguration(taskId)); 292 } else { 293 LOG.log(Level.INFO, "TIME: Close Evaluator {0}", context.getEvaluatorId()); 294 context.close(); 295 } 296 } 297 } 298 299 /** 300 * Receive notification that the Evaluator has been shut down. 301 */ 302 final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { 303 @Override 304 public void onNext(final CompletedEvaluator eval) { 305 LOG.log(Level.INFO, "TIME: Completed Evaluator {0}", eval.getId()); 306 } 307 } 308}