This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.client.ClientConfiguration;
022import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
023import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
024import org.apache.reef.tang.Configuration;
025import org.apache.reef.tang.Injector;
026import org.apache.reef.tang.JavaConfigurationBuilder;
027import org.apache.reef.tang.Tang;
028import org.apache.reef.tang.annotations.Name;
029import org.apache.reef.tang.annotations.NamedParameter;
030import org.apache.reef.tang.exceptions.BindException;
031import org.apache.reef.tang.exceptions.InjectionException;
032import org.apache.reef.tang.formats.AvroConfigurationSerializer;
033import org.apache.reef.tang.formats.CommandLine;
034
035import java.io.IOException;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Suspend/Resume example - main class.
041 */
042public final class Launch {
043
044  /**
045   * Standard Java logger
046   */
047  private static final Logger LOG = Logger.getLogger(Launch.class.getName());
048  /**
049   * Number of REEF worker threads in local mode.
050   */
051  private static final int NUM_LOCAL_THREADS = 4;
052
053  /**
054   * This class should not be instantiated.
055   */
056  private Launch() {
057    throw new RuntimeException("Do not instantiate this class!");
058  }
059
060  /**
061   * @param args command line arguments, as passed to main()
062   * @return Configuration object.
063   */
064  private static Configuration parseCommandLine(final String[] args)
065      throws IOException, BindException {
066    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
067    final CommandLine cl = new CommandLine(confBuilder);
068    cl.registerShortNameOfClass(Local.class);
069    cl.registerShortNameOfClass(NumCycles.class);
070    cl.registerShortNameOfClass(Delay.class);
071    cl.registerShortNameOfClass(SuspendClientControl.Port.class);
072    cl.processCommandLine(args);
073    return confBuilder.build();
074  }
075
076  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
077      throws InjectionException, BindException {
078    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
079    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
080    cb.bindNamedParameter(NumCycles.class, String.valueOf(injector.getNamedInstance(NumCycles.class)));
081    cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
082    cb.bindNamedParameter(SuspendClientControl.Port.class,
083        String.valueOf(injector.getNamedInstance(SuspendClientControl.Port.class)));
084    return cb.build();
085  }
086
087  /**
088   * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
089   *
090   * @param args Command line arguments, as passed into main().
091   * @return (immutable) TANG Configuration object.
092   * @throws BindException      if configuration commandLineInjector fails.
093   * @throws InjectionException if configuration commandLineInjector fails.
094   * @throws IOException        error reading the configuration.
095   */
096  private static Configuration getClientConfiguration(final String[] args)
097      throws BindException, InjectionException, IOException {
098    final Configuration commandLineConf = parseCommandLine(args);
099
100    final Configuration clientConfiguration = ClientConfiguration.CONF
101        .set(ClientConfiguration.ON_JOB_RUNNING, SuspendClient.RunningJobHandler.class)
102        .set(ClientConfiguration.ON_JOB_FAILED, SuspendClient.FailedJobHandler.class)
103        .set(ClientConfiguration.ON_JOB_COMPLETED, SuspendClient.CompletedJobHandler.class)
104        .set(ClientConfiguration.ON_RUNTIME_ERROR, SuspendClient.RuntimeErrorHandler.class)
105        .build();
106
107    // TODO: Remove the injector, have stuff injected.
108    final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
109    final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
110    final Configuration runtimeConfiguration;
111    if (isLocal) {
112      LOG.log(Level.INFO, "Running on the local runtime");
113      runtimeConfiguration = LocalRuntimeConfiguration.CONF
114          .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
115          .build();
116    } else {
117      LOG.log(Level.INFO, "Running on YARN");
118      runtimeConfiguration = YarnClientConfiguration.CONF.build();
119    }
120
121    return Tang.Factory.getTang()
122        .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
123            cloneCommandLineConfiguration(commandLineConf))
124        .build();
125  }
126
127  /**
128   * Main method that runs the example.
129   *
130   * @param args command line parameters.
131   */
132  public static void main(final String[] args) {
133    try {
134      final Configuration config = getClientConfiguration(args);
135
136      LOG.log(Level.INFO, "Configuration:\n--\n{0}--",
137          new AvroConfigurationSerializer().toString(config));
138
139      final Injector injector = Tang.Factory.getTang().newInjector(config);
140      final SuspendClient client = injector.getInstance(SuspendClient.class);
141
142      client.submit();
143      client.waitForCompletion();
144      LOG.info("Done!");
145
146    } catch (final BindException | IOException | InjectionException ex) {
147      LOG.log(Level.SEVERE, "Cannot launch: configuration error", ex);
148    } catch (final Exception ex) {
149      LOG.log(Level.SEVERE, "Cleanup error", ex);
150    }
151  }
152
153  /**
154   * Command line parameter = true to run locally, or false to run on YARN.
155   */
156  @NamedParameter(doc = "Whether or not to run on the local runtime",
157      short_name = "local", default_value = "true")
158  public static final class Local implements Name<Boolean> {
159  }
160
161  /**
162   * Command line parameter: number of iterations to run.
163   */
164  @NamedParameter(doc = "Number of iterations to run", short_name = "cycles", default_value = "20")
165  public static final class NumCycles implements Name<Integer> {
166  }
167
168  /**
169   * Command line parameter: delay in seconds for each cycle.
170   */
171  @NamedParameter(doc = "Delay in seconds between the cycles", short_name = "delay", default_value = "1")
172  public static final class Delay implements Name<Integer> {
173  }
174}