This project has been retired. For details, please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.data.loading;
020
021import org.apache.hadoop.mapred.TextInputFormat;
022import org.apache.reef.annotations.audience.ClientSide;
023import org.apache.reef.client.DriverConfiguration;
024import org.apache.reef.client.DriverLauncher;
025import org.apache.reef.client.LauncherStatus;
026import org.apache.reef.driver.evaluator.EvaluatorRequest;
027import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
028import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
029import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.Injector;
032import org.apache.reef.tang.JavaConfigurationBuilder;
033import org.apache.reef.tang.Tang;
034import org.apache.reef.tang.annotations.Name;
035import org.apache.reef.tang.annotations.NamedParameter;
036import org.apache.reef.tang.exceptions.BindException;
037import org.apache.reef.tang.exceptions.InjectionException;
038import org.apache.reef.tang.formats.CommandLine;
039import org.apache.reef.util.EnvironmentUtils;
040
041import java.io.IOException;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Client for the data loading demo app
047 */
048@ClientSide
049public class DataLoadingREEF {
050
051  private static final Logger LOG = Logger.getLogger(DataLoadingREEF.class.getName());
052
053  private static final int NUM_LOCAL_THREADS = 16;
054  private static final int NUM_SPLITS = 6;
055  private static final int NUM_COMPUTE_EVALUATORS = 2;
056
057  public static void main(final String[] args)
058      throws InjectionException, BindException, IOException {
059
060    final Tang tang = Tang.Factory.getTang();
061
062    final JavaConfigurationBuilder cb = tang.newConfigurationBuilder();
063
064    new CommandLine(cb)
065        .registerShortNameOfClass(Local.class)
066        .registerShortNameOfClass(TimeOut.class)
067        .registerShortNameOfClass(DataLoadingREEF.InputDir.class)
068        .processCommandLine(args);
069
070    final Injector injector = tang.newInjector(cb.build());
071
072    final boolean isLocal = injector.getNamedInstance(Local.class);
073    final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
074    final String inputDir = injector.getNamedInstance(DataLoadingREEF.InputDir.class);
075
076    final Configuration runtimeConfiguration;
077    if (isLocal) {
078      LOG.log(Level.INFO, "Running Data Loading demo on the local runtime");
079      runtimeConfiguration = LocalRuntimeConfiguration.CONF
080          .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
081          .build();
082    } else {
083      LOG.log(Level.INFO, "Running Data Loading demo on YARN");
084      runtimeConfiguration = YarnClientConfiguration.CONF.build();
085    }
086
087    final EvaluatorRequest computeRequest = EvaluatorRequest.newBuilder()
088        .setNumber(NUM_COMPUTE_EVALUATORS)
089        .setMemory(512)
090        .setNumberOfCores(1)
091        .build();
092
093    final Configuration dataLoadConfiguration = new DataLoadingRequestBuilder()
094        .setMemoryMB(1024)
095        .setInputFormatClass(TextInputFormat.class)
096        .setInputPath(inputDir)
097        .setNumberOfDesiredSplits(NUM_SPLITS)
098        .setComputeRequest(computeRequest)
099        .setDriverConfigurationModule(DriverConfiguration.CONF
100            .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(LineCounter.class))
101            .set(DriverConfiguration.ON_CONTEXT_ACTIVE, LineCounter.ContextActiveHandler.class)
102            .set(DriverConfiguration.ON_TASK_COMPLETED, LineCounter.TaskCompletedHandler.class)
103            .set(DriverConfiguration.DRIVER_IDENTIFIER, "DataLoadingREEF"))
104        .build();
105
106    final LauncherStatus state =
107        DriverLauncher.getLauncher(runtimeConfiguration).run(dataLoadConfiguration, jobTimeout);
108
109    LOG.log(Level.INFO, "REEF job completed: {0}", state);
110  }
111
112  /**
113   * Command line parameter = true to run locally, or false to run on YARN.
114   */
115  @NamedParameter(doc = "Whether or not to run on the local runtime",
116      short_name = "local", default_value = "true")
117  public static final class Local implements Name<Boolean> {
118  }
119
120  @NamedParameter(doc = "Number of minutes before timeout",
121      short_name = "timeout", default_value = "2")
122  public static final class TimeOut implements Name<Integer> {
123  }
124
125  @NamedParameter(short_name = "input")
126  public static final class InputDir implements Name<String> {
127  }
128}