001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.pool; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.CompletedEvaluator; 025import org.apache.reef.driver.evaluator.EvaluatorRequestor; 026import org.apache.reef.driver.task.CompletedTask; 027import org.apache.reef.driver.task.RunningTask; 028import org.apache.reef.driver.task.TaskConfiguration; 029import org.apache.reef.tang.Configuration; 030import org.apache.reef.tang.JavaConfigurationBuilder; 031import org.apache.reef.tang.Tang; 032import org.apache.reef.tang.annotations.Parameter; 033import org.apache.reef.tang.annotations.Unit; 034import org.apache.reef.tang.exceptions.BindException; 035import org.apache.reef.wake.EventHandler; 036import org.apache.reef.wake.time.event.StartTime; 037import org.apache.reef.wake.time.event.StopTime; 038 039import javax.inject.Inject; 040import java.util.logging.Level; 041import java.util.logging.Logger; 042 043/** 044 * Allocate N evaluators, submit M tasks to them, and measure the time. 045 * Each task does nothing but sleeps for D seconds. 046 */ 047@Unit 048public final class JobDriver { 049 050 /** 051 * Standard Java logger. 052 */ 053 private static final Logger LOG = Logger.getLogger(JobDriver.class.getName()); 054 055 /** 056 * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks. 057 */ 058 private final EvaluatorRequestor evaluatorRequestor; 059 060 /** 061 * If true, submit context and task in one request. 062 */ 063 private final boolean isPiggyback; 064 065 /** 066 * Number of Evaluators to request. 067 */ 068 private final int numEvaluators; 069 070 /** 071 * Number of Tasks to run. 072 */ 073 private final int numTasks; 074 /** 075 * Number of seconds to sleep in each Task. 076 * (has to be a String to pass it into Task config). 077 */ 078 private final String delayStr; 079 /** 080 * Number of Evaluators started. 081 */ 082 private int numEvaluatorsStarted = 0; 083 /** 084 * Number of Tasks launched. 085 */ 086 private int numTasksStarted = 0; 087 088 /** 089 * Job driver constructor. 090 * All parameters are injected from TANG automatically. 091 * 092 * @param evaluatorRequestor is used to request Evaluators. 093 */ 094 @Inject 095 JobDriver(final EvaluatorRequestor evaluatorRequestor, 096 @Parameter(Launch.Piggyback.class) final Boolean isPiggyback, 097 @Parameter(Launch.NumEvaluators.class) final Integer numEvaluators, 098 @Parameter(Launch.NumTasks.class) final Integer numTasks, 099 @Parameter(Launch.Delay.class) final Integer delay) { 100 this.evaluatorRequestor = evaluatorRequestor; 101 this.isPiggyback = isPiggyback; 102 this.numEvaluators = numEvaluators; 103 this.numTasks = numTasks; 104 this.delayStr = "" + delay; 105 } 106 107 /** 108 * Build a new Task configuration for a given task ID. 109 * 110 * @param taskId Unique string ID of the task 111 * @return Immutable task configuration object, ready to be submitted to REEF. 112 * @throws RuntimeException that wraps BindException if unable to build the configuration. 113 */ 114 private Configuration getTaskConfiguration(final String taskId) { 115 try { 116 return TaskConfiguration.CONF 117 .set(TaskConfiguration.IDENTIFIER, taskId) 118 .set(TaskConfiguration.TASK, SleepTask.class) 119 .build(); 120 } catch (final BindException ex) { 121 LOG.log(Level.SEVERE, "Failed to create Task Configuration: " + taskId, ex); 122 throw new RuntimeException(ex); 123 } 124 } 125 126 /** 127 * Job Driver is ready and the clock is set up: request the evaluators. 128 */ 129 final class StartHandler implements EventHandler<StartTime> { 130 @Override 131 public void onNext(final StartTime startTime) { 132 LOG.log(Level.INFO, "TIME: Start Driver with {0} Evaluators", numEvaluators); 133 evaluatorRequestor.newRequest() 134 .setMemory(128) 135 .setNumberOfCores(1) 136 .setNumber(numEvaluators) 137 .submit(); 138 } 139 } 140 141 /** 142 * Job Driver is is shutting down: write to the log. 143 */ 144 final class StopHandler implements EventHandler<StopTime> { 145 @Override 146 public void onNext(final StopTime stopTime) { 147 LOG.log(Level.INFO, "TIME: Stop Driver"); 148 } 149 } 150 151 /** 152 * Receive notification that an Evaluator had been allocated, 153 * and submitTask a new Task in that Evaluator. 154 */ 155 final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 156 @Override 157 public void onNext(final AllocatedEvaluator eval) { 158 159 LOG.log(Level.INFO, "TIME: Allocated Evaluator {0}", eval.getId()); 160 161 final boolean runTask; 162 final int nEval; 163 final int nTask; 164 165 synchronized (JobDriver.this) { 166 runTask = numTasksStarted < numTasks; 167 if (runTask) { 168 ++numEvaluatorsStarted; 169 if (isPiggyback) { 170 ++numTasksStarted; 171 } 172 } 173 nEval = numEvaluatorsStarted; 174 nTask = numTasksStarted; 175 } 176 177 if (runTask) { 178 179 final String contextId = String.format("Context_%06d", nEval); 180 LOG.log(Level.INFO, "TIME: Submit Context {0} to Evaluator {1}", 181 new Object[]{contextId, eval.getId()}); 182 183 try { 184 185 final JavaConfigurationBuilder contextConfigBuilder = 186 Tang.Factory.getTang().newConfigurationBuilder(); 187 188 contextConfigBuilder.addConfiguration(ContextConfiguration.CONF 189 .set(ContextConfiguration.IDENTIFIER, contextId) 190 .build()); 191 192 contextConfigBuilder.bindNamedParameter(Launch.Delay.class, delayStr); 193 194 if (isPiggyback) { 195 196 final String taskId = String.format("StartTask_%08d", nTask); 197 final Configuration taskConfig = getTaskConfiguration(taskId); 198 199 LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}", 200 new Object[]{taskId, eval.getId()}); 201 202 eval.submitContextAndTask(contextConfigBuilder.build(), taskConfig); 203 204 } else { 205 eval.submitContext(contextConfigBuilder.build()); 206 } 207 208 } catch (final BindException ex) { 209 LOG.log(Level.SEVERE, "Failed to submit Context to Evaluator: " + eval.getId(), ex); 210 throw new RuntimeException(ex); 211 } 212 } else { 213 LOG.log(Level.INFO, "TIME: Close Evaluator {0}", eval.getId()); 214 eval.close(); 215 } 216 } 217 } 218 219 /** 220 * Receive notification that the Context is active. 221 */ 222 final class ActiveContextHandler implements EventHandler<ActiveContext> { 223 @Override 224 public void onNext(final ActiveContext context) { 225 226 LOG.log(Level.INFO, "TIME: Active Context {0}", context.getId()); 227 228 if (isPiggyback) { 229 return; // Task already submitted 230 } 231 232 final boolean runTask; 233 final int nTask; 234 235 synchronized (JobDriver.this) { 236 runTask = numTasksStarted < numTasks; 237 if (runTask) { 238 ++numTasksStarted; 239 } 240 nTask = numTasksStarted; 241 } 242 243 if (runTask) { 244 final String taskId = String.format("StartTask_%08d", nTask); 245 LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}", 246 new Object[]{taskId, context.getEvaluatorId()}); 247 context.submitTask(getTaskConfiguration(taskId)); 248 } else { 249 context.close(); 250 } 251 } 252 } 253 254 /** 255 * Receive notification that the Task is running. 256 */ 257 final class RunningTaskHandler implements EventHandler<RunningTask> { 258 @Override 259 public void onNext(final RunningTask task) { 260 LOG.log(Level.INFO, "TIME: Running Task {0}", task.getId()); 261 } 262 } 263 264 /** 265 * Receive notification that the Task has completed successfully. 266 */ 267 final class CompletedTaskHandler implements EventHandler<CompletedTask> { 268 @Override 269 public void onNext(final CompletedTask task) { 270 271 final ActiveContext context = task.getActiveContext(); 272 LOG.log(Level.INFO, "TIME: Completed Task {0} on Evaluator {1}", 273 new Object[]{task.getId(), context.getEvaluatorId()}); 274 275 final boolean runTask; 276 final int nTask; 277 synchronized (JobDriver.this) { 278 runTask = numTasksStarted < numTasks; 279 if (runTask) { 280 ++numTasksStarted; 281 } 282 nTask = numTasksStarted; 283 } 284 285 if (runTask) { 286 final String taskId = String.format("Task_%08d", nTask); 287 LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}", 288 new Object[]{taskId, context.getEvaluatorId()}); 289 context.submitTask(getTaskConfiguration(taskId)); 290 } else { 291 LOG.log(Level.INFO, "TIME: Close Evaluator {0}", context.getEvaluatorId()); 292 context.close(); 293 } 294 } 295 } 296 297 /** 298 * Receive notification that the Evaluator has been shut down. 299 */ 300 final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { 301 @Override 302 public void onNext(final CompletedEvaluator eval) { 303 LOG.log(Level.INFO, "TIME: Completed Evaluator {0}", eval.getId()); 304 } 305 } 306}