/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.hellohttp;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.examples.library.Command;
import org.apache.reef.examples.library.ShellTask;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;

import javax.inject.Inject;
042import java.util.ArrayList; 043import java.util.HashMap; 044import java.util.List; 045import java.util.Map; 046import java.util.logging.Level; 047import java.util.logging.Logger; 048 049/** 050 * The Driver code for the Hello REEF Http Distributed Shell Application. 051 */ 052@SuppressWarnings("checkstyle:hideutilityclassconstructor") 053@Unit 054public final class HttpShellJobDriver { 055 private static final Logger LOG = Logger.getLogger(HttpShellJobDriver.class.getName()); 056 057 /** 058 * String codec is used to encode the results 059 * before passing them back to the client. 060 */ 061 public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 062 /** 063 * Evaluator Requester. 064 */ 065 private final EvaluatorRequestor evaluatorRequestor; 066 /** 067 * Number of Evaluators to request (default is 2). 068 */ 069 private final int numEvaluators = 2; 070 /** 071 * Shell execution results from each Evaluator. 072 */ 073 private final List<String> results = new ArrayList<>(); 074 /** 075 * Map from context ID to running evaluator context. 076 */ 077 private final Map<String, ActiveContext> contexts = new HashMap<>(); 078 /** 079 * Job driver state. 080 */ 081 private State state = State.INIT; 082 /** 083 * First command to execute. Sometimes client can send us the first command 084 * before Evaluators are available; we need to store this command here. 085 */ 086 private String cmd; 087 /** 088 * Number of evaluators/tasks to complete. 089 */ 090 private int expectCount = 0; 091 /** 092 * Callback handler for http return message. 093 */ 094 private HttpServerShellCmdHandler.ClientCallBackHandler httpCallbackHandler; 095 096 /** 097 * Job Driver Constructor. 
098 * 099 * @param requestor 100 * @param clientCallBackHandler 101 */ 102 @Inject 103 public HttpShellJobDriver(final EvaluatorRequestor requestor, 104 final HttpServerShellCmdHandler.ClientCallBackHandler clientCallBackHandler) { 105 this.evaluatorRequestor = requestor; 106 this.httpCallbackHandler = clientCallBackHandler; 107 LOG.log(Level.FINE, "Instantiated 'HttpShellJobDriver'"); 108 } 109 110 /** 111 * Construct the final result and forward it to the Client. 112 */ 113 private void returnResults() { 114 final StringBuilder sb = new StringBuilder(); 115 for (final String result : this.results) { 116 sb.append(result); 117 } 118 this.results.clear(); 119 LOG.log(Level.INFO, "Return results to the client:\n{0}", sb); 120 httpCallbackHandler.onNext(CODEC.encode(sb.toString())); 121 } 122 123 /** 124 * Submit command to all available evaluators. 125 * 126 * @param command shell command to execute. 127 */ 128 private synchronized void submit(final String command) { 129 LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}", 130 new Object[]{command, this.contexts.size(), this.state}); 131 assert this.state == State.READY; 132 this.expectCount = this.contexts.size(); 133 this.state = State.WAIT_TASKS; 134 this.cmd = null; 135 for (final ActiveContext context : this.contexts.values()) { 136 this.submit(context, command); 137 } 138 } 139 140 /** 141 * Submit a Task that execute the command to a single Evaluator. 142 * This method is called from <code>submitTask(cmd)</code>. 
143 */ 144 private void submit(final ActiveContext context, final String command) { 145 try { 146 LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context}); 147 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); 148 cb.addConfiguration( 149 TaskConfiguration.CONF 150 .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task") 151 .set(TaskConfiguration.TASK, ShellTask.class) 152 .build() 153 ); 154 cb.bindNamedParameter(Command.class, command); 155 context.submitTask(cb.build()); 156 } catch (final BindException ex) { 157 LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex); 158 context.close(); 159 throw new RuntimeException(ex); 160 } 161 } 162 163 /** 164 * Request the evaluators. 165 */ 166 private synchronized void requestEvaluators() { 167 assert this.state == State.INIT; 168 LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators); 169 this.evaluatorRequestor.newRequest() 170 .setMemory(128) 171 .setNumberOfCores(1) 172 .setNumber(this.numEvaluators) 173 .submit(); 174 this.state = State.WAIT_EVALUATORS; 175 this.expectCount = this.numEvaluators; 176 } 177 178 /** 179 * Possible states of the job driver. Can be one of: 180 * <dl> 181 * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd> 182 * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd> 183 * <dt><code>READY</code></dt><dd>Ready to submitTask a new task.</dd> 184 * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd> 185 * </dl> 186 */ 187 private enum State { 188 INIT, WAIT_EVALUATORS, READY, WAIT_TASKS 189 } 190 191 /** 192 * Receive notification that an Evaluator had been allocated, 193 * and submitTask a new Task in that Evaluator. 
194 */ 195 final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 196 @Override 197 public void onNext(final AllocatedEvaluator eval) { 198 synchronized (HttpShellJobDriver.this) { 199 LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}", 200 new Object[]{eval.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.contexts.size()}); 201 assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS; 202 try { 203 eval.submitContext(ContextConfiguration.CONF.set( 204 ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build()); 205 } catch (final BindException ex) { 206 LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex); 207 throw new RuntimeException(ex); 208 } 209 } 210 } 211 } 212 213 /** 214 * Receive notification that the entire Evaluator had failed. 215 * Stop other jobs and pass this error to the job observer on the client. 216 */ 217 final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { 218 @Override 219 public void onNext(final FailedEvaluator eval) { 220 synchronized (HttpShellJobDriver.this) { 221 LOG.log(Level.SEVERE, "FailedEvaluator", eval); 222 for (final FailedContext failedContext : eval.getFailedContextList()) { 223 HttpShellJobDriver.this.contexts.remove(failedContext.getId()); 224 } 225 throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException()); 226 } 227 } 228 } 229 230 /** 231 * Receive notification that a new Context is available. 232 * Submit a new Distributed Shell Task to that Context. 
233 */ 234 final class ActiveContextHandler implements EventHandler<ActiveContext> { 235 @Override 236 public void onNext(final ActiveContext context) { 237 synchronized (HttpShellJobDriver.this) { 238 LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}", 239 new Object[]{context.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.state}); 240 assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS; 241 HttpShellJobDriver.this.contexts.put(context.getId(), context); 242 if (--HttpShellJobDriver.this.expectCount <= 0) { 243 HttpShellJobDriver.this.state = State.READY; 244 if (HttpShellJobDriver.this.cmd == null) { 245 LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}", 246 HttpShellJobDriver.this.state); 247 } else { 248 HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd); 249 } 250 } 251 } 252 } 253 } 254 255 /** 256 * Receive notification that the Context had completed. 257 * Remove context from the list of active context. 258 */ 259 final class ClosedContextHandler implements EventHandler<ClosedContext> { 260 @Override 261 public void onNext(final ClosedContext context) { 262 LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); 263 synchronized (HttpShellJobDriver.this) { 264 HttpShellJobDriver.this.contexts.remove(context.getId()); 265 } 266 } 267 } 268 269 final class HttpClientCloseHandler implements EventHandler<Void> { 270 @Override 271 public void onNext(final Void aVoid) throws RuntimeException { 272 LOG.log(Level.INFO, "Received a close message from the client. " + 273 "You can put code here to properly close drivers and evaluators."); 274 for (final ActiveContext c : contexts.values()) { 275 c.close(); 276 } 277 } 278 } 279 280 /** 281 * Receive notification that the Context had failed. 282 * Remove context from the list of active context and notify the client. 
283 */ 284 final class FailedContextHandler implements EventHandler<FailedContext> { 285 @Override 286 public void onNext(final FailedContext context) { 287 LOG.log(Level.SEVERE, "FailedContext", context); 288 synchronized (HttpShellJobDriver.this) { 289 HttpShellJobDriver.this.contexts.remove(context.getId()); 290 } 291 throw new RuntimeException("Failed context: ", context.asError()); 292 } 293 } 294 295 /** 296 * Receive notification that the Task has completed successfully. 297 */ 298 final class CompletedTaskHandler implements EventHandler<CompletedTask> { 299 @Override 300 public void onNext(final CompletedTask task) { 301 LOG.log(Level.INFO, "Completed task: {0}", task.getId()); 302 // Take the message returned by the task and add it to the running result. 303 final String result = CODEC.decode(task.get()); 304 synchronized (HttpShellJobDriver.this) { 305 HttpShellJobDriver.this.results.add(task.getId() + " :: " + result); 306 LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{ 307 task.getId(), HttpShellJobDriver.this.results.size(), result, HttpShellJobDriver.this.state}); 308 if (--HttpShellJobDriver.this.expectCount <= 0) { 309 HttpShellJobDriver.this.returnResults(); 310 HttpShellJobDriver.this.state = State.READY; 311 if (HttpShellJobDriver.this.cmd != null) { 312 HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd); 313 } 314 } 315 } 316 } 317 } 318 319 /** 320 * Receive notification from the client. 
321 */ 322 final class ClientMessageHandler implements EventHandler<byte[]> { 323 @Override 324 public void onNext(final byte[] message) { 325 synchronized (HttpShellJobDriver.this) { 326 final String command = CODEC.decode(message); 327 LOG.log(Level.INFO, "Client message: {0} state: {1}", 328 new Object[]{command, HttpShellJobDriver.this.state}); 329 assert HttpShellJobDriver.this.cmd == null; 330 if (HttpShellJobDriver.this.state == State.READY) { 331 HttpShellJobDriver.this.submit(command); 332 } else { 333 // not ready yet - save the command for better times. 334 assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS; 335 HttpShellJobDriver.this.cmd = command; 336 } 337 } 338 } 339 } 340 341 /** 342 * Job Driver is ready and the clock is set up: request the evaluators. 343 */ 344 final class StartHandler implements EventHandler<StartTime> { 345 @Override 346 public void onNext(final StartTime startTime) { 347 synchronized (HttpShellJobDriver.this) { 348 LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime}); 349 assert state == State.INIT; 350 requestEvaluators(); 351 } 352 } 353 } 354 355 /** 356 * Shutting down the job driver: close the evaluators. 357 */ 358 final class StopHandler implements EventHandler<StopTime> { 359 @Override 360 public void onNext(final StopTime time) { 361 synchronized (HttpShellJobDriver.this) { 362 LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time}); 363 for (final ActiveContext context : contexts.values()) { 364 context.close(); 365 } 366 } 367 } 368 } 369}