This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.yarn.failure;
020
021import org.apache.reef.client.DriverConfiguration;
022import org.apache.reef.client.DriverLauncher;
023import org.apache.reef.client.LauncherStatus;
024import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
025import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
026import org.apache.reef.tang.Configuration;
027import org.apache.reef.tang.Injector;
028import org.apache.reef.tang.JavaConfigurationBuilder;
029import org.apache.reef.tang.Tang;
030import org.apache.reef.tang.annotations.Name;
031import org.apache.reef.tang.annotations.NamedParameter;
032import org.apache.reef.tang.exceptions.BindException;
033import org.apache.reef.tang.exceptions.InjectionException;
034import org.apache.reef.tang.formats.CommandLine;
035import org.apache.reef.util.EnvironmentUtils;
036
037import java.io.IOException;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041public final class FailureREEF {
042
043  public static final int NUM_LOCAL_THREADS = 16;
044
045  private static final Logger LOG = Logger.getLogger(FailureREEF.class.getName());
046
047  private static Configuration parseCommandLine(final String[] aArgs) {
048    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
049    try {
050      new CommandLine(cb)
051          .registerShortNameOfClass(Local.class)
052          .registerShortNameOfClass(TimeOut.class)
053          .processCommandLine(aArgs);
054      return cb.build();
055    } catch (final BindException | IOException ex) {
056      final String msg = "Unable to parse command line";
057      LOG.log(Level.SEVERE, msg, ex);
058      throw new RuntimeException(msg, ex);
059    }
060  }
061
062  /**
063   * @return (immutable) TANG Configuration object.
064   * @throws BindException      if configuration injector fails.
065   * @throws InjectionException if the Local.class parameter is not injected.
066   */
067  private static Configuration getRunTimeConfiguration(final boolean isLocal) throws BindException {
068
069    final Configuration runtimeConfiguration;
070
071    if (isLocal) {
072      LOG.log(Level.INFO, "Running Failure demo on the local runtime");
073      runtimeConfiguration = LocalRuntimeConfiguration.CONF
074          .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
075          .build();
076    } else {
077      LOG.log(Level.INFO, "Running Failure demo on YARN");
078      runtimeConfiguration = YarnClientConfiguration.CONF.build();
079    }
080
081    return runtimeConfiguration;
082  }
083
084  public static LauncherStatus runFailureReef(
085      final Configuration runtimeConfig, final int timeout) throws InjectionException {
086
087    final Configuration driverConf = DriverConfiguration.CONF
088        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailureDriver.class))
089        .set(DriverConfiguration.DRIVER_IDENTIFIER, "FailureREEF")
090        .set(DriverConfiguration.ON_DRIVER_STARTED, FailureDriver.StartHandler.class)
091        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, FailureDriver.EvaluatorAllocatedHandler.class)
092        .set(DriverConfiguration.ON_EVALUATOR_FAILED, FailureDriver.EvaluatorFailedHandler.class)
093        .build();
094
095    final LauncherStatus state = DriverLauncher.getLauncher(runtimeConfig).run(driverConf, timeout);
096    LOG.log(Level.INFO, "REEF job completed: {0}", state);
097    return state;
098  }
099
100  public static void main(final String[] args) throws InjectionException {
101    final Configuration commandLineConf = parseCommandLine(args);
102    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
103    final boolean isLocal = injector.getNamedInstance(Local.class);
104    final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
105    runFailureReef(getRunTimeConfiguration(isLocal), jobTimeout);
106  }
107
108  /**
109   * Command line parameter = true to run locally, or false to run on YARN.
110   */
111  @NamedParameter(doc = "Whether or not to run on the local runtime",
112      short_name = "local", default_value = "true")
113  public static final class Local implements Name<Boolean> {
114  }
115
116  @NamedParameter(doc = "Number of minutes before timeout",
117      short_name = "timeout", default_value = "2")
118  public static final class TimeOut implements Name<Integer> {
119  }
120}