/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.data.loading;

import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.client.LauncherStatus;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.CommandLine;
import org.apache.reef.util.EnvironmentUtils;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Client for the data loading demo app. 047 */ 048@ClientSide 049public final class DataLoadingREEF { 050 051 private static final Logger LOG = Logger.getLogger(DataLoadingREEF.class.getName()); 052 053 /** 054 * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently. 055 */ 056 private static final int MAX_NUMBER_OF_EVALUATORS = 16; 057 058 private static final int NUM_SPLITS = 6; 059 private static final int NUM_COMPUTE_EVALUATORS = 2; 060 061 public static void main(final String[] args) 062 throws InjectionException, BindException, IOException { 063 064 final Tang tang = Tang.Factory.getTang(); 065 066 final JavaConfigurationBuilder cb = tang.newConfigurationBuilder(); 067 068 new CommandLine(cb) 069 .registerShortNameOfClass(Local.class) 070 .registerShortNameOfClass(TimeOut.class) 071 .registerShortNameOfClass(DataLoadingREEF.InputDir.class) 072 .processCommandLine(args); 073 074 final Injector injector = tang.newInjector(cb.build()); 075 076 final boolean isLocal = injector.getNamedInstance(Local.class); 077 final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000; 078 final String inputDir = injector.getNamedInstance(DataLoadingREEF.InputDir.class); 079 080 final Configuration runtimeConfiguration; 081 if (isLocal) { 082 LOG.log(Level.INFO, "Running Data Loading demo on the local runtime"); 083 runtimeConfiguration = LocalRuntimeConfiguration.CONF 084 .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) 085 .build(); 086 } else { 087 LOG.log(Level.INFO, "Running Data Loading demo on YARN"); 088 runtimeConfiguration = YarnClientConfiguration.CONF.build(); 089 } 090 091 final EvaluatorRequest computeRequest = EvaluatorRequest.newBuilder() 092 .setNumber(NUM_COMPUTE_EVALUATORS) 093 .setMemory(512) 094 .setNumberOfCores(1) 095 .build(); 096 097 final EvaluatorRequest dataRequest = 
EvaluatorRequest.newBuilder() 098 .setMemory(512) 099 .setNumberOfCores(1) 100 .build(); 101 102 final Configuration dataLoadConfiguration = new DataLoadingRequestBuilder() 103 .setInputFormatClass(TextInputFormat.class) 104 .setInputPath(inputDir) 105 .setNumberOfDesiredSplits(NUM_SPLITS) 106 .addComputeRequest(computeRequest) 107 .addDataRequest(dataRequest) 108 .setDriverConfigurationModule(DriverConfiguration.CONF 109 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(LineCounter.class)) 110 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, LineCounter.ContextActiveHandler.class) 111 .set(DriverConfiguration.ON_TASK_COMPLETED, LineCounter.TaskCompletedHandler.class) 112 .set(DriverConfiguration.DRIVER_IDENTIFIER, "DataLoadingREEF")) 113 .build(); 114 115 final LauncherStatus state = 116 DriverLauncher.getLauncher(runtimeConfiguration).run(dataLoadConfiguration, jobTimeout); 117 118 LOG.log(Level.INFO, "REEF job completed: {0}", state); 119 } 120 121 /** 122 * Command line parameter = true to run locally, or false to run on YARN. 123 */ 124 @NamedParameter(doc = "Whether or not to run on the local runtime", 125 short_name = "local", default_value = "true") 126 public static final class Local implements Name<Boolean> { 127 } 128 129 /** 130 * Number of minutes before timeout. 131 */ 132 @NamedParameter(doc = "Number of minutes before timeout", 133 short_name = "timeout", default_value = "2") 134 public static final class TimeOut implements Name<Integer> { 135 } 136 137 /** 138 * Input path. 139 */ 140 @NamedParameter(short_name = "input") 141 public static final class InputDir implements Name<String> { 142 } 143 144 /** 145 * Empty private constructor to prohibit instantiation of utility class. 146 */ 147 private DataLoadingREEF() { 148 } 149}