This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.client.ClientConfiguration;
022import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
023import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
024import org.apache.reef.tang.*;
025import org.apache.reef.tang.annotations.Name;
026import org.apache.reef.tang.annotations.NamedParameter;
027import org.apache.reef.tang.exceptions.BindException;
028import org.apache.reef.tang.exceptions.InjectionException;
029import org.apache.reef.tang.formats.CommandLine;
030
031import java.io.IOException;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Suspend/Resume example - main class.
037 */
038public final class Launch {
039
040  /**
041   * Standard Java logger.
042   */
043  private static final Logger LOG = Logger.getLogger(Launch.class.getName());
044  /**
045   * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
046   */
047  private static final int MAX_NUMBER_OF_EVALUATORS = 4;
048
049  /**
050   * This class should not be instantiated.
051   */
052  private Launch() {
053    throw new RuntimeException("Do not instantiate this class!");
054  }
055
056  /**
057   * @param args command line arguments, as passed to main()
058   * @return Configuration object.
059   */
060  private static Configuration parseCommandLine(final String[] args)
061      throws IOException, BindException {
062    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
063    final CommandLine cl = new CommandLine(confBuilder);
064    cl.registerShortNameOfClass(Local.class);
065    cl.registerShortNameOfClass(NumCycles.class);
066    cl.registerShortNameOfClass(Delay.class);
067    cl.registerShortNameOfClass(SuspendClientControl.Port.class);
068    cl.processCommandLine(args);
069    return confBuilder.build();
070  }
071
072  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
073      throws InjectionException, BindException {
074    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
075    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
076    cb.bindNamedParameter(NumCycles.class, String.valueOf(injector.getNamedInstance(NumCycles.class)));
077    cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
078    cb.bindNamedParameter(SuspendClientControl.Port.class,
079        String.valueOf(injector.getNamedInstance(SuspendClientControl.Port.class)));
080    return cb.build();
081  }
082
083  /**
084   * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
085   *
086   * @param args Command line arguments, as passed into main().
087   * @return (immutable) TANG Configuration object.
088   * @throws BindException      if configuration commandLineInjector fails.
089   * @throws InjectionException if configuration commandLineInjector fails.
090   * @throws IOException        error reading the configuration.
091   */
092  private static Configuration getClientConfiguration(final String[] args)
093      throws BindException, InjectionException, IOException {
094    final Configuration commandLineConf = parseCommandLine(args);
095
096    final Configuration clientConfiguration = ClientConfiguration.CONF
097        .set(ClientConfiguration.ON_JOB_RUNNING, SuspendClient.RunningJobHandler.class)
098        .set(ClientConfiguration.ON_JOB_FAILED, SuspendClient.FailedJobHandler.class)
099        .set(ClientConfiguration.ON_JOB_COMPLETED, SuspendClient.CompletedJobHandler.class)
100        .set(ClientConfiguration.ON_RUNTIME_ERROR, SuspendClient.RuntimeErrorHandler.class)
101        .build();
102
103    final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
104    final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
105    final Configuration runtimeConfiguration;
106    if (isLocal) {
107      LOG.log(Level.INFO, "Running on the local runtime");
108      runtimeConfiguration = LocalRuntimeConfiguration.CONF
109          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
110          .build();
111    } else {
112      LOG.log(Level.INFO, "Running on YARN");
113      runtimeConfiguration = YarnClientConfiguration.CONF.build();
114    }
115
116    return Configurations.merge(
117        runtimeConfiguration, clientConfiguration, cloneCommandLineConfiguration(commandLineConf));
118  }
119
120  /**
121   * Main method that runs the example.
122   *
123   * @param args command line parameters.
124   */
125  public static void main(final String[] args) {
126    try {
127      final Configuration config = getClientConfiguration(args);
128
129      LOG.log(Level.INFO, "Configuration:\n--\n{0}--", Configurations.toString(config, true));
130
131      final Injector injector = Tang.Factory.getTang().newInjector(config);
132      final SuspendClient client = injector.getInstance(SuspendClient.class);
133
134      client.submit();
135      client.waitForCompletion();
136      LOG.info("Done!");
137
138    } catch (final BindException | IOException | InjectionException ex) {
139      LOG.log(Level.SEVERE, "Cannot launch: configuration error", ex);
140    } catch (final Exception ex) {
141      LOG.log(Level.SEVERE, "Cleanup error", ex);
142    }
143  }
144
145  /**
146   * Command line parameter = true to run locally, or false to run on YARN.
147   */
148  @NamedParameter(doc = "Whether or not to run on the local runtime",
149      short_name = "local", default_value = "true")
150  public static final class Local implements Name<Boolean> {
151  }
152
153  /**
154   * Command line parameter: number of iterations to run.
155   */
156  @NamedParameter(doc = "Number of iterations to run", short_name = "cycles", default_value = "20")
157  public static final class NumCycles implements Name<Integer> {
158  }
159
160  /**
161   * Command line parameter: delay in seconds for each cycle.
162   */
163  @NamedParameter(doc = "Delay in seconds between the cycles", short_name = "delay", default_value = "1")
164  public static final class Delay implements Name<Integer> {
165  }
166}