/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.retained_eval;

import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.examples.library.Command;
import org.apache.reef.examples.library.ShellTask;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Retained Evaluator example job driver. Execute shell command on all evaluators,
 * capture stdout, and return concatenated results back to the client.
 * <p>
 * All mutable driver state ({@code state}, {@code cmd}, {@code expectCount},
 * {@code results} and {@code contexts}) is guarded by the monitor of the
 * {@code JobDriver} instance; every event handler below acquires that monitor
 * before touching the state.
 */
@Unit
public final class JobDriver {
  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());

  /**
   * Duration of one clock interval (not referenced anywhere in this class).
   */
  private static final int CHECK_UP_INTERVAL = 1000; // 1 sec.

  /**
   * String codec is used to encode the results
   * before passing them back to the client.
   */
  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
  /**
   * Job observer on the client.
   * We use it to send results from the driver back to the client.
   */
  private final JobMessageObserver jobMessageObserver;
  /**
   * Job driver uses EvaluatorRequestor
   * to request Evaluators that will run the Tasks.
   */
  private final EvaluatorRequestor evaluatorRequestor;
  /**
   * Number of Evaluators to request.
   */
  private final int numEvaluators;
  /**
   * Shell execution results from each Evaluator.
   */
  private final List<String> results = new ArrayList<>();
  /**
   * Map from context ID to running evaluator context.
   */
  private final Map<String, ActiveContext> contexts = new HashMap<>();
  /**
   * Job driver state.
   */
  private State state = State.INIT;
  /**
   * Pending command that could not be submitted right away because the driver
   * was not in the READY state when the client sent it. It is submitted as soon
   * as the driver becomes READY, and reset to null on submission.
   */
  private String cmd;
  /**
   * Number of evaluators/tasks to complete.
   */
  private int expectCount = 0;

  /**
   * Job driver constructor.
   * All parameters are injected from TANG automatically.
   *
   * @param jobMessageObserver is used to send messages back to the client.
   * @param evaluatorRequestor is used to request Evaluators.
   * @param numEvaluators      number of Evaluators to request.
   */
  @Inject
  JobDriver(final JobMessageObserver jobMessageObserver,
            final EvaluatorRequestor evaluatorRequestor,
            @Parameter(Launch.NumEval.class) final Integer numEvaluators) {
    this.jobMessageObserver = jobMessageObserver;
    this.evaluatorRequestor = evaluatorRequestor;
    this.numEvaluators = numEvaluators;
  }

  /**
   * Construct the final result and forward it to the Client.
   * Concatenates all collected results, clears the result list,
   * and sends the encoded string to the client.
   * Must be called while holding the JobDriver lock.
   */
  private void returnResults() {
    final StringBuilder sb = new StringBuilder();
    for (final String result : this.results) {
      sb.append(result);
    }
    this.results.clear();
    LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
    this.jobMessageObserver.sendMessageToClient(CODEC.encode(sb.toString()));
  }

  /**
   * Submit command to all available evaluators.
   * Moves the driver into the WAIT_TASKS state and expects one completed
   * task per currently known context.
   * Must be called while holding the JobDriver lock.
   *
   * @param command shell command to execute.
   */
  private void submit(final String command) {
    LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
        new Object[]{command, this.contexts.size(), this.state});
    assert (this.state == State.READY);
    this.expectCount = this.contexts.size();
    this.state = State.WAIT_TASKS;
    // The command is being dispatched now, so it is no longer pending.
    this.cmd = null;
    for (final ActiveContext context : this.contexts.values()) {
      this.submit(context, command);
    }
  }

  /**
   * Submit a Task that executes the command to a single Evaluator.
   * This method is called from {@link #submit(String)}.
   *
   * @param context context to run the task in.
   * @param command shell command to execute.
   * @throws RuntimeException if the task configuration cannot be built;
   *                          the context is closed before the exception is thrown.
   */
  private void submit(final ActiveContext context, final String command) {
    try {
      LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
      cb.addConfiguration(
          TaskConfiguration.CONF
              .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
              .set(TaskConfiguration.TASK, ShellTask.class)
              .build()
      );
      cb.bindNamedParameter(Command.class, command);
      context.submitTask(cb.build());
    } catch (final BindException ex) {
      LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
      context.close();
      throw new RuntimeException(ex);
    }
  }

  /**
   * Request the evaluators.
   * Moves the driver from INIT to WAIT_EVALUATORS and expects
   * {@code numEvaluators} contexts to become active.
   */
  private synchronized void requestEvaluators() {
    assert (this.state == State.INIT);
    LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
    this.evaluatorRequestor.submit(
        EvaluatorRequest.newBuilder()
            .setMemory(128)
            .setNumberOfCores(1)
            .setNumber(this.numEvaluators).build()
    );
    this.state = State.WAIT_EVALUATORS;
    this.expectCount = this.numEvaluators;
  }

  /**
   * Possible states of the job driver. Can be one of:
   * <dl>
   * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd>
   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd>
   * <dt><code>READY</code></dt><dd>Ready to submit a new task.</dd>
   * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd>
   * </dl>
   */
  private enum State {
    INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
  }

  /**
   * Receive notification that an Evaluator had been allocated,
   * and submit a new Context to that Evaluator.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator eval) {
      synchronized (JobDriver.this) {
        LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
            new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()});
        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
        try {
          eval.submitContext(ContextConfiguration.CONF.set(
              ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
        } catch (final BindException ex) {
          LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
          throw new RuntimeException(ex);
        }
      }
    }
  }

  /**
   * Receive notification that the entire Evaluator had failed.
   * Forget all contexts of that Evaluator and fail with a RuntimeException.
   */
  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator eval) {
      synchronized (JobDriver.this) {
        // The {0} placeholder is required: without it java.util.logging
        // does not include the parameter in the formatted message.
        LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval);
        for (final FailedContext failedContext : eval.getFailedContextList()) {
          JobDriver.this.contexts.remove(failedContext.getId());
        }
        throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
      }
    }
  }

  /**
   * Receive notification that a new Context is available.
   * Register the context; once all expected contexts are active, switch to
   * READY and submit the pending command, if there is one.
   */
  final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      synchronized (JobDriver.this) {
        LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
            new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state});
        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
        JobDriver.this.contexts.put(context.getId(), context);
        if (--JobDriver.this.expectCount <= 0) {
          JobDriver.this.state = State.READY;
          if (JobDriver.this.cmd == null) {
            LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
                JobDriver.this.state);
          } else {
            JobDriver.this.submit(JobDriver.this.cmd);
          }
        }
      }
    }
  }

  /**
   * Receive notification that the Context had completed.
   * Remove context from the list of active context.
   */
  final class ClosedContextHandler implements EventHandler<ClosedContext> {
    @Override
    public void onNext(final ClosedContext context) {
      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
      synchronized (JobDriver.this) {
        JobDriver.this.contexts.remove(context.getId());
      }
    }
  }

  /**
   * Receive notification that the Context had failed.
   * Remove context from the list of active context and fail with a RuntimeException.
   */
  final class FailedContextHandler implements EventHandler<FailedContext> {
    @Override
    public void onNext(final FailedContext context) {
      // The {0} placeholder is required: without it java.util.logging
      // does not include the parameter in the formatted message.
      LOG.log(Level.SEVERE, "FailedContext: {0}", context);
      synchronized (JobDriver.this) {
        JobDriver.this.contexts.remove(context.getId());
      }
      throw new RuntimeException("Failed context: ", context.asError());
    }
  }

  /**
   * Receive notification that the Task has completed successfully.
   * Collect its output; when the last expected task is done, send the combined
   * results to the client and submit the pending command, if there is one.
   */
  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {
      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
      // Take the message returned by the task and add it to the running result.
      final String result = CODEC.decode(task.get());
      synchronized (JobDriver.this) {
        JobDriver.this.results.add(task.getId() + " :: " + result);
        LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
            task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state});
        if (--JobDriver.this.expectCount <= 0) {
          JobDriver.this.returnResults();
          JobDriver.this.state = State.READY;
          if (JobDriver.this.cmd != null) {
            JobDriver.this.submit(JobDriver.this.cmd);
          }
        }
      }
    }
  }

  /**
   * Receive notification from the client.
   * The message is decoded into a shell command, which is submitted immediately
   * if the driver is READY and stored as the pending command otherwise.
   */
  final class ClientMessageHandler implements EventHandler<byte[]> {
    @Override
    public void onNext(final byte[] message) {
      synchronized (JobDriver.this) {
        final String command = CODEC.decode(message);
        LOG.log(Level.INFO, "Client message: {0} state: {1}",
            new Object[]{command, JobDriver.this.state});
        assert (JobDriver.this.cmd == null);
        if (JobDriver.this.state == State.READY) {
          JobDriver.this.submit(command);
        } else {
          // Not ready yet - save the command for better times. It is picked up
          // by ActiveContextHandler once all evaluators are up, or by
          // CompletedTaskHandler once the running tasks are done, so both
          // waiting states are legitimate here.
          assert (JobDriver.this.state == State.WAIT_EVALUATORS
              || JobDriver.this.state == State.WAIT_TASKS);
          JobDriver.this.cmd = command;
        }
      }
    }
  }

  /**
   * Job Driver is ready and the clock is set up: request the evaluators.
   */
  final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      // Read the state under the same lock all other handlers use.
      synchronized (JobDriver.this) {
        LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{JobDriver.this.state, startTime});
        assert (JobDriver.this.state == State.INIT);
        JobDriver.this.requestEvaluators();
      }
    }
  }

  /**
   * Shutting down the job driver: close the evaluators.
   */
  final class StopHandler implements EventHandler<StopTime> {
    @Override
    public void onNext(final StopTime time) {
      // Snapshot the contexts under the lock: other handlers add and remove
      // entries concurrently, and iterating the live HashMap view could throw
      // ConcurrentModificationException. Closing is done outside the lock.
      final List<ActiveContext> contextsToClose;
      synchronized (JobDriver.this) {
        LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{JobDriver.this.state, time});
        contextsToClose = new ArrayList<>(JobDriver.this.contexts.values());
      }
      for (final ActiveContext context : contextsToClose) {
        context.close();
      }
    }
  }
}