This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.client.ClientConfiguration;
022import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
023import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
024import org.apache.reef.tang.Configuration;
025import org.apache.reef.tang.Injector;
026import org.apache.reef.tang.JavaConfigurationBuilder;
027import org.apache.reef.tang.Tang;
028import org.apache.reef.tang.annotations.Name;
029import org.apache.reef.tang.annotations.NamedParameter;
030import org.apache.reef.tang.exceptions.BindException;
031import org.apache.reef.tang.exceptions.InjectionException;
032import org.apache.reef.tang.formats.AvroConfigurationSerializer;
033import org.apache.reef.tang.formats.CommandLine;
034
035import java.io.IOException;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Suspend/Resume example - main class.
041 */
042public final class Launch {
043
044  /**
045   * Standard Java logger.
046   */
047  private static final Logger LOG = Logger.getLogger(Launch.class.getName());
048  /**
049   * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
050   */
051  private static final int MAX_NUMBER_OF_EVALUATORS = 4;
052
053  /**
054   * This class should not be instantiated.
055   */
056  private Launch() {
057    throw new RuntimeException("Do not instantiate this class!");
058  }
059
060  /**
061   * @param args command line arguments, as passed to main()
062   * @return Configuration object.
063   */
064  private static Configuration parseCommandLine(final String[] args)
065      throws IOException, BindException {
066    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
067    final CommandLine cl = new CommandLine(confBuilder);
068    cl.registerShortNameOfClass(Local.class);
069    cl.registerShortNameOfClass(NumCycles.class);
070    cl.registerShortNameOfClass(Delay.class);
071    cl.registerShortNameOfClass(SuspendClientControl.Port.class);
072    cl.processCommandLine(args);
073    return confBuilder.build();
074  }
075
076  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
077      throws InjectionException, BindException {
078    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
079    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
080    cb.bindNamedParameter(NumCycles.class, String.valueOf(injector.getNamedInstance(NumCycles.class)));
081    cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
082    cb.bindNamedParameter(SuspendClientControl.Port.class,
083        String.valueOf(injector.getNamedInstance(SuspendClientControl.Port.class)));
084    return cb.build();
085  }
086
087  /**
088   * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
089   *
090   * @param args Command line arguments, as passed into main().
091   * @return (immutable) TANG Configuration object.
092   * @throws BindException      if configuration commandLineInjector fails.
093   * @throws InjectionException if configuration commandLineInjector fails.
094   * @throws IOException        error reading the configuration.
095   */
096  private static Configuration getClientConfiguration(final String[] args)
097      throws BindException, InjectionException, IOException {
098    final Configuration commandLineConf = parseCommandLine(args);
099
100    final Configuration clientConfiguration = ClientConfiguration.CONF
101        .set(ClientConfiguration.ON_JOB_RUNNING, SuspendClient.RunningJobHandler.class)
102        .set(ClientConfiguration.ON_JOB_FAILED, SuspendClient.FailedJobHandler.class)
103        .set(ClientConfiguration.ON_JOB_COMPLETED, SuspendClient.CompletedJobHandler.class)
104        .set(ClientConfiguration.ON_RUNTIME_ERROR, SuspendClient.RuntimeErrorHandler.class)
105        .build();
106
107    final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
108    final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
109    final Configuration runtimeConfiguration;
110    if (isLocal) {
111      LOG.log(Level.INFO, "Running on the local runtime");
112      runtimeConfiguration = LocalRuntimeConfiguration.CONF
113          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
114          .build();
115    } else {
116      LOG.log(Level.INFO, "Running on YARN");
117      runtimeConfiguration = YarnClientConfiguration.CONF.build();
118    }
119
120    return Tang.Factory.getTang()
121        .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
122            cloneCommandLineConfiguration(commandLineConf))
123        .build();
124  }
125
126  /**
127   * Main method that runs the example.
128   *
129   * @param args command line parameters.
130   */
131  public static void main(final String[] args) {
132    try {
133      final Configuration config = getClientConfiguration(args);
134
135      LOG.log(Level.INFO, "Configuration:\n--\n{0}--",
136          new AvroConfigurationSerializer().toString(config));
137
138      final Injector injector = Tang.Factory.getTang().newInjector(config);
139      final SuspendClient client = injector.getInstance(SuspendClient.class);
140
141      client.submit();
142      client.waitForCompletion();
143      LOG.info("Done!");
144
145    } catch (final BindException | IOException | InjectionException ex) {
146      LOG.log(Level.SEVERE, "Cannot launch: configuration error", ex);
147    } catch (final Exception ex) {
148      LOG.log(Level.SEVERE, "Cleanup error", ex);
149    }
150  }
151
152  /**
153   * Command line parameter = true to run locally, or false to run on YARN.
154   */
155  @NamedParameter(doc = "Whether or not to run on the local runtime",
156      short_name = "local", default_value = "true")
157  public static final class Local implements Name<Boolean> {
158  }
159
160  /**
161   * Command line parameter: number of iterations to run.
162   */
163  @NamedParameter(doc = "Number of iterations to run", short_name = "cycles", default_value = "20")
164  public static final class NumCycles implements Name<Integer> {
165  }
166
167  /**
168   * Command line parameter: delay in seconds for each cycle.
169   */
170  @NamedParameter(doc = "Delay in seconds between the cycles", short_name = "delay", default_value = "1")
171  public static final class Delay implements Name<Integer> {
172  }
173}