This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.client;
020
021import org.apache.reef.annotations.Provided;
022import org.apache.reef.annotations.audience.ClientSide;
023import org.apache.reef.annotations.audience.Public;
024import org.apache.reef.tang.Configuration;
025import org.apache.reef.tang.Tang;
026import org.apache.reef.tang.annotations.Unit;
027import org.apache.reef.tang.exceptions.BindException;
028import org.apache.reef.tang.exceptions.InjectionException;
029import org.apache.reef.util.Optional;
030import org.apache.reef.wake.EventHandler;
031
032import javax.inject.Inject;
033import java.util.logging.Level;
034import java.util.logging.Logger;
035
036/**
037 * A launcher for REEF Drivers.
038 * <p>
039 * It can be instantiated using a configuration that can create a REEF instance.
040 * For example, the local resourcemanager and the YARN resourcemanager can do this.
041 * <p>
042 * See {@link org.apache.reef.examples.hello} package for a demo use case.
043 */
044@Public
045@Provided
046@ClientSide
047@Unit
048public final class DriverLauncher {
049
050  private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());
051  private final REEF reef;
052  private LauncherStatus status = LauncherStatus.INIT;
053  private RunningJob theJob = null;
054
055  @Inject
056  private DriverLauncher(final REEF reef) {
057    this.reef = reef;
058  }
059
060  /**
061   * Instantiate a launcher for the given Configuration.
062   *
063   * @param runtimeConfiguration the resourcemanager configuration to be used
064   * @return a DriverLauncher based on the given resourcemanager configuration
065   * @throws BindException      on configuration errors
066   * @throws InjectionException on configuration errors
067   */
068  public static DriverLauncher getLauncher(
069      final Configuration runtimeConfiguration) throws BindException, InjectionException {
070
071    final Configuration clientConfiguration = ClientConfiguration.CONF
072        .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
073        .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
074        .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
075        .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class)
076        .build();
077
078    return Tang.Factory.getTang()
079        .newInjector(runtimeConfiguration, clientConfiguration)
080        .getInstance(DriverLauncher.class);
081  }
082
083  /**
084   * Kills the running job.
085   */
086  public synchronized void close() {
087    if (this.status.isRunning()) {
088      this.status = LauncherStatus.FORCE_CLOSED;
089    }
090    if (null != this.theJob) {
091      this.theJob.close();
092    }
093    this.notify();
094  }
095
096  /**
097   * Run a job. Waits indefinitely for the job to complete.
098   *
099   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
100   * @return the state of the job after execution.
101   */
102  public LauncherStatus run(final Configuration driverConfig) {
103    this.reef.submit(driverConfig);
104    synchronized (this) {
105      while (!this.status.isDone()) {
106        try {
107          LOG.log(Level.FINE, "Wait indefinitely");
108          this.wait();
109        } catch (final InterruptedException ex) {
110          LOG.log(Level.FINE, "Interrupted: {0}", ex);
111        }
112      }
113    }
114    this.reef.close();
115    synchronized (this) {
116      return this.status;
117    }
118  }
119
120  /**
121   * Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
122   *
123   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
124   * @param timeOut      timeout on the job.
125   * @return the state of the job after execution.
126   */
127  public LauncherStatus run(final Configuration driverConfig, final long timeOut) {
128    final long endTime = System.currentTimeMillis() + timeOut;
129    this.reef.submit(driverConfig);
130    synchronized (this) {
131      while (!this.status.isDone()) {
132        try {
133          final long waitTime = endTime - System.currentTimeMillis();
134          if (waitTime <= 0) {
135            break;
136          }
137          LOG.log(Level.FINE, "Wait for {0} milliSeconds", waitTime);
138          this.wait(waitTime);
139        } catch (final InterruptedException ex) {
140          LOG.log(Level.FINE, "Interrupted: {0}", ex);
141        }
142      }
143      if (System.currentTimeMillis() >= endTime) {
144        LOG.log(Level.WARNING, "The Job timed out.");
145        this.status = LauncherStatus.FORCE_CLOSED;
146      }
147    }
148
149    this.reef.close();
150    synchronized (this) {
151      return this.status;
152    }
153  }
154
155  /**
156   * @return the current status of the job.
157   */
158  public LauncherStatus getStatus() {
159    synchronized (this) {
160      return this.status;
161    }
162  }
163
164  /**
165   * Update job status and notify the waiting thread.
166   */
167  @SuppressWarnings("checkstyle:hiddenfield")
168  public synchronized void setStatusAndNotify(final LauncherStatus status) {
169    LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[]{this.status, status});
170    this.status = status;
171    this.notify();
172  }
173
174  @Override
175  public String toString() {
176    return this.status.toString();
177  }
178
179  /**
180   * Job driver notifies us that the job is running.
181   */
182  public final class RunningJobHandler implements EventHandler<RunningJob> {
183    @Override
184    public void onNext(final RunningJob job) {
185      LOG.log(Level.INFO, "The Job {0} is running.", job.getId());
186      theJob = job;
187      setStatusAndNotify(LauncherStatus.RUNNING);
188    }
189  }
190
191  /**
192   * Job driver notifies us that the job had failed.
193   */
194  public final class FailedJobHandler implements EventHandler<FailedJob> {
195    @Override
196    public void onNext(final FailedJob job) {
197      final Optional<Throwable> ex = job.getReason();
198      LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex);
199      theJob = null;
200      setStatusAndNotify(LauncherStatus.failed(ex));
201    }
202  }
203
204  /**
205   * Job driver notifies us that the job had completed successfully.
206   */
207  public final class CompletedJobHandler implements EventHandler<CompletedJob> {
208    @Override
209    public void onNext(final CompletedJob job) {
210      LOG.log(Level.INFO, "The Job {0} is done.", job.getId());
211      theJob = null;
212      setStatusAndNotify(LauncherStatus.COMPLETED);
213    }
214  }
215
216  /**
217   * Handler an error in the job driver.
218   */
219  public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
220    @Override
221    public void onNext(final FailedRuntime error) {
222      LOG.log(Level.SEVERE, "Received a resourcemanager error", error.getReason());
223      theJob = null;
224      setStatusAndNotify(LauncherStatus.failed(error.getReason()));
225    }
226  }
227}