001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.retained_evalCLR; 020 021import org.apache.reef.client.*; 022import org.apache.reef.tang.Configuration; 023import org.apache.reef.tang.annotations.NamedParameter; 024import org.apache.reef.tang.annotations.Parameter; 025import org.apache.reef.tang.annotations.Unit; 026import org.apache.reef.tang.exceptions.BindException; 027import org.apache.reef.tang.formats.ConfigurationModule; 028import org.apache.reef.util.EnvironmentUtils; 029import org.apache.reef.wake.EventHandler; 030import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 031 032import javax.inject.Inject; 033import java.io.BufferedReader; 034import java.io.File; 035import java.io.IOException; 036import java.io.InputStreamReader; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039 040/** 041 * Retained Evaluator Shell Client. 042 */ 043@Unit 044public class JobClient { 045 046 /** 047 * Standard java logger. 048 */ 049 private static final Logger LOG = Logger.getLogger(JobClient.class.getName()); 050 051 /** 052 * Codec to translate messages to and from the job driver 053 */ 054 private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 055 056 /** 057 * Reference to the REEF framework. 058 * This variable is injected automatically in the constructor. 059 */ 060 private final REEF reef; 061 062 /** 063 * Shell command to submitTask to the job driver. 064 */ 065 private final String command; 066 /** 067 * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode. 068 */ 069 private final boolean isInteractive; 070 /** 071 * Total number of experiments to run. 072 */ 073 private final int maxRuns; 074 /** 075 * Command prompt reader for the interactive mode (stdin). 076 */ 077 private final BufferedReader prompt; 078 /** 079 * Job Driver configuration. 080 */ 081 private Configuration driverConfiguration; 082 private ConfigurationModule driverConfigModule; 083 /** 084 * A reference to the running job that allows client to send messages back to the job driver 085 */ 086 private RunningJob runningJob; 087 088 /** 089 * Start timestamp of the current task. 090 */ 091 private long startTime = 0; 092 093 /** 094 * Total time spent performing tasks in Evaluators. 095 */ 096 private long totalTime = 0; 097 098 /** 099 * Number of experiments ran so far. 100 */ 101 private int numRuns = 0; 102 103 /** 104 * Set to false when job driver is done. 105 */ 106 private boolean isBusy = true; 107 108 /** 109 * Retained Evaluator client. 110 * Parameters are injected automatically by TANG. 111 * 112 * @param command Shell command to run on each Evaluator. 113 * @param reef Reference to the REEF framework. 114 */ 115 @Inject 116 JobClient(final REEF reef, 117 @Parameter(Launch.Command.class) final String command, 118 @Parameter(Launch.NumRuns.class) final Integer numRuns) throws BindException { 119 120 this.reef = reef; 121 this.command = command; 122 this.maxRuns = numRuns; 123 124 // If command is not set, switch to interactive mode. (Yes, we compare pointers here) 125 this.isInteractive = this.command == 126 Launch.Command.class.getAnnotation(NamedParameter.class).default_value(); 127 128 this.prompt = this.isInteractive ? new BufferedReader(new InputStreamReader(System.in)) : null; 129 130 this.driverConfigModule = DriverConfiguration.CONF 131 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class)) 132 .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis()) 133 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class) 134 .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class) 135 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class) 136 .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class) 137 .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class) 138 .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class) 139 .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class) 140 .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class) 141 .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class); 142 } 143 144 private void addCLRFiles(final File folder) throws BindException { 145 ConfigurationModule result = this.driverConfigModule; 146 for (final File f : folder.listFiles()) { 147 if (f.canRead() && f.exists() && f.isFile()) { 148 result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath()); 149 } 150 } 151 152 this.driverConfigModule = result; 153 this.driverConfiguration = this.driverConfigModule.build(); 154 } 155 156 /** 157 * Launch the job driver. 158 * 159 * @throws BindException configuration error. 160 */ 161 public void submit(File clrFolder) { 162 try { 163 addCLRFiles(clrFolder); 164 } catch (final BindException e) { 165 LOG.log(Level.FINE, "Failed to bind", e); 166 } 167 this.reef.submit(this.driverConfiguration); 168 } 169 170 /** 171 * Send command to the job driver. Record timestamp when the command was sent. 172 * If this.command is set, use it; otherwise, ask user for the command. 173 */ 174 private synchronized void submitTask() { 175 if (this.isInteractive) { 176 String cmd; 177 try { 178 do { 179 System.out.print("\nRE> "); 180 cmd = this.prompt.readLine(); 181 } while (cmd != null && cmd.trim().isEmpty()); 182 } catch (final IOException ex) { 183 LOG.log(Level.FINE, "Error reading from stdin: {0}", ex); 184 cmd = null; 185 } 186 if (cmd == null || cmd.equals("exit")) { 187 this.runningJob.close(); 188 stopAndNotify(); 189 } else { 190 this.submitTask(cmd); 191 } 192 } else { 193 // non-interactive batch mode: 194 this.submitTask(this.command); 195 } 196 } 197 198 /** 199 * Send command to the job driver. Record timestamp when the command was sent. 200 * 201 * @param cmd shell command to execute in all Evaluators. 202 */ 203 private synchronized void submitTask(final String cmd) { 204 LOG.log(Level.INFO, "Submit task {0} \"{1}\" to {2}", 205 new Object[]{this.numRuns + 1, cmd, this.runningJob}); 206 this.startTime = System.currentTimeMillis(); 207 this.runningJob.send(CODEC.encode(cmd)); 208 } 209 210 /** 211 * Notify the process in waitForCompletion() method that the main process has finished. 212 */ 213 private synchronized void stopAndNotify() { 214 this.runningJob = null; 215 this.isBusy = false; 216 this.notify(); 217 } 218 219 /** 220 * Wait for the job driver to complete. This method is called from Launcher.main() 221 */ 222 public void waitForCompletion() { 223 while (this.isBusy) { 224 LOG.info("Waiting for the Job Driver to complete."); 225 try { 226 synchronized (this) { 227 this.wait(); 228 } 229 } catch (final InterruptedException ex) { 230 LOG.log(Level.WARNING, "Waiting for result interrupted.", ex); 231 } 232 } 233 this.reef.close(); 234 } 235 236 /** 237 * Receive notification from the job driver that the job is running. 238 */ 239 final class RunningJobHandler implements EventHandler<RunningJob> { 240 @Override 241 public void onNext(final RunningJob job) { 242 LOG.log(Level.INFO, "Running job: {0}", job.getId()); 243 synchronized (JobClient.this) { 244 JobClient.this.runningJob = job; 245 JobClient.this.submitTask(); 246 } 247 } 248 } 249 250 /** 251 * Receive message from the job driver. 252 * There is only one message, which comes at the end of the driver execution 253 * and contains shell command output on each node. 254 */ 255 final class JobMessageHandler implements EventHandler<JobMessage> { 256 @Override 257 public void onNext(final JobMessage message) { 258 synchronized (JobClient.this) { 259 260 final String result = CODEC.decode(message.get()); 261 final long jobTime = System.currentTimeMillis() - startTime; 262 totalTime += jobTime; 263 ++numRuns; 264 265 LOG.log(Level.INFO, "Task {0} completed in {1} msec.:\n{2}", 266 new Object[]{numRuns, jobTime, result}); 267 268 System.out.println(result); 269 270 if (runningJob != null) { 271 if (isInteractive || numRuns < maxRuns) { 272 submitTask(); 273 } else { 274 LOG.log(Level.INFO, 275 "All {0} tasks complete; Average task time: {1}. Closing the job driver.", 276 new Object[]{maxRuns, totalTime / (double) maxRuns}); 277 runningJob.close(); 278 stopAndNotify(); 279 } 280 } 281 } 282 } 283 } 284 285 /** 286 * Receive notification from the job driver that the job had failed. 287 */ 288 final class FailedJobHandler implements EventHandler<FailedJob> { 289 @Override 290 public void onNext(final FailedJob job) { 291 LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null)); 292 stopAndNotify(); 293 } 294 } 295 296 /** 297 * Receive notification from the job driver that the job had completed successfully. 298 */ 299 final class CompletedJobHandler implements EventHandler<CompletedJob> { 300 @Override 301 public void onNext(final CompletedJob job) { 302 LOG.log(Level.INFO, "Completed job: {0}", job.getId()); 303 stopAndNotify(); 304 } 305 } 306 307 /** 308 * Receive notification that there was an exception thrown from the job driver. 309 */ 310 final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { 311 @Override 312 public void onNext(final FailedRuntime error) { 313 LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null)); 314 stopAndNotify(); 315 } 316 } 317}