001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.hellohttp; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ClosedContext; 023import org.apache.reef.driver.context.ContextConfiguration; 024import org.apache.reef.driver.context.FailedContext; 025import org.apache.reef.driver.evaluator.AllocatedEvaluator; 026import org.apache.reef.driver.evaluator.EvaluatorRequest; 027import org.apache.reef.driver.evaluator.EvaluatorRequestor; 028import org.apache.reef.driver.evaluator.FailedEvaluator; 029import org.apache.reef.driver.task.CompletedTask; 030import org.apache.reef.driver.task.TaskConfiguration; 031import org.apache.reef.examples.library.Command; 032import org.apache.reef.examples.library.ShellTask; 033import org.apache.reef.tang.JavaConfigurationBuilder; 034import org.apache.reef.tang.Tang; 035import org.apache.reef.tang.annotations.Unit; 036import org.apache.reef.tang.exceptions.BindException; 037import org.apache.reef.wake.EventHandler; 038import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 039import org.apache.reef.wake.time.event.StartTime; 040import org.apache.reef.wake.time.event.StopTime; 041 042import javax.inject.Inject; 043import java.util.ArrayList; 044import java.util.HashMap; 045import java.util.List; 046import java.util.Map; 047import java.util.logging.Level; 048import java.util.logging.Logger; 049 050/** 051 * The Driver code for the Hello REEF Http Distributed Shell Application. 052 */ 053@SuppressWarnings("checkstyle:hideutilityclassconstructor") 054@Unit 055public final class HttpShellJobDriver { 056 private static final Logger LOG = Logger.getLogger(HttpShellJobDriver.class.getName()); 057 058 /** 059 * String codec is used to encode the results 060 * before passing them back to the client. 061 */ 062 public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 063 /** 064 * Evaluator Requester. 065 */ 066 private final EvaluatorRequestor evaluatorRequestor; 067 /** 068 * Number of Evaluators to request (default is 2). 069 */ 070 private final int numEvaluators = 2; 071 /** 072 * Shell execution results from each Evaluator. 073 */ 074 private final List<String> results = new ArrayList<>(); 075 /** 076 * Map from context ID to running evaluator context. 077 */ 078 private final Map<String, ActiveContext> contexts = new HashMap<>(); 079 /** 080 * Job driver state. 081 */ 082 private State state = State.INIT; 083 /** 084 * First command to execute. Sometimes client can send us the first command 085 * before Evaluators are available; we need to store this command here. 086 */ 087 private String cmd; 088 /** 089 * Number of evaluators/tasks to complete. 090 */ 091 private int expectCount = 0; 092 /** 093 * Callback handler for http return message. 094 */ 095 private HttpServerShellCmdHandler.ClientCallBackHandler httpCallbackHandler; 096 097 /** 098 * Job Driver Constructor. 099 * 100 * @param requestor 101 * @param clientCallBackHandler 102 */ 103 @Inject 104 public HttpShellJobDriver(final EvaluatorRequestor requestor, 105 final HttpServerShellCmdHandler.ClientCallBackHandler clientCallBackHandler) { 106 this.evaluatorRequestor = requestor; 107 this.httpCallbackHandler = clientCallBackHandler; 108 LOG.log(Level.FINE, "Instantiated 'HttpShellJobDriver'"); 109 } 110 111 /** 112 * Construct the final result and forward it to the Client. 113 */ 114 private void returnResults() { 115 final StringBuilder sb = new StringBuilder(); 116 for (final String result : this.results) { 117 sb.append(result); 118 } 119 this.results.clear(); 120 LOG.log(Level.INFO, "Return results to the client:\n{0}", sb); 121 httpCallbackHandler.onNext(CODEC.encode(sb.toString())); 122 } 123 124 /** 125 * Submit command to all available evaluators. 126 * 127 * @param command shell command to execute. 128 */ 129 private synchronized void submit(final String command) { 130 LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}", 131 new Object[]{command, this.contexts.size(), this.state}); 132 assert this.state == State.READY; 133 this.expectCount = this.contexts.size(); 134 this.state = State.WAIT_TASKS; 135 this.cmd = null; 136 for (final ActiveContext context : this.contexts.values()) { 137 this.submit(context, command); 138 } 139 } 140 141 /** 142 * Submit a Task that execute the command to a single Evaluator. 143 * This method is called from <code>submitTask(cmd)</code>. 144 */ 145 private void submit(final ActiveContext context, final String command) { 146 try { 147 LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context}); 148 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); 149 cb.addConfiguration( 150 TaskConfiguration.CONF 151 .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task") 152 .set(TaskConfiguration.TASK, ShellTask.class) 153 .build() 154 ); 155 cb.bindNamedParameter(Command.class, command); 156 context.submitTask(cb.build()); 157 } catch (final BindException ex) { 158 LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex); 159 context.close(); 160 throw new RuntimeException(ex); 161 } 162 } 163 164 /** 165 * Request the evaluators. 166 */ 167 private synchronized void requestEvaluators() { 168 assert this.state == State.INIT; 169 LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators); 170 this.evaluatorRequestor.submit( 171 EvaluatorRequest.newBuilder() 172 .setMemory(128) 173 .setNumberOfCores(1) 174 .setNumber(this.numEvaluators).build() 175 ); 176 this.state = State.WAIT_EVALUATORS; 177 this.expectCount = this.numEvaluators; 178 } 179 180 /** 181 * Possible states of the job driver. Can be one of: 182 * <dl> 183 * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd> 184 * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd> 185 * <dt><code>READY</code></dt><dd>Ready to submitTask a new task.</dd> 186 * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd> 187 * </dl> 188 */ 189 private enum State { 190 INIT, WAIT_EVALUATORS, READY, WAIT_TASKS 191 } 192 193 /** 194 * Receive notification that an Evaluator had been allocated, 195 * and submitTask a new Task in that Evaluator. 196 */ 197 final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 198 @Override 199 public void onNext(final AllocatedEvaluator eval) { 200 synchronized (HttpShellJobDriver.this) { 201 LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}", 202 new Object[]{eval.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.contexts.size()}); 203 assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS; 204 try { 205 eval.submitContext(ContextConfiguration.CONF.set( 206 ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build()); 207 } catch (final BindException ex) { 208 LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex); 209 throw new RuntimeException(ex); 210 } 211 } 212 } 213 } 214 215 /** 216 * Receive notification that the entire Evaluator had failed. 217 * Stop other jobs and pass this error to the job observer on the client. 218 */ 219 final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { 220 @Override 221 public void onNext(final FailedEvaluator eval) { 222 synchronized (HttpShellJobDriver.this) { 223 LOG.log(Level.SEVERE, "FailedEvaluator", eval); 224 for (final FailedContext failedContext : eval.getFailedContextList()) { 225 HttpShellJobDriver.this.contexts.remove(failedContext.getId()); 226 } 227 throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException()); 228 } 229 } 230 } 231 232 /** 233 * Receive notification that a new Context is available. 234 * Submit a new Distributed Shell Task to that Context. 235 */ 236 final class ActiveContextHandler implements EventHandler<ActiveContext> { 237 @Override 238 public void onNext(final ActiveContext context) { 239 synchronized (HttpShellJobDriver.this) { 240 LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}", 241 new Object[]{context.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.state}); 242 assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS; 243 HttpShellJobDriver.this.contexts.put(context.getId(), context); 244 if (--HttpShellJobDriver.this.expectCount <= 0) { 245 HttpShellJobDriver.this.state = State.READY; 246 if (HttpShellJobDriver.this.cmd == null) { 247 LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}", 248 HttpShellJobDriver.this.state); 249 } else { 250 HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd); 251 } 252 } 253 } 254 } 255 } 256 257 /** 258 * Receive notification that the Context had completed. 259 * Remove context from the list of active context. 260 */ 261 final class ClosedContextHandler implements EventHandler<ClosedContext> { 262 @Override 263 public void onNext(final ClosedContext context) { 264 LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); 265 synchronized (HttpShellJobDriver.this) { 266 HttpShellJobDriver.this.contexts.remove(context.getId()); 267 } 268 } 269 } 270 271 final class HttpClientCloseHandler implements EventHandler<Void> { 272 @Override 273 public void onNext(final Void aVoid) throws RuntimeException { 274 LOG.log(Level.INFO, "Received a close message from the client. " + 275 "You can put code here to properly close drivers and evaluators."); 276 for (final ActiveContext c : contexts.values()) { 277 c.close(); 278 } 279 } 280 } 281 282 /** 283 * Receive notification that the Context had failed. 284 * Remove context from the list of active context and notify the client. 285 */ 286 final class FailedContextHandler implements EventHandler<FailedContext> { 287 @Override 288 public void onNext(final FailedContext context) { 289 LOG.log(Level.SEVERE, "FailedContext", context); 290 synchronized (HttpShellJobDriver.this) { 291 HttpShellJobDriver.this.contexts.remove(context.getId()); 292 } 293 throw new RuntimeException("Failed context: ", context.asError()); 294 } 295 } 296 297 /** 298 * Receive notification that the Task has completed successfully. 299 */ 300 final class CompletedTaskHandler implements EventHandler<CompletedTask> { 301 @Override 302 public void onNext(final CompletedTask task) { 303 LOG.log(Level.INFO, "Completed task: {0}", task.getId()); 304 // Take the message returned by the task and add it to the running result. 305 final String result = CODEC.decode(task.get()); 306 synchronized (HttpShellJobDriver.this) { 307 HttpShellJobDriver.this.results.add(task.getId() + " :: " + result); 308 LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{ 309 task.getId(), HttpShellJobDriver.this.results.size(), result, HttpShellJobDriver.this.state}); 310 if (--HttpShellJobDriver.this.expectCount <= 0) { 311 HttpShellJobDriver.this.returnResults(); 312 HttpShellJobDriver.this.state = State.READY; 313 if (HttpShellJobDriver.this.cmd != null) { 314 HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd); 315 } 316 } 317 } 318 } 319 } 320 321 /** 322 * Receive notification from the client. 323 */ 324 final class ClientMessageHandler implements EventHandler<byte[]> { 325 @Override 326 public void onNext(final byte[] message) { 327 synchronized (HttpShellJobDriver.this) { 328 final String command = CODEC.decode(message); 329 LOG.log(Level.INFO, "Client message: {0} state: {1}", 330 new Object[]{command, HttpShellJobDriver.this.state}); 331 assert HttpShellJobDriver.this.cmd == null; 332 if (HttpShellJobDriver.this.state == State.READY) { 333 HttpShellJobDriver.this.submit(command); 334 } else { 335 // not ready yet - save the command for better times. 336 assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS; 337 HttpShellJobDriver.this.cmd = command; 338 } 339 } 340 } 341 } 342 343 /** 344 * Job Driver is ready and the clock is set up: request the evaluators. 345 */ 346 final class StartHandler implements EventHandler<StartTime> { 347 @Override 348 public void onNext(final StartTime startTime) { 349 synchronized (HttpShellJobDriver.this) { 350 LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime}); 351 assert state == State.INIT; 352 requestEvaluators(); 353 } 354 } 355 } 356 357 /** 358 * Shutting down the job driver: close the evaluators. 359 */ 360 final class StopHandler implements EventHandler<StopTime> { 361 @Override 362 public void onNext(final StopTime time) { 363 synchronized (HttpShellJobDriver.this) { 364 LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time}); 365 for (final ActiveContext context : contexts.values()) { 366 context.close(); 367 } 368 } 369 } 370 } 371}