This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.retained_evalCLR;
020
021import org.apache.reef.client.*;
022import org.apache.reef.tang.Configuration;
023import org.apache.reef.tang.annotations.NamedParameter;
024import org.apache.reef.tang.annotations.Parameter;
025import org.apache.reef.tang.annotations.Unit;
026import org.apache.reef.tang.exceptions.BindException;
027import org.apache.reef.tang.formats.ConfigurationModule;
028import org.apache.reef.util.EnvironmentUtils;
029import org.apache.reef.wake.EventHandler;
030import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
031
032import javax.inject.Inject;
033import java.io.BufferedReader;
034import java.io.File;
035import java.io.IOException;
036import java.io.InputStreamReader;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Retained Evaluator Shell Client.
042 */
043@Unit
044public class JobClient {
045
046  /**
047   * Standard java logger.
048   */
049  private static final Logger LOG = Logger.getLogger(JobClient.class.getName());
050
051  /**
052   * Codec to translate messages to and from the job driver
053   */
054  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
055
056  /**
057   * Reference to the REEF framework.
058   * This variable is injected automatically in the constructor.
059   */
060  private final REEF reef;
061
062  /**
063   * Shell command to submitTask to the job driver.
064   */
065  private final String command;
066  /**
067   * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode.
068   */
069  private final boolean isInteractive;
070  /**
071   * Total number of experiments to run.
072   */
073  private final int maxRuns;
074  /**
075   * Command prompt reader for the interactive mode (stdin).
076   */
077  private final BufferedReader prompt;
078  /**
079   * Job Driver configuration.
080   */
081  private Configuration driverConfiguration;
082  private ConfigurationModule driverConfigModule;
083  /**
084   * A reference to the running job that allows client to send messages back to the job driver
085   */
086  private RunningJob runningJob;
087
088  /**
089   * Start timestamp of the current task.
090   */
091  private long startTime = 0;
092
093  /**
094   * Total time spent performing tasks in Evaluators.
095   */
096  private long totalTime = 0;
097
098  /**
099   * Number of experiments ran so far.
100   */
101  private int numRuns = 0;
102
103  /**
104   * Set to false when job driver is done.
105   */
106  private boolean isBusy = true;
107
108  /**
109   * Retained Evaluator client.
110   * Parameters are injected automatically by TANG.
111   *
112   * @param command Shell command to run on each Evaluator.
113   * @param reef    Reference to the REEF framework.
114   */
115  @Inject
116  JobClient(final REEF reef,
117            @Parameter(Launch.Command.class) final String command,
118            @Parameter(Launch.NumRuns.class) final Integer numRuns) throws BindException {
119
120    this.reef = reef;
121    this.command = command;
122    this.maxRuns = numRuns;
123
124    // If command is not set, switch to interactive mode. (Yes, we compare pointers here)
125    this.isInteractive = this.command ==
126        Launch.Command.class.getAnnotation(NamedParameter.class).default_value();
127
128    this.prompt = this.isInteractive ? new BufferedReader(new InputStreamReader(System.in)) : null;
129
130    this.driverConfigModule = DriverConfiguration.CONF
131        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
132        .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis())
133        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
134        .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
135        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
136        .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
137        .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
138        .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
139        .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class)
140        .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
141        .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class);
142  }
143
144  private void addCLRFiles(final File folder) throws BindException {
145    ConfigurationModule result = this.driverConfigModule;
146    for (final File f : folder.listFiles()) {
147      if (f.canRead() && f.exists() && f.isFile()) {
148        result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath());
149      }
150    }
151
152    this.driverConfigModule = result;
153    this.driverConfiguration = this.driverConfigModule.build();
154  }
155
156  /**
157   * Launch the job driver.
158   *
159   * @throws BindException configuration error.
160   */
161  public void submit(File clrFolder) {
162    try {
163      addCLRFiles(clrFolder);
164    } catch (final BindException e) {
165      LOG.log(Level.FINE, "Failed to bind", e);
166    }
167    this.reef.submit(this.driverConfiguration);
168  }
169
170  /**
171   * Send command to the job driver. Record timestamp when the command was sent.
172   * If this.command is set, use it; otherwise, ask user for the command.
173   */
174  private synchronized void submitTask() {
175    if (this.isInteractive) {
176      String cmd;
177      try {
178        do {
179          System.out.print("\nRE> ");
180          cmd = this.prompt.readLine();
181        } while (cmd != null && cmd.trim().isEmpty());
182      } catch (final IOException ex) {
183        LOG.log(Level.FINE, "Error reading from stdin: {0}", ex);
184        cmd = null;
185      }
186      if (cmd == null || cmd.equals("exit")) {
187        this.runningJob.close();
188        stopAndNotify();
189      } else {
190        this.submitTask(cmd);
191      }
192    } else {
193      // non-interactive batch mode:
194      this.submitTask(this.command);
195    }
196  }
197
198  /**
199   * Send command to the job driver. Record timestamp when the command was sent.
200   *
201   * @param cmd shell command to execute in all Evaluators.
202   */
203  private synchronized void submitTask(final String cmd) {
204    LOG.log(Level.INFO, "Submit task {0} \"{1}\" to {2}",
205        new Object[]{this.numRuns + 1, cmd, this.runningJob});
206    this.startTime = System.currentTimeMillis();
207    this.runningJob.send(CODEC.encode(cmd));
208  }
209
210  /**
211   * Notify the process in waitForCompletion() method that the main process has finished.
212   */
213  private synchronized void stopAndNotify() {
214    this.runningJob = null;
215    this.isBusy = false;
216    this.notify();
217  }
218
219  /**
220   * Wait for the job driver to complete. This method is called from Launcher.main()
221   */
222  public void waitForCompletion() {
223    while (this.isBusy) {
224      LOG.info("Waiting for the Job Driver to complete.");
225      try {
226        synchronized (this) {
227          this.wait();
228        }
229      } catch (final InterruptedException ex) {
230        LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
231      }
232    }
233    this.reef.close();
234  }
235
236  /**
237   * Receive notification from the job driver that the job is running.
238   */
239  final class RunningJobHandler implements EventHandler<RunningJob> {
240    @Override
241    public void onNext(final RunningJob job) {
242      LOG.log(Level.INFO, "Running job: {0}", job.getId());
243      synchronized (JobClient.this) {
244        JobClient.this.runningJob = job;
245        JobClient.this.submitTask();
246      }
247    }
248  }
249
250  /**
251   * Receive message from the job driver.
252   * There is only one message, which comes at the end of the driver execution
253   * and contains shell command output on each node.
254   */
255  final class JobMessageHandler implements EventHandler<JobMessage> {
256    @Override
257    public void onNext(final JobMessage message) {
258      synchronized (JobClient.this) {
259
260        final String result = CODEC.decode(message.get());
261        final long jobTime = System.currentTimeMillis() - startTime;
262        totalTime += jobTime;
263        ++numRuns;
264
265        LOG.log(Level.INFO, "Task {0} completed in {1} msec.:\n{2}",
266            new Object[]{numRuns, jobTime, result});
267
268        System.out.println(result);
269
270        if (runningJob != null) {
271          if (isInteractive || numRuns < maxRuns) {
272            submitTask();
273          } else {
274            LOG.log(Level.INFO,
275                "All {0} tasks complete; Average task time: {1}. Closing the job driver.",
276                new Object[]{maxRuns, totalTime / (double) maxRuns});
277            runningJob.close();
278            stopAndNotify();
279          }
280        }
281      }
282    }
283  }
284
285  /**
286   * Receive notification from the job driver that the job had failed.
287   */
288  final class FailedJobHandler implements EventHandler<FailedJob> {
289    @Override
290    public void onNext(final FailedJob job) {
291      LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
292      stopAndNotify();
293    }
294  }
295
296  /**
297   * Receive notification from the job driver that the job had completed successfully.
298   */
299  final class CompletedJobHandler implements EventHandler<CompletedJob> {
300    @Override
301    public void onNext(final CompletedJob job) {
302      LOG.log(Level.INFO, "Completed job: {0}", job.getId());
303      stopAndNotify();
304    }
305  }
306
307  /**
308   * Receive notification that there was an exception thrown from the job driver.
309   */
310  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
311    @Override
312    public void onNext(final FailedRuntime error) {
313      LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null));
314      stopAndNotify();
315    }
316  }
317}