001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.retained_eval; 020 021import org.apache.reef.client.*; 022import org.apache.reef.examples.library.Command; 023import org.apache.reef.tang.Configuration; 024import org.apache.reef.tang.JavaConfigurationBuilder; 025import org.apache.reef.tang.Tang; 026import org.apache.reef.tang.annotations.NamedParameter; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.tang.annotations.Unit; 029import org.apache.reef.tang.exceptions.BindException; 030import org.apache.reef.util.EnvironmentUtils; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 033 034import javax.inject.Inject; 035import java.io.BufferedReader; 036import java.io.IOException; 037import java.io.InputStreamReader; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * Retained Evaluator Shell Client. 043 */ 044@Unit 045public class JobClient { 046 047 /** 048 * Standard java logger. 049 */ 050 private static final Logger LOG = Logger.getLogger(JobClient.class.getName()); 051 052 /** 053 * Codec to translate messages to and from the job driver 054 */ 055 private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 056 057 /** 058 * Reference to the REEF framework. 059 * This variable is injected automatically in the constructor. 060 */ 061 private final REEF reef; 062 063 /** 064 * Shell command to submitTask to the job driver. 065 */ 066 private final String command; 067 068 /** 069 * Job Driver configuration. 070 */ 071 private final Configuration driverConfiguration; 072 073 /** 074 * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode. 075 */ 076 private final boolean isInteractive; 077 078 /** 079 * Total number of experiments to run. 080 */ 081 private final int maxRuns; 082 083 /** 084 * Command prompt reader for the interactive mode (stdin). 085 */ 086 private final BufferedReader prompt; 087 088 /** 089 * A reference to the running job that allows client to send messages back to the job driver 090 */ 091 private RunningJob runningJob; 092 093 /** 094 * Start timestamp of the current task. 095 */ 096 private long startTime = 0; 097 098 /** 099 * Total time spent performing tasks in Evaluators. 100 */ 101 private long totalTime = 0; 102 103 /** 104 * Number of experiments ran so far. 105 */ 106 private int numRuns = 0; 107 108 /** 109 * Set to false when job driver is done. 110 */ 111 private boolean isBusy = true; 112 113 /** 114 * Last result returned from the job driver. 115 */ 116 private String lastResult; 117 118 /** 119 * Retained Evaluator client. 120 * Parameters are injected automatically by TANG. 121 * 122 * @param command Shell command to run on each Evaluator. 123 * @param reef Reference to the REEF framework. 124 */ 125 @Inject 126 JobClient(final REEF reef, 127 @Parameter(Command.class) final String command, 128 @Parameter(Launch.NumRuns.class) final Integer numRuns, 129 @Parameter(Launch.NumEval.class) final Integer numEvaluators) throws BindException { 130 131 this.reef = reef; 132 this.command = command; 133 this.maxRuns = numRuns; 134 135 // If command is not set, switch to interactive mode. (Yes, we compare pointers here) 136 this.isInteractive = this.command == 137 Command.class.getAnnotation(NamedParameter.class).default_value(); 138 139 this.prompt = this.isInteractive ? 140 new BufferedReader(new InputStreamReader(System.in)) : null; 141 142 final JavaConfigurationBuilder configBuilder = Tang.Factory.getTang().newConfigurationBuilder(); 143 configBuilder.addConfiguration( 144 DriverConfiguration.CONF 145 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class)) 146 .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis()) 147 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class) 148 .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class) 149 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class) 150 .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class) 151 .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class) 152 .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class) 153 .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class) 154 .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class) 155 .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class) 156 .build() 157 ); 158 configBuilder.bindNamedParameter(Launch.NumEval.class, "" + numEvaluators); 159 this.driverConfiguration = configBuilder.build(); 160 } 161 162 /** 163 * @return a Configuration binding the ClientConfiguration.* event handlers to this Client. 164 */ 165 public static Configuration getClientConfiguration() { 166 return ClientConfiguration.CONF 167 .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class) 168 .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class) 169 .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class) 170 .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class) 171 .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class) 172 .build(); 173 } 174 175 /** 176 * Launch the job driver. 177 * 178 * @throws BindException configuration error. 179 */ 180 public void submit() { 181 this.reef.submit(this.driverConfiguration); 182 } 183 184 /** 185 * Send command to the job driver. Record timestamp when the command was sent. 186 * If this.command is set, use it; otherwise, ask user for the command. 187 */ 188 private synchronized void submitTask() { 189 if (this.isInteractive) { 190 String cmd; 191 try { 192 do { 193 System.out.print("\nRE> "); 194 cmd = this.prompt.readLine(); 195 } while (cmd != null && cmd.trim().isEmpty()); 196 } catch (final IOException ex) { 197 LOG.log(Level.FINE, "Error reading from stdin: {0}", ex); 198 cmd = null; 199 } 200 if (cmd == null || cmd.equals("exit")) { 201 this.runningJob.close(); 202 stopAndNotify(); 203 } else { 204 this.submitTask(cmd); 205 } 206 } else { 207 // non-interactive batch mode: 208 this.submitTask(this.command); 209 } 210 } 211 212 /** 213 * Send command to the job driver. Record timestamp when the command was sent. 214 * 215 * @param cmd shell command to execute in all Evaluators. 216 */ 217 private synchronized void submitTask(final String cmd) { 218 LOG.log(Level.FINE, "Submit task {0} \"{1}\" to {2}", 219 new Object[]{this.numRuns + 1, cmd, this.runningJob}); 220 this.startTime = System.currentTimeMillis(); 221 this.runningJob.send(CODEC.encode(cmd)); 222 } 223 224 /** 225 * Notify the process in waitForCompletion() method that the main process has finished. 226 */ 227 private synchronized void stopAndNotify() { 228 this.runningJob = null; 229 this.isBusy = false; 230 this.notify(); 231 } 232 233 /** 234 * Wait for the job driver to complete. This method is called from Launch.main() 235 */ 236 public String waitForCompletion() { 237 while (this.isBusy) { 238 LOG.log(Level.FINE, "Waiting for the Job Driver to complete."); 239 try { 240 synchronized (this) { 241 this.wait(); 242 } 243 } catch (final InterruptedException ex) { 244 LOG.log(Level.WARNING, "Waiting for result interrupted.", ex); 245 } 246 } 247 return this.lastResult; 248 } 249 250 public void close() { 251 this.reef.close(); 252 } 253 254 /** 255 * Receive notification from the job driver that the job is running. 256 */ 257 final class RunningJobHandler implements EventHandler<RunningJob> { 258 @Override 259 public void onNext(final RunningJob job) { 260 LOG.log(Level.FINE, "Running job: {0}", job.getId()); 261 synchronized (JobClient.this) { 262 JobClient.this.runningJob = job; 263 JobClient.this.submitTask(); 264 } 265 } 266 } 267 268 /** 269 * Receive message from the job driver. 270 * There is only one message, which comes at the end of the driver execution 271 * and contains shell command output on each node. 272 */ 273 final class JobMessageHandler implements EventHandler<JobMessage> { 274 @Override 275 public void onNext(final JobMessage message) { 276 synchronized (JobClient.this) { 277 278 lastResult = CODEC.decode(message.get()); 279 final long jobTime = System.currentTimeMillis() - startTime; 280 totalTime += jobTime; 281 ++numRuns; 282 283 LOG.log(Level.FINE, "TIME: Task {0} completed in {1} msec.:\n{2}", 284 new Object[]{"" + numRuns, "" + jobTime, lastResult}); 285 286 System.out.println(lastResult); 287 288 if (runningJob != null) { 289 if (isInteractive || numRuns < maxRuns) { 290 submitTask(); 291 } else { 292 LOG.log(Level.INFO, 293 "All {0} tasks complete; Average task time: {1}. Closing the job driver.", 294 new Object[]{maxRuns, totalTime / (double) maxRuns}); 295 runningJob.close(); 296 stopAndNotify(); 297 } 298 } 299 } 300 } 301 } 302 303 /** 304 * Receive notification from the job driver that the job had failed. 305 */ 306 final class FailedJobHandler implements EventHandler<FailedJob> { 307 @Override 308 public void onNext(final FailedJob job) { 309 LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null)); 310 stopAndNotify(); 311 } 312 } 313 314 /** 315 * Receive notification from the job driver that the job had completed successfully. 316 */ 317 final class CompletedJobHandler implements EventHandler<CompletedJob> { 318 @Override 319 public void onNext(final CompletedJob job) { 320 LOG.log(Level.FINE, "Completed job: {0}", job.getId()); 321 stopAndNotify(); 322 } 323 } 324 325 /** 326 * Receive notification that there was an exception thrown from the job driver. 327 */ 328 final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { 329 @Override 330 public void onNext(final FailedRuntime error) { 331 LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null)); 332 stopAndNotify(); 333 } 334 } 335}