This project has retired. For details please refer to its Attic page.
Source code
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.reef.examples.retained_eval;
020
021import org.apache.reef.client.*;
022import org.apache.reef.examples.library.Command;
023import org.apache.reef.tang.Configuration;
024import org.apache.reef.tang.JavaConfigurationBuilder;
025import org.apache.reef.tang.Tang;
026import org.apache.reef.tang.annotations.NamedParameter;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.tang.annotations.Unit;
029import org.apache.reef.tang.exceptions.BindException;
030import org.apache.reef.util.EnvironmentUtils;
031import org.apache.reef.wake.EventHandler;
032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
033
034import javax.inject.Inject;
035import java.io.BufferedReader;
036import java.io.IOException;
037import java.io.InputStreamReader;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * Retained Evaluator Shell Client.
043 */
044@Unit
045public class JobClient {
046
047  /**
048   * Standard java logger.
049   */
050  private static final Logger LOG = Logger.getLogger(JobClient.class.getName());
051
052  /**
053   * Codec to translate messages to and from the job driver
054   */
055  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
056
057  /**
058   * Reference to the REEF framework.
059   * This variable is injected automatically in the constructor.
060   */
061  private final REEF reef;
062
063  /**
064   * Shell command to submitTask to the job driver.
065   */
066  private final String command;
067
068  /**
069   * Job Driver configuration.
070   */
071  private final Configuration driverConfiguration;
072
073  /**
074   * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode.
075   */
076  private final boolean isInteractive;
077
078  /**
079   * Total number of experiments to run.
080   */
081  private final int maxRuns;
082
083  /**
084   * Command prompt reader for the interactive mode (stdin).
085   */
086  private final BufferedReader prompt;
087
088  /**
089   * A reference to the running job that allows client to send messages back to the job driver
090   */
091  private RunningJob runningJob;
092
093  /**
094   * Start timestamp of the current task.
095   */
096  private long startTime = 0;
097
098  /**
099   * Total time spent performing tasks in Evaluators.
100   */
101  private long totalTime = 0;
102
103  /**
104   * Number of experiments ran so far.
105   */
106  private int numRuns = 0;
107
108  /**
109   * Set to false when job driver is done.
110   */
111  private boolean isBusy = true;
112
113  /**
114   * Last result returned from the job driver.
115   */
116  private String lastResult;
117
118  /**
119   * Retained Evaluator client.
120   * Parameters are injected automatically by TANG.
121   *
122   * @param command Shell command to run on each Evaluator.
123   * @param reef    Reference to the REEF framework.
124   */
125  @Inject
126  JobClient(final REEF reef,
127            @Parameter(Command.class) final String command,
128            @Parameter(Launch.NumRuns.class) final Integer numRuns,
129            @Parameter(Launch.NumEval.class) final Integer numEvaluators) throws BindException {
130
131    this.reef = reef;
132    this.command = command;
133    this.maxRuns = numRuns;
134
135    // If command is not set, switch to interactive mode. (Yes, we compare pointers here)
136    this.isInteractive = this.command ==
137        Command.class.getAnnotation(NamedParameter.class).default_value();
138
139    this.prompt = this.isInteractive ?
140        new BufferedReader(new InputStreamReader(System.in)) : null;
141
142    final JavaConfigurationBuilder configBuilder = Tang.Factory.getTang().newConfigurationBuilder();
143    configBuilder.addConfiguration(
144        DriverConfiguration.CONF
145            .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
146            .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis())
147            .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
148            .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
149            .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
150            .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
151            .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
152            .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
153            .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class)
154            .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
155            .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
156            .build()
157    );
158    configBuilder.bindNamedParameter(Launch.NumEval.class, "" + numEvaluators);
159    this.driverConfiguration = configBuilder.build();
160  }
161
162  /**
163   * @return a Configuration binding the ClientConfiguration.* event handlers to this Client.
164   */
165  public static Configuration getClientConfiguration() {
166    return ClientConfiguration.CONF
167        .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class)
168        .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class)
169        .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
170        .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
171        .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
172        .build();
173  }
174
175  /**
176   * Launch the job driver.
177   *
178   * @throws BindException configuration error.
179   */
180  public void submit() {
181    this.reef.submit(this.driverConfiguration);
182  }
183
184  /**
185   * Send command to the job driver. Record timestamp when the command was sent.
186   * If this.command is set, use it; otherwise, ask user for the command.
187   */
188  private synchronized void submitTask() {
189    if (this.isInteractive) {
190      String cmd;
191      try {
192        do {
193          System.out.print("\nRE> ");
194          cmd = this.prompt.readLine();
195        } while (cmd != null && cmd.trim().isEmpty());
196      } catch (final IOException ex) {
197        LOG.log(Level.FINE, "Error reading from stdin: {0}", ex);
198        cmd = null;
199      }
200      if (cmd == null || cmd.equals("exit")) {
201        this.runningJob.close();
202        stopAndNotify();
203      } else {
204        this.submitTask(cmd);
205      }
206    } else {
207      // non-interactive batch mode:
208      this.submitTask(this.command);
209    }
210  }
211
212  /**
213   * Send command to the job driver. Record timestamp when the command was sent.
214   *
215   * @param cmd shell command to execute in all Evaluators.
216   */
217  private synchronized void submitTask(final String cmd) {
218    LOG.log(Level.FINE, "Submit task {0} \"{1}\" to {2}",
219        new Object[]{this.numRuns + 1, cmd, this.runningJob});
220    this.startTime = System.currentTimeMillis();
221    this.runningJob.send(CODEC.encode(cmd));
222  }
223
224  /**
225   * Notify the process in waitForCompletion() method that the main process has finished.
226   */
227  private synchronized void stopAndNotify() {
228    this.runningJob = null;
229    this.isBusy = false;
230    this.notify();
231  }
232
233  /**
234   * Wait for the job driver to complete. This method is called from Launch.main()
235   */
236  public String waitForCompletion() {
237    while (this.isBusy) {
238      LOG.log(Level.FINE, "Waiting for the Job Driver to complete.");
239      try {
240        synchronized (this) {
241          this.wait();
242        }
243      } catch (final InterruptedException ex) {
244        LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
245      }
246    }
247    return this.lastResult;
248  }
249
250  public void close() {
251    this.reef.close();
252  }
253
254  /**
255   * Receive notification from the job driver that the job is running.
256   */
257  final class RunningJobHandler implements EventHandler<RunningJob> {
258    @Override
259    public void onNext(final RunningJob job) {
260      LOG.log(Level.FINE, "Running job: {0}", job.getId());
261      synchronized (JobClient.this) {
262        JobClient.this.runningJob = job;
263        JobClient.this.submitTask();
264      }
265    }
266  }
267
268  /**
269   * Receive message from the job driver.
270   * There is only one message, which comes at the end of the driver execution
271   * and contains shell command output on each node.
272   */
273  final class JobMessageHandler implements EventHandler<JobMessage> {
274    @Override
275    public void onNext(final JobMessage message) {
276      synchronized (JobClient.this) {
277
278        lastResult = CODEC.decode(message.get());
279        final long jobTime = System.currentTimeMillis() - startTime;
280        totalTime += jobTime;
281        ++numRuns;
282
283        LOG.log(Level.FINE, "TIME: Task {0} completed in {1} msec.:\n{2}",
284            new Object[]{"" + numRuns, "" + jobTime, lastResult});
285
286        System.out.println(lastResult);
287
288        if (runningJob != null) {
289          if (isInteractive || numRuns < maxRuns) {
290            submitTask();
291          } else {
292            LOG.log(Level.INFO,
293                "All {0} tasks complete; Average task time: {1}. Closing the job driver.",
294                new Object[]{maxRuns, totalTime / (double) maxRuns});
295            runningJob.close();
296            stopAndNotify();
297          }
298        }
299      }
300    }
301  }
302
303  /**
304   * Receive notification from the job driver that the job had failed.
305   */
306  final class FailedJobHandler implements EventHandler<FailedJob> {
307    @Override
308    public void onNext(final FailedJob job) {
309      LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
310      stopAndNotify();
311    }
312  }
313
314  /**
315   * Receive notification from the job driver that the job had completed successfully.
316   */
317  final class CompletedJobHandler implements EventHandler<CompletedJob> {
318    @Override
319    public void onNext(final CompletedJob job) {
320      LOG.log(Level.FINE, "Completed job: {0}", job.getId());
321      stopAndNotify();
322    }
323  }
324
325  /**
326   * Receive notification that there was an exception thrown from the job driver.
327   */
328  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
329    @Override
330    public void onNext(final FailedRuntime error) {
331      LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null));
332      stopAndNotify();
333    }
334  }
335}