001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.client;
020
021import org.apache.reef.annotations.Provided;
022import org.apache.reef.annotations.audience.ClientSide;
023import org.apache.reef.annotations.audience.Public;
024import org.apache.reef.runtime.common.UserCredentials;
025import org.apache.reef.tang.Configuration;
026import org.apache.reef.tang.Tang;
027import org.apache.reef.tang.annotations.Unit;
028import org.apache.reef.tang.exceptions.InjectionException;
029import org.apache.reef.util.Optional;
030import org.apache.reef.wake.EventHandler;
031
032import javax.inject.Inject;
033import java.util.Collections;
034import java.util.HashSet;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038/**
039 * A launcher for REEF Drivers.
040 * <p>
041 * It can be instantiated using a configuration that can create a REEF instance.
042 * For example, the local resource manager and the YARN resource manager can do this.
043 * <p>
044 * See {@link org.apache.reef.examples.hello} package for a demo use case.
045 */
046@Public
047@Provided
048@ClientSide
049@Unit
050public final class DriverLauncher implements AutoCloseable {
051
052  private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());
053
054  private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF
055      .set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class)
056      .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
057      .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
058      .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
059      .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class)
060      .build();
061
062  private final REEF reef;
063  private final UserCredentials user;
064
065  private LauncherStatus status = LauncherStatus.INIT;
066
067  private String jobId;
068  private RunningJob theJob;
069
070  @Inject
071  private DriverLauncher(final REEF reef, final UserCredentials user) {
072    this.reef = reef;
073    this.user = user;
074  }
075
076  public UserCredentials getUser() {
077    return this.user;
078  }
079
080  /**
081   * Instantiate a launcher for the given Configuration.
082   *
083   * @param runtimeConfiguration the resourcemanager configuration to be used
084   * @return a DriverLauncher based on the given resourcemanager configuration
085   * @throws InjectionException on configuration errors
086   */
087  public static DriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException {
088    return Tang.Factory.getTang()
089        .newInjector(runtimeConfiguration, CLIENT_CONFIG)
090        .getInstance(DriverLauncher.class);
091  }
092
093  /**
094   * Kills the running job.
095   */
096  @Override
097  public void close() {
098    synchronized (this) {
099      LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status});
100      if (this.status.isRunning()) {
101        this.status = LauncherStatus.FORCE_CLOSED;
102      }
103      if (null != this.theJob) {
104        this.theJob.close();
105      }
106      this.notify();
107    }
108    LOG.log(Level.FINEST, "Close launcher: shutdown REEF");
109    this.reef.close();
110    LOG.log(Level.FINEST, "Close launcher: done");
111  }
112
113  /**
114   * Run a job. Waits indefinitely for the job to complete.
115   *
116   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
117   * @return the state of the job after execution.
118   */
119  public LauncherStatus run(final Configuration driverConfig) {
120    this.reef.submit(driverConfig);
121    synchronized (this) {
122      while (!this.status.isDone()) {
123        try {
124          LOG.log(Level.FINE, "Wait indefinitely");
125          this.wait();
126        } catch (final InterruptedException ex) {
127          LOG.log(Level.FINE, "Interrupted: {0}", ex);
128        }
129      }
130    }
131    this.reef.close();
132    return this.getStatus();
133  }
134
135  /**
136   * Submit REEF job asynchronously and do not wait for its completion.
137   *
138   * @param driverConfig configuration of hte driver to submit to the RM.
139   * @return ID of the new application.
140   */
141  public String submit(final Configuration driverConfig, final long waitTime) {
142    this.reef.submit(driverConfig);
143    this.waitForStatus(waitTime, LauncherStatus.SUBMITTED);
144    return this.jobId;
145  }
146
147  /**
148   * Wait for one of the specified statuses of the REEF job.
149   * This method is called after the job is submitted to the RM via submit().
150   * @param waitTime wait time in milliseconds.
151   * @param statuses array of statuses to wait for.
152   * @return the state of the job after the wait.
153   */
154  public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... statuses) {
155
156    final long endTime = System.currentTimeMillis() + waitTime;
157
158    final HashSet<LauncherStatus> statSet = new HashSet<>(statuses.length * 2);
159    Collections.addAll(statSet, statuses);
160    Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED);
161
162    LOG.log(Level.FINEST, "Wait for status: {0}", statSet);
163    final LauncherStatus finalStatus;
164
165    synchronized (this) {
166      while (!statSet.contains(this.status)) {
167        try {
168          final long delay = endTime - System.currentTimeMillis();
169          if (delay <= 0) {
170            break;
171          }
172          LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay);
173          this.wait(delay);
174        } catch (final InterruptedException ex) {
175          LOG.log(Level.FINE, "Interrupted: {0}", ex);
176        }
177      }
178
179      finalStatus = this.status;
180    }
181
182    LOG.log(Level.FINEST, "Final status: {0}", finalStatus);
183    return finalStatus;
184  }
185
186  /**
187   * Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
188   *
189   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
190   * @param timeOut timeout on the job.
191   * @return the state of the job after execution.
192   */
193  public LauncherStatus run(final Configuration driverConfig, final long timeOut) {
194
195    final long startTime = System.currentTimeMillis();
196
197    this.reef.submit(driverConfig);
198    this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED);
199
200    if (System.currentTimeMillis() - startTime >= timeOut) {
201      LOG.log(Level.WARNING, "The Job timed out.");
202      synchronized (this) {
203        this.status = LauncherStatus.FORCE_CLOSED;
204      }
205    }
206
207    this.reef.close();
208    return this.getStatus();
209  }
210
211  /**
212   * @return the current status of the job.
213   */
214  public synchronized LauncherStatus getStatus() {
215    return this.status;
216  }
217
218  /** Update job status and notify the waiting thread. */
219  public synchronized void setStatusAndNotify(final LauncherStatus newStatus) {
220    LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus});
221    this.status = newStatus;
222    this.notify();
223  }
224
225  @Override
226  public String toString() {
227    return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status);
228  }
229
230  /**
231   * Job driver notifies us that the job has been submitted to the Resource Manager.
232   */
233  public final class SubmittedJobHandler implements EventHandler<SubmittedJob> {
234    @Override
235    public void onNext(final SubmittedJob job) {
236      LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId());
237      jobId = job.getId();
238      setStatusAndNotify(LauncherStatus.SUBMITTED);
239    }
240  }
241
242  /**
243   * Job driver notifies us that the job is running.
244   */
245  public final class RunningJobHandler implements EventHandler<RunningJob> {
246    @Override
247    public void onNext(final RunningJob job) {
248      LOG.log(Level.INFO, "The Job {0} is running.", job.getId());
249      theJob = job;
250      setStatusAndNotify(LauncherStatus.RUNNING);
251    }
252  }
253
254  /**
255   * Job driver notifies us that the job had failed.
256   */
257  public final class FailedJobHandler implements EventHandler<FailedJob> {
258    @Override
259    public void onNext(final FailedJob job) {
260      final Optional<Throwable> ex = job.getReason();
261      LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex);
262      theJob = null;
263      setStatusAndNotify(LauncherStatus.failed(ex));
264    }
265  }
266
267  /**
268   * Job driver notifies us that the job had completed successfully.
269   */
270  public final class CompletedJobHandler implements EventHandler<CompletedJob> {
271    @Override
272    public void onNext(final CompletedJob job) {
273      LOG.log(Level.INFO, "The Job {0} is done.", job.getId());
274      theJob = null;
275      setStatusAndNotify(LauncherStatus.COMPLETED);
276    }
277  }
278
279  /**
280   * Handler an error in the job driver.
281   */
282  public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
283    @Override
284    public void onNext(final FailedRuntime error) {
285      LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason());
286      theJob = null;
287      setStatusAndNotify(LauncherStatus.failed(error.getReason()));
288    }
289  }
290}