This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.pool;
020
021import org.apache.reef.client.DriverConfiguration;
022import org.apache.reef.client.DriverLauncher;
023import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
024import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
025import org.apache.reef.tang.Configuration;
026import org.apache.reef.tang.Injector;
027import org.apache.reef.tang.JavaConfigurationBuilder;
028import org.apache.reef.tang.Tang;
029import org.apache.reef.tang.annotations.Name;
030import org.apache.reef.tang.annotations.NamedParameter;
031import org.apache.reef.tang.exceptions.BindException;
032import org.apache.reef.tang.exceptions.InjectionException;
033import org.apache.reef.tang.formats.AvroConfigurationSerializer;
034import org.apache.reef.tang.formats.CommandLine;
035import org.apache.reef.util.EnvironmentUtils;
036
037import java.io.IOException;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * Pool of Evaluators example - main class.
043 */
044public final class Launch {
045
046  /**
047   * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
048   */
049  private static final int MAX_NUMBER_OF_EVALUATORS = 4;
050  /**
051   * Standard Java logger.
052   */
053  private static final Logger LOG = Logger.getLogger(Launch.class.getName());
054
055  /**
056   * This class should not be instantiated.
057   */
058  private Launch() {
059    throw new RuntimeException("Do not instantiate this class!");
060  }
061
062  /**
063   * Parse the command line arguments.
064   *
065   * @param args command line arguments, as passed to main()
066   * @return Configuration object.
067   * @throws BindException configuration error.
068   * @throws IOException   error reading the configuration.
069   */
070  private static Configuration parseCommandLine(final String[] args)
071      throws BindException, IOException {
072    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
073    final CommandLine cl = new CommandLine(confBuilder);
074    cl.registerShortNameOfClass(Local.class);
075    cl.registerShortNameOfClass(Piggyback.class);
076    cl.registerShortNameOfClass(NumEvaluators.class);
077    cl.registerShortNameOfClass(NumTasks.class);
078    cl.registerShortNameOfClass(Delay.class);
079    cl.registerShortNameOfClass(JobId.class);
080    cl.processCommandLine(args);
081    return confBuilder.build();
082  }
083
084  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
085      throws InjectionException, BindException {
086    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
087    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
088    cb.bindNamedParameter(Piggyback.class, String.valueOf(injector.getNamedInstance(Piggyback.class)));
089    cb.bindNamedParameter(NumEvaluators.class, String.valueOf(injector.getNamedInstance(NumEvaluators.class)));
090    cb.bindNamedParameter(NumTasks.class, String.valueOf(injector.getNamedInstance(NumTasks.class)));
091    cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
092    return cb.build();
093  }
094
095  /**
096   * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
097   *
098   * @param commandLineConf Parsed command line arguments, as passed into main().
099   * @return (immutable) TANG Configuration object.
100   * @throws BindException      if configuration commandLineInjector fails.
101   * @throws InjectionException if configuration commandLineInjector fails.
102   */
103  private static Configuration getClientConfiguration(
104      final Configuration commandLineConf, final boolean isLocal)
105      throws BindException, InjectionException {
106    final Configuration runtimeConfiguration;
107    if (isLocal) {
108      LOG.log(Level.FINE, "Running on the local runtime");
109      runtimeConfiguration = LocalRuntimeConfiguration.CONF
110          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
111          .build();
112    } else {
113      LOG.log(Level.FINE, "Running on YARN");
114      runtimeConfiguration = YarnClientConfiguration.CONF.build();
115    }
116    return Tang.Factory.getTang().newConfigurationBuilder(
117        runtimeConfiguration, cloneCommandLineConfiguration(commandLineConf))
118        .build();
119  }
120
121  /**
122   * Main method that launches the REEF job.
123   *
124   * @param args command line parameters.
125   */
126  public static void main(final String[] args) {
127
128    try {
129
130      final Configuration commandLineConf = parseCommandLine(args);
131      final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
132
133      final boolean isLocal = injector.getNamedInstance(Local.class);
134      final int numEvaluators = injector.getNamedInstance(NumEvaluators.class);
135      final int numTasks = injector.getNamedInstance(NumTasks.class);
136      final int delay = injector.getNamedInstance(Delay.class);
137      final int jobNum = injector.getNamedInstance(JobId.class);
138
139      final String jobId = String.format("pool.e_%d.a_%d.d_%d.%d",
140          numEvaluators, numTasks, delay, jobNum < 0 ? System.currentTimeMillis() : jobNum);
141
142      // Timeout: delay + 6 extra seconds per Task per Evaluator + 2 minutes to allocate each Evaluator:
143      final int timeout = numTasks * (delay + 6) * 1000 / numEvaluators + numEvaluators * 120000;
144
145      final Configuration runtimeConfig = getClientConfiguration(commandLineConf, isLocal);
146      LOG.log(Level.INFO, "TIME: Start Client {0} with timeout {1} sec. Configuration:\n--\n{2}--",
147          new Object[]{jobId, timeout / 1000, new AvroConfigurationSerializer().toString(runtimeConfig)});
148
149      final Configuration driverConfig = DriverConfiguration.CONF
150          .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
151          .set(DriverConfiguration.DRIVER_IDENTIFIER, jobId)
152          .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
153          .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
154          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
155          .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
156          .set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
157          .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
158          .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class)
159          .build();
160
161      final Configuration submittedConfiguration = Tang.Factory.getTang()
162          .newConfigurationBuilder(driverConfig, commandLineConf).build();
163      DriverLauncher.getLauncher(runtimeConfig)
164          .run(submittedConfiguration, timeout);
165
166      LOG.log(Level.INFO, "TIME: Stop Client {0}", jobId);
167
168    } catch (final BindException | InjectionException | IOException ex) {
169      LOG.log(Level.SEVERE, "Job configuration error", ex);
170    }
171  }
172
173  /**
174   * Command line parameter: number of Evaluators to request.
175   */
176  @NamedParameter(doc = "Number of evaluators to request", short_name = "evaluators")
177  public static final class NumEvaluators implements Name<Integer> {
178  }
179
180  /**
181   * Command line parameter: number of Tasks to run.
182   */
183  @NamedParameter(doc = "Number of tasks to run", short_name = "tasks")
184  public static final class NumTasks implements Name<Integer> {
185  }
186
187  /**
188   * Command line parameter: number of experiments to run.
189   */
190  @NamedParameter(doc = "Number of seconds to sleep in each task", short_name = "delay")
191  public static final class Delay implements Name<Integer> {
192  }
193
194  /**
195   * Command line parameter = true to submit task and context in one request.
196   */
197  @NamedParameter(doc = "Submit task and context together",
198      short_name = "piggyback", default_value = "true")
199  public static final class Piggyback implements Name<Boolean> {
200  }
201
202  /**
203   * Command line parameter = true to run locally, or false to run on YARN.
204   */
205  @NamedParameter(doc = "Whether or not to run on the local runtime",
206      short_name = "local", default_value = "true")
207  public static final class Local implements Name<Boolean> {
208  }
209
210  /**
211   * Command line parameter = Numeric ID for the job.
212   */
213  @NamedParameter(doc = "Numeric ID for the job", short_name = "id", default_value = "-1")
214  public static final class JobId implements Name<Integer> {
215  }
216}