001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.hellohttp; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ClosedContext; 023import org.apache.reef.driver.context.ContextConfiguration; 024import org.apache.reef.driver.context.FailedContext; 025import org.apache.reef.driver.evaluator.AllocatedEvaluator; 026import org.apache.reef.driver.evaluator.EvaluatorRequest; 027import org.apache.reef.driver.evaluator.EvaluatorRequestor; 028import org.apache.reef.driver.evaluator.FailedEvaluator; 029import org.apache.reef.driver.task.CompletedTask; 030import org.apache.reef.driver.task.TaskConfiguration; 031import org.apache.reef.examples.library.Command; 032import org.apache.reef.examples.library.ShellTask; 033import org.apache.reef.tang.JavaConfigurationBuilder; 034import org.apache.reef.tang.Tang; 035import org.apache.reef.tang.annotations.Unit; 036import org.apache.reef.tang.exceptions.BindException; 037import org.apache.reef.wake.EventHandler; 038import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 039import org.apache.reef.wake.time.event.StartTime; 040import org.apache.reef.wake.time.event.StopTime; 041 042import javax.inject.Inject; 043import java.util.ArrayList; 044import java.util.HashMap; 045import java.util.List; 046import java.util.Map; 047import java.util.logging.Level; 048import java.util.logging.Logger; 049 050/** 051 * The Driver code for the Hello REEF Http Distributed Shell Application 052 */ 053@Unit 054public final class HttpShellJobDriver { 055 056 /** 057 * String codec is used to encode the results 058 * before passing them back to the client. 059 */ 060 public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 061 private static final Logger LOG = Logger.getLogger(HttpShellJobDriver.class.getName()); 062 /** 063 * Evaluator Requester 064 */ 065 private final EvaluatorRequestor evaluatorRequestor; 066 /** 067 * Number of Evalutors to request (default is 1). 068 */ 069 private final int numEvaluators = 2; 070 /** 071 * Shell execution results from each Evaluator. 072 */ 073 private final List<String> results = new ArrayList<>(); 074 /** 075 * Map from context ID to running evaluator context. 076 */ 077 private final Map<String, ActiveContext> contexts = new HashMap<>(); 078 /** 079 * Job driver state. 080 */ 081 private State state = State.INIT; 082 /** 083 * First command to execute. Sometimes client can send us the first command 084 * before Evaluators are available; we need to store this command here. 085 */ 086 private String cmd; 087 /** 088 * Number of evaluators/tasks to complete. 089 */ 090 private int expectCount = 0; 091 /** 092 * Callback handler for http return message 093 */ 094 private HttpServerShellCmdtHandler.ClientCallBackHandler httpCallbackHandler; 095 096 /** 097 * Job Driver Constructor 098 * 099 * @param requestor 100 * @param clientCallBackHandler 101 */ 102 @Inject 103 public HttpShellJobDriver(final EvaluatorRequestor requestor, final HttpServerShellCmdtHandler.ClientCallBackHandler clientCallBackHandler) { 104 this.evaluatorRequestor = requestor; 105 this.httpCallbackHandler = clientCallBackHandler; 106 LOG.log(Level.FINE, "Instantiated 'HelloDriver'"); 107 } 108 109 /** 110 * Construct the final result and forward it to the Client. 111 */ 112 private void returnResults() { 113 final StringBuilder sb = new StringBuilder(); 114 for (final String result : this.results) { 115 sb.append(result); 116 } 117 this.results.clear(); 118 LOG.log(Level.INFO, "Return results to the client:\n{0}", sb); 119 httpCallbackHandler.onNext(CODEC.encode(sb.toString())); 120 } 121 122 /** 123 * Submit command to all available evaluators. 124 * 125 * @param command shell command to execute. 126 */ 127 private void submit(final String command) { 128 LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}", 129 new Object[]{command, this.contexts.size(), this.state}); 130 assert (this.state == State.READY); 131 this.expectCount = this.contexts.size(); 132 this.state = State.WAIT_TASKS; 133 this.cmd = null; 134 for (final ActiveContext context : this.contexts.values()) { 135 this.submit(context, command); 136 } 137 } 138 139 /** 140 * Submit a Task that execute the command to a single Evaluator. 141 * This method is called from <code>submitTask(cmd)</code>. 142 */ 143 private void submit(final ActiveContext context, final String command) { 144 try { 145 LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context}); 146 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); 147 cb.addConfiguration( 148 TaskConfiguration.CONF 149 .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task") 150 .set(TaskConfiguration.TASK, ShellTask.class) 151 .build() 152 ); 153 cb.bindNamedParameter(Command.class, command); 154 context.submitTask(cb.build()); 155 } catch (final BindException ex) { 156 LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex); 157 context.close(); 158 throw new RuntimeException(ex); 159 } 160 } 161 162 /** 163 * Request the evaluators. 164 */ 165 private synchronized void requestEvaluators() { 166 assert (this.state == State.INIT); 167 LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators); 168 this.evaluatorRequestor.submit( 169 EvaluatorRequest.newBuilder() 170 .setMemory(128) 171 .setNumberOfCores(1) 172 .setNumber(this.numEvaluators).build() 173 ); 174 this.state = State.WAIT_EVALUATORS; 175 this.expectCount = this.numEvaluators; 176 } 177 178 /** 179 * Possible states of the job driver. Can be one of: 180 * <dl> 181 * <du><code>INIT</code></du><dd>initial state, ready to request the evaluators.</dd> 182 * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to initialize.</dd> 183 * <du><code>READY</code></du><dd>Ready to submitTask a new task.</dd> 184 * <du><code>WAIT_TASKS</code></du><dd>Wait for tasks to complete.</dd> 185 * </dl> 186 */ 187 private enum State { 188 INIT, WAIT_EVALUATORS, READY, WAIT_TASKS 189 } 190 191 /** 192 * Receive notification that an Evaluator had been allocated, 193 * and submitTask a new Task in that Evaluator. 194 */ 195 final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 196 @Override 197 public void onNext(final AllocatedEvaluator eval) { 198 synchronized (HttpShellJobDriver.this) { 199 LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}", 200 new Object[]{eval.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.contexts.size()}); 201 assert (HttpShellJobDriver.this.state == State.WAIT_EVALUATORS); 202 try { 203 eval.submitContext(ContextConfiguration.CONF.set( 204 ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build()); 205 } catch (final BindException ex) { 206 LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex); 207 throw new RuntimeException(ex); 208 } 209 } 210 } 211 } 212 213 /** 214 * Receive notification that the entire Evaluator had failed. 215 * Stop other jobs and pass this error to the job observer on the client. 216 */ 217 final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { 218 @Override 219 public void onNext(final FailedEvaluator eval) { 220 synchronized (HttpShellJobDriver.this) { 221 LOG.log(Level.SEVERE, "FailedEvaluator", eval); 222 for (final FailedContext failedContext : eval.getFailedContextList()) { 223 HttpShellJobDriver.this.contexts.remove(failedContext.getId()); 224 } 225 throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException()); 226 } 227 } 228 } 229 230 /** 231 * Receive notification that a new Context is available. 232 * Submit a new Distributed Shell Task to that Context. 233 */ 234 final class ActiveContextHandler implements EventHandler<ActiveContext> { 235 @Override 236 public void onNext(final ActiveContext context) { 237 synchronized (HttpShellJobDriver.this) { 238 LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}", 239 new Object[]{context.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.state}); 240 assert (HttpShellJobDriver.this.state == State.WAIT_EVALUATORS); 241 HttpShellJobDriver.this.contexts.put(context.getId(), context); 242 if (--HttpShellJobDriver.this.expectCount <= 0) { 243 HttpShellJobDriver.this.state = State.READY; 244 if (HttpShellJobDriver.this.cmd == null) { 245 LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}", 246 HttpShellJobDriver.this.state); 247 } else { 248 HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd); 249 } 250 } 251 } 252 } 253 } 254 255 /** 256 * Receive notification that the Context had completed. 257 * Remove context from the list of active context. 258 */ 259 final class ClosedContextHandler implements EventHandler<ClosedContext> { 260 @Override 261 public void onNext(final ClosedContext context) { 262 LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); 263 synchronized (HttpShellJobDriver.this) { 264 HttpShellJobDriver.this.contexts.remove(context.getId()); 265 } 266 } 267 } 268 269 final class HttpClientCloseHandler implements EventHandler<Void> { 270 @Override 271 public void onNext(final Void aVoid) throws RuntimeException { 272 LOG.log(Level.INFO, "Received a close message from the client. You can put code here to properly close drivers and evaluators."); 273 for (final ActiveContext c : contexts.values()) { 274 c.close(); 275 } 276 } 277 } 278 279 /** 280 * Receive notification that the Context had failed. 281 * Remove context from the list of active context and notify the client. 282 */ 283 final class FailedContextHandler implements EventHandler<FailedContext> { 284 @Override 285 public void onNext(final FailedContext context) { 286 LOG.log(Level.SEVERE, "FailedContext", context); 287 synchronized (HttpShellJobDriver.this) { 288 HttpShellJobDriver.this.contexts.remove(context.getId()); 289 } 290 throw new RuntimeException("Failed context: ", context.asError()); 291 } 292 } 293 294 /** 295 * Receive notification that the Task has completed successfully. 296 */ 297 final class CompletedTaskHandler implements EventHandler<CompletedTask> { 298 @Override 299 public void onNext(final CompletedTask task) { 300 LOG.log(Level.INFO, "Completed task: {0}", task.getId()); 301 // Take the message returned by the task and add it to the running result. 302 final String result = CODEC.decode(task.get()); 303 synchronized (HttpShellJobDriver.this) { 304 HttpShellJobDriver.this.results.add(task.getId() + " :: " + result); 305 LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{ 306 task.getId(), HttpShellJobDriver.this.results.size(), result, HttpShellJobDriver.this.state}); 307 if (--HttpShellJobDriver.this.expectCount <= 0) { 308 HttpShellJobDriver.this.returnResults(); 309 HttpShellJobDriver.this.state = State.READY; 310 if (HttpShellJobDriver.this.cmd != null) { 311 HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd); 312 } 313 } 314 } 315 } 316 } 317 318 /** 319 * Receive notification from the client. 320 */ 321 final class ClientMessageHandler implements EventHandler<byte[]> { 322 @Override 323 public void onNext(final byte[] message) { 324 synchronized (HttpShellJobDriver.this) { 325 final String command = CODEC.decode(message); 326 LOG.log(Level.INFO, "Client message: {0} state: {1}", 327 new Object[]{command, HttpShellJobDriver.this.state}); 328 assert (HttpShellJobDriver.this.cmd == null); 329 if (HttpShellJobDriver.this.state == State.READY) { 330 HttpShellJobDriver.this.submit(command); 331 } else { 332 // not ready yet - save the command for better times. 333 assert (HttpShellJobDriver.this.state == State.WAIT_EVALUATORS); 334 HttpShellJobDriver.this.cmd = command; 335 } 336 } 337 } 338 } 339 340 /** 341 * Job Driver is ready and the clock is set up: request the evaluators. 342 */ 343 final class StartHandler implements EventHandler<StartTime> { 344 @Override 345 public void onNext(final StartTime startTime) { 346 LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime}); 347 assert (state == State.INIT); 348 requestEvaluators(); 349 } 350 } 351 352 /** 353 * Shutting down the job driver: close the evaluators. 354 */ 355 final class StopHandler implements EventHandler<StopTime> { 356 @Override 357 public void onNext(final StopTime time) { 358 LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time}); 359 for (final ActiveContext context : contexts.values()) { 360 context.close(); 361 } 362 } 363 } 364}