This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.client;
020
021import org.apache.reef.annotations.Provided;
022import org.apache.reef.annotations.audience.ClientSide;
023import org.apache.reef.annotations.audience.Public;
024import org.apache.reef.tang.Configuration;
025import org.apache.reef.tang.Tang;
026import org.apache.reef.tang.annotations.Unit;
027import org.apache.reef.tang.exceptions.BindException;
028import org.apache.reef.tang.exceptions.InjectionException;
029import org.apache.reef.util.Optional;
030import org.apache.reef.wake.EventHandler;
031
032import javax.inject.Inject;
033import java.util.logging.Level;
034import java.util.logging.Logger;
035
036/**
037 * A launcher for REEF Drivers.
038 * <p/>
039 * It can be instantiated using a configuration that can create a REEF instance.
040 * For example, the local resourcemanager and the YARN resourcemanager can do this.
041 * <p/>
042 * {@see org.apache.reef.examples.hello.HelloREEF} for a demo use case.
043 */
044@Public
045@Provided
046@ClientSide
047@Unit
048public final class DriverLauncher {
049
050  private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());
051  private final REEF reef;
052  private LauncherStatus status = LauncherStatus.INIT;
053  private RunningJob theJob = null;
054
055  @Inject
056  private DriverLauncher(final REEF reef) {
057    this.reef = reef;
058  }
059
060  /**
061   * Instantiate a launcher for the given Configuration.
062   *
063   * @param runtimeConfiguration the resourcemanager configuration to be used
064   * @return a DriverLauncher based on the given resourcemanager configuration
065   * @throws BindException      on configuration errors
066   * @throws InjectionException on configuration errors
067   */
068  public static DriverLauncher getLauncher(
069      final Configuration runtimeConfiguration) throws BindException, InjectionException {
070
071    final Configuration clientConfiguration = ClientConfiguration.CONF
072        .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
073        .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
074        .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
075        .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class)
076        .build();
077
078    return Tang.Factory.getTang()
079        .newInjector(runtimeConfiguration, clientConfiguration)
080        .getInstance(DriverLauncher.class);
081  }
082
083  /**
084   * Kills the running job.
085   */
086  public synchronized void close() {
087    if (this.status.isRunning()) {
088      this.status = LauncherStatus.FORCE_CLOSED;
089    }
090    if (null != this.theJob) {
091      this.theJob.close();
092    }
093    this.notify();
094  }
095
096  /**
097   * Run a job. Waits indefinitely for the job to complete.
098   *
099   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
100   * @return the state of the job after execution.
101   */
102  public LauncherStatus run(final Configuration driverConfig) {
103    this.reef.submit(driverConfig);
104    synchronized (this) {
105      while (!this.status.isDone()) {
106        try {
107          LOG.log(Level.FINE, "Wait indefinitely");
108          this.wait();
109        } catch (final InterruptedException ex) {
110          LOG.log(Level.FINE, "Interrupted: {0}", ex);
111        }
112      }
113    }
114    this.reef.close();
115    return this.status;
116  }
117
118  /**
119   * Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
120   *
121   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
122   * @param timeOut      timeout on the job.
123   * @return the state of the job after execution.
124   */
125  public LauncherStatus run(final Configuration driverConfig, final long timeOut) {
126    final long endTime = System.currentTimeMillis() + timeOut;
127    this.reef.submit(driverConfig);
128    synchronized (this) {
129      while (!this.status.isDone()) {
130        try {
131          final long waitTime = endTime - System.currentTimeMillis();
132          if (waitTime <= 0) {
133            break;
134          }
135          LOG.log(Level.FINE, "Wait for {0} milliSeconds", waitTime);
136          this.wait(waitTime);
137        } catch (final InterruptedException ex) {
138          LOG.log(Level.FINE, "Interrupted: {0}", ex);
139        }
140      }
141      if (System.currentTimeMillis() >= endTime) {
142        LOG.log(Level.WARNING, "The Job timed out.");
143        this.status = LauncherStatus.FORCE_CLOSED;
144      }
145    }
146
147    this.reef.close();
148    return this.status;
149  }
150
151  /**
152   * @return the current status of the job.
153   */
154  public LauncherStatus getStatus() {
155    return this.status;
156  }
157
158  /**
159   * Update job status and notify the waiting thread.
160   */
161  public synchronized void setStatusAndNotify(final LauncherStatus status) {
162    LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[]{this.status, status});
163    this.status = status;
164    this.notify();
165  }
166
167  @Override
168  public String toString() {
169    return this.status.toString();
170  }
171
172  /**
173   * Job driver notifies us that the job is running.
174   */
175  public final class RunningJobHandler implements EventHandler<RunningJob> {
176    @Override
177    public void onNext(final RunningJob job) {
178      LOG.log(Level.INFO, "The Job {0} is running.", job.getId());
179      theJob = job;
180      setStatusAndNotify(LauncherStatus.RUNNING);
181    }
182  }
183
184  /**
185   * Job driver notifies us that the job had failed.
186   */
187  public final class FailedJobHandler implements EventHandler<FailedJob> {
188    @Override
189    public void onNext(final FailedJob job) {
190      final Optional<Throwable> ex = job.getReason();
191      LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex);
192      theJob = null;
193      setStatusAndNotify(LauncherStatus.FAILED(ex));
194    }
195  }
196
197  /**
198   * Job driver notifies us that the job had completed successfully.
199   */
200  public final class CompletedJobHandler implements EventHandler<CompletedJob> {
201    @Override
202    public void onNext(final CompletedJob job) {
203      LOG.log(Level.INFO, "The Job {0} is done.", job.getId());
204      theJob = null;
205      setStatusAndNotify(LauncherStatus.COMPLETED);
206    }
207  }
208
209  /**
210   * Handler an error in the job driver.
211   */
212  public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
213    @Override
214    public void onNext(final FailedRuntime error) {
215      LOG.log(Level.SEVERE, "Received a resourcemanager error", error.getReason());
216      theJob = null;
217      setStatusAndNotify(LauncherStatus.FAILED(error.getReason()));
218    }
219  }
220}