001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.data.loading;
020
021import org.apache.hadoop.mapred.TextInputFormat;
022import org.apache.reef.annotations.audience.ClientSide;
023import org.apache.reef.client.DriverConfiguration;
024import org.apache.reef.client.DriverLauncher;
025import org.apache.reef.client.LauncherStatus;
026import org.apache.reef.driver.evaluator.EvaluatorRequest;
027import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
028import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
029import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.Injector;
032import org.apache.reef.tang.JavaConfigurationBuilder;
033import org.apache.reef.tang.Tang;
034import org.apache.reef.tang.annotations.Name;
035import org.apache.reef.tang.annotations.NamedParameter;
036import org.apache.reef.tang.exceptions.BindException;
037import org.apache.reef.tang.exceptions.InjectionException;
038import org.apache.reef.tang.formats.CommandLine;
039import org.apache.reef.util.EnvironmentUtils;
040
041import java.io.IOException;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Client for the data loading demo app.
047 */
048@ClientSide
049public final class DataLoadingREEF {
050
051  private static final Logger LOG = Logger.getLogger(DataLoadingREEF.class.getName());
052
053  /**
054   * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
055   */
056  private static final int MAX_NUMBER_OF_EVALUATORS = 16;
057
058  private static final int NUM_SPLITS = 6;
059  private static final int NUM_COMPUTE_EVALUATORS = 2;
060
061  public static void main(final String[] args)
062      throws InjectionException, BindException, IOException {
063
064    final Tang tang = Tang.Factory.getTang();
065
066    final JavaConfigurationBuilder cb = tang.newConfigurationBuilder();
067
068    new CommandLine(cb)
069        .registerShortNameOfClass(Local.class)
070        .registerShortNameOfClass(TimeOut.class)
071        .registerShortNameOfClass(DataLoadingREEF.InputDir.class)
072        .processCommandLine(args);
073
074    final Injector injector = tang.newInjector(cb.build());
075
076    final boolean isLocal = injector.getNamedInstance(Local.class);
077    final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
078    final String inputDir = injector.getNamedInstance(DataLoadingREEF.InputDir.class);
079
080    final Configuration runtimeConfiguration;
081    if (isLocal) {
082      LOG.log(Level.INFO, "Running Data Loading demo on the local runtime");
083      runtimeConfiguration = LocalRuntimeConfiguration.CONF
084          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
085          .build();
086    } else {
087      LOG.log(Level.INFO, "Running Data Loading demo on YARN");
088      runtimeConfiguration = YarnClientConfiguration.CONF.build();
089    }
090
091    final EvaluatorRequest computeRequest = EvaluatorRequest.newBuilder()
092        .setNumber(NUM_COMPUTE_EVALUATORS)
093        .setMemory(512)
094        .setNumberOfCores(1)
095        .build();
096
097    final EvaluatorRequest dataRequest = EvaluatorRequest.newBuilder()
098        .setMemory(512)
099        .setNumberOfCores(1)
100        .build();
101
102    final Configuration dataLoadConfiguration = new DataLoadingRequestBuilder()
103        .setInputFormatClass(TextInputFormat.class)
104        .setInputPath(inputDir)
105        .setNumberOfDesiredSplits(NUM_SPLITS)
106        .addComputeRequest(computeRequest)
107        .addDataRequest(dataRequest)
108        .setDriverConfigurationModule(DriverConfiguration.CONF
109            .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(LineCounter.class))
110            .set(DriverConfiguration.ON_CONTEXT_ACTIVE, LineCounter.ContextActiveHandler.class)
111            .set(DriverConfiguration.ON_TASK_COMPLETED, LineCounter.TaskCompletedHandler.class)
112            .set(DriverConfiguration.DRIVER_IDENTIFIER, "DataLoadingREEF"))
113        .build();
114
115    final LauncherStatus state =
116        DriverLauncher.getLauncher(runtimeConfiguration).run(dataLoadConfiguration, jobTimeout);
117
118    LOG.log(Level.INFO, "REEF job completed: {0}", state);
119  }
120
121  /**
122   * Command line parameter = true to run locally, or false to run on YARN.
123   */
124  @NamedParameter(doc = "Whether or not to run on the local runtime",
125      short_name = "local", default_value = "true")
126  public static final class Local implements Name<Boolean> {
127  }
128
129  /**
130   * Number of minutes before timeout.
131   */
132  @NamedParameter(doc = "Number of minutes before timeout",
133      short_name = "timeout", default_value = "2")
134  public static final class TimeOut implements Name<Integer> {
135  }
136
137  /**
138   * Input path.
139   */
140  @NamedParameter(short_name = "input")
141  public static final class InputDir implements Name<String> {
142  }
143
144  /**
145   * Empty private constructor to prohibit instantiation of utility class.
146   */
147  private DataLoadingREEF() {
148  }
149}