/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.pool;

import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.*;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.CommandLine;
import org.apache.reef.util.EnvironmentUtils;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Pool of Evaluators example - main class.
 */
public final class Launch {

  /**
   * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
   */
  private static final int MAX_NUMBER_OF_EVALUATORS = 4;

  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(Launch.class.getName());

  /**
   * This class should not be instantiated.
   */
  private Launch() {
    throw new RuntimeException("Do not instantiate this class!");
  }

  /**
   * Parse the command line arguments.
   *
   * @param args command line arguments, as passed to main()
   * @return Configuration object.
   * @throws BindException configuration error.
   * @throws IOException   error reading the configuration.
   */
  private static Configuration parseCommandLine(final String[] args)
      throws BindException, IOException {
    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
    final CommandLine cl = new CommandLine(confBuilder);
    cl.registerShortNameOfClass(Local.class);
    cl.registerShortNameOfClass(Piggyback.class);
    cl.registerShortNameOfClass(NumEvaluators.class);
    cl.registerShortNameOfClass(NumTasks.class);
    cl.registerShortNameOfClass(Delay.class);
    cl.registerShortNameOfClass(JobId.class);
    cl.processCommandLine(args);
    return confBuilder.build();
  }

  /**
   * Copy a subset of the parsed command line parameters into a fresh configuration.
   * Only {@link Piggyback}, {@link NumEvaluators}, {@link NumTasks} and {@link Delay}
   * are copied; {@link Local} and {@link JobId} are not bound in the result.
   *
   * @param commandLineConf parsed command line configuration.
   * @return a new configuration that binds the four parameters listed above.
   * @throws InjectionException if one of the parameters cannot be read from commandLineConf.
   * @throws BindException      if one of the parameters cannot be bound in the new configuration.
   */
  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
      throws InjectionException, BindException {
    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
    cb.bindNamedParameter(Piggyback.class, String.valueOf(injector.getNamedInstance(Piggyback.class)));
    cb.bindNamedParameter(NumEvaluators.class, String.valueOf(injector.getNamedInstance(NumEvaluators.class)));
    cb.bindNamedParameter(NumTasks.class, String.valueOf(injector.getNamedInstance(NumTasks.class)));
    cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
    return cb.build();
  }

  /**
   * Create the client-side TANG configuration: the runtime configuration
   * (local or YARN) merged with a copy of the job parameters from the command line.
   *
   * @param commandLineConf Parsed command line arguments, as passed into main().
   * @param isLocal         true to use the local runtime, false to use YARN.
   * @return (immutable) TANG Configuration object.
   * @throws BindException      if configuration commandLineInjector fails.
   * @throws InjectionException if configuration commandLineInjector fails.
   */
  private static Configuration getClientConfiguration(
      final Configuration commandLineConf, final boolean isLocal) throws BindException, InjectionException {

    final Configuration runtimeConfiguration;

    if (isLocal) {
      LOG.log(Level.FINE, "Running on the local runtime");
      runtimeConfiguration = LocalRuntimeConfiguration.CONF
          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
          .build();
    } else {
      LOG.log(Level.FINE, "Running on YARN");
      runtimeConfiguration = YarnClientConfiguration.CONF.build();
    }

    return Configurations.merge(runtimeConfiguration, cloneCommandLineConfiguration(commandLineConf));
  }

  /**
   * Compute the client timeout in milliseconds:
   * delay + 6 extra seconds per Task per Evaluator + 2 minutes to allocate each Evaluator.
   * The formula is evaluated in 64-bit arithmetic, in the same order as the
   * plain int expression would be, so that intermediate products exceeding the
   * int range do not wrap around. The result is then saturated to the int range.
   *
   * @param numEvaluators number of Evaluators requested; must be positive.
   * @param numTasks      number of Tasks to run.
   * @param delay         number of seconds each Task sleeps.
   * @return the timeout in milliseconds, clamped to [Integer.MIN_VALUE, Integer.MAX_VALUE].
   * @throws IllegalArgumentException if numEvaluators is zero or negative.
   */
  private static int computeTimeoutMillis(final int numEvaluators, final int numTasks, final int delay) {
    if (numEvaluators <= 0) {
      // Fail with a clear message instead of an ArithmeticException from the division below.
      throw new IllegalArgumentException("Number of evaluators must be positive, got " + numEvaluators);
    }
    final long timeoutMillis =
        (long) numTasks * (delay + 6L) * 1000L / numEvaluators + numEvaluators * 120000L;
    return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMillis));
  }

  /**
   * Main method that launches the REEF job.
   * Configuration errors (BindException, InjectionException, IOException) are
   * logged at SEVERE level and the method returns normally.
   *
   * @param args command line parameters.
   * @throws IllegalArgumentException if the requested number of evaluators is not positive.
   */
  public static void main(final String[] args) {

    try {

      final Configuration commandLineConf = parseCommandLine(args);
      final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);

      final boolean isLocal = injector.getNamedInstance(Local.class);
      final int numEvaluators = injector.getNamedInstance(NumEvaluators.class);
      final int numTasks = injector.getNamedInstance(NumTasks.class);
      final int delay = injector.getNamedInstance(Delay.class);
      final int jobNum = injector.getNamedInstance(JobId.class);

      // A negative job number (the default is -1) means "use the current time as the ID".
      final String jobId = String.format("pool.e_%d.a_%d.d_%d.%d",
          numEvaluators, numTasks, delay, jobNum < 0 ? System.currentTimeMillis() : jobNum);

      // Timeout: delay + 6 extra seconds per Task per Evaluator + 2 minutes to allocate each Evaluator:
      final int timeout = computeTimeoutMillis(numEvaluators, numTasks, delay);

      final Configuration runtimeConfig = getClientConfiguration(commandLineConf, isLocal);
      LOG.log(Level.INFO, "TIME: Start Client {0} with timeout {1} sec. Configuration:\n--\n{2}--",
          new Object[] {jobId, timeout / 1000, Configurations.toString(runtimeConfig, true)});

      final Configuration driverConfig = DriverConfiguration.CONF
          .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
          .set(DriverConfiguration.DRIVER_IDENTIFIER, jobId)
          .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
          .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
          .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
          .set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
          .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
          .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class)
          .build();

      // The Driver receives the full command line configuration in addition to its own bindings.
      final Configuration submittedConfiguration = Tang.Factory.getTang()
          .newConfigurationBuilder(driverConfig, commandLineConf).build();

      DriverLauncher.getLauncher(runtimeConfig).run(submittedConfiguration, timeout);

      LOG.log(Level.INFO, "TIME: Stop Client {0}", jobId);

    } catch (final BindException | InjectionException | IOException ex) {
      LOG.log(Level.SEVERE, "Job configuration error", ex);
    }
  }

  /**
   * Command line parameter: number of Evaluators to request.
   */
  @NamedParameter(doc = "Number of evaluators to request", short_name = "evaluators")
  public static final class NumEvaluators implements Name<Integer> {
  }

  /**
   * Command line parameter: number of Tasks to run.
   */
  @NamedParameter(doc = "Number of tasks to run", short_name = "tasks")
  public static final class NumTasks implements Name<Integer> {
  }

  /**
   * Command line parameter: number of seconds to sleep in each Task.
   */
  @NamedParameter(doc = "Number of seconds to sleep in each task", short_name = "delay")
  public static final class Delay implements Name<Integer> {
  }

  /**
   * Command line parameter = true to submit task and context in one request.
   */
  @NamedParameter(doc = "Submit task and context together",
      short_name = "piggyback", default_value = "true")
  public static final class Piggyback implements Name<Boolean> {
  }

  /**
   * Command line parameter = true to run locally, or false to run on YARN.
   */
  @NamedParameter(doc = "Whether or not to run on the local runtime",
      short_name = "local", default_value = "true")
  public static final class Local implements Name<Boolean> {
  }

  /**
   * Command line parameter = Numeric ID for the job.
   */
  @NamedParameter(doc = "Numeric ID for the job", short_name = "id", default_value = "-1")
  public static final class JobId implements Name<Integer> {
  }
}