/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.pool;

import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import org.apache.reef.tang.formats.CommandLine;
import org.apache.reef.util.EnvironmentUtils;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Pool of Evaluators example - main class.
 */
public final class Launch {

  /**
   * The upper limit on the number of Evaluators that the local resource manager will hand out concurrently.
   */
  private static final int MAX_NUMBER_OF_EVALUATORS = 4;

  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(Launch.class.getName());

  /**
   * This class should not be instantiated.
   */
  private Launch() {
    throw new RuntimeException("Do not instantiate this class!");
  }

  /**
   * Parse the command line arguments.
   *
   * @param args command line arguments, as passed to main()
   * @return Configuration object.
   * @throws BindException configuration error.
   * @throws IOException   error reading the configuration.
   */
  private static Configuration parseCommandLine(final String[] args)
      throws BindException, IOException {
    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
    final CommandLine cl = new CommandLine(confBuilder);
    cl.registerShortNameOfClass(Local.class);
    cl.registerShortNameOfClass(Piggyback.class);
    cl.registerShortNameOfClass(NumEvaluators.class);
    cl.registerShortNameOfClass(NumTasks.class);
    cl.registerShortNameOfClass(Delay.class);
    cl.registerShortNameOfClass(JobId.class);
    cl.processCommandLine(args);
    return confBuilder.build();
  }

  /**
   * Build a new configuration that carries only the Piggyback, NumEvaluators, NumTasks and Delay
   * parameters, each re-bound to the value resolved from the given command line configuration.
   * The Local and JobId parameters are not copied.
   *
   * @param commandLineConf Parsed command line arguments.
   * @return a new Configuration holding the four copied named parameters.
   * @throws InjectionException if one of the parameters cannot be resolved from commandLineConf.
   * @throws BindException      if one of the parameters cannot be bound in the new configuration.
   */
  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
      throws InjectionException, BindException {
    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
    cb.bindNamedParameter(Piggyback.class, String.valueOf(injector.getNamedInstance(Piggyback.class)));
    cb.bindNamedParameter(NumEvaluators.class, String.valueOf(injector.getNamedInstance(NumEvaluators.class)));
    cb.bindNamedParameter(NumTasks.class, String.valueOf(injector.getNamedInstance(NumTasks.class)));
    cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
    return cb.build();
  }

  /**
   * Create the client-side TANG configuration: the runtime configuration (local or YARN)
   * merged with a copy of the job parameters taken from the command line configuration.
   *
   * @param commandLineConf Parsed command line arguments.
   * @param isLocal         true to use the local runtime (capped at MAX_NUMBER_OF_EVALUATORS),
   *                        false to use the YARN client runtime.
   * @return (immutable) TANG Configuration object.
   * @throws BindException      if building the configuration fails.
   * @throws InjectionException if reading the parameters from commandLineConf fails.
   */
  private static Configuration getClientConfiguration(
      final Configuration commandLineConf, final boolean isLocal)
      throws BindException, InjectionException {
    final Configuration runtimeConfiguration;
    if (isLocal) {
      LOG.log(Level.FINE, "Running on the local runtime");
      runtimeConfiguration = LocalRuntimeConfiguration.CONF
          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
          .build();
    } else {
      LOG.log(Level.FINE, "Running on YARN");
      runtimeConfiguration = YarnClientConfiguration.CONF.build();
    }
    return Tang.Factory.getTang().newConfigurationBuilder(
        runtimeConfiguration, cloneCommandLineConfiguration(commandLineConf))
        .build();
  }

  /**
   * Compute the client timeout in milliseconds:
   * (delay + 6 extra seconds) per Task, divided across the Evaluators,
   * plus 2 minutes to allocate each Evaluator.
   * The arithmetic is done in long so that large task counts or delays do not overflow int.
   *
   * @param numTasks      number of Tasks to run.
   * @param delay         number of seconds each Task sleeps.
   * @param numEvaluators number of Evaluators requested; must be positive (it is used as a divisor).
   * @return the timeout, in milliseconds.
   */
  private static long computeTimeoutMillis(final int numTasks, final int delay, final int numEvaluators) {
    return (long) numTasks * (delay + 6L) * 1000L / numEvaluators + numEvaluators * 120000L;
  }

  /**
   * Main method that launches the REEF job.
   *
   * @param args command line parameters.
   */
  public static void main(final String[] args) {

    try {

      final Configuration commandLineConf = parseCommandLine(args);
      final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);

      final boolean isLocal = injector.getNamedInstance(Local.class);
      final int numEvaluators = injector.getNamedInstance(NumEvaluators.class);
      final int numTasks = injector.getNamedInstance(NumTasks.class);
      final int delay = injector.getNamedInstance(Delay.class);
      final int jobNum = injector.getNamedInstance(JobId.class);

      // The number of Evaluators is used as a divisor below; reject 0 (division by zero)
      // and negative values (negative timeout) up front with a readable message.
      if (numEvaluators <= 0) {
        LOG.log(Level.SEVERE, "Number of evaluators must be positive, got {0}", numEvaluators);
        return;
      }

      // A negative job number (the default is -1) means "use the current time as the ID".
      final String jobId = String.format("pool.e_%d.a_%d.d_%d.%d",
          numEvaluators, numTasks, delay, jobNum < 0 ? System.currentTimeMillis() : jobNum);

      // Timeout: delay + 6 extra seconds per Task per Evaluator + 2 minutes to allocate each Evaluator:
      final long timeout = computeTimeoutMillis(numTasks, delay, numEvaluators);

      final Configuration runtimeConfig = getClientConfiguration(commandLineConf, isLocal);
      LOG.log(Level.INFO, "TIME: Start Client {0} with timeout {1} sec. Configuration:\n--\n{2}--",
          new Object[]{jobId, timeout / 1000, new AvroConfigurationSerializer().toString(runtimeConfig)});

      final Configuration driverConfig = DriverConfiguration.CONF
          .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
          .set(DriverConfiguration.DRIVER_IDENTIFIER, jobId)
          .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
          .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
          .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
          .set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
          .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
          .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class)
          .build();

      // The driver receives the full command line configuration in addition to its own bindings.
      final Configuration submittedConfiguration = Tang.Factory.getTang()
          .newConfigurationBuilder(driverConfig, commandLineConf).build();
      DriverLauncher.getLauncher(runtimeConfig)
          .run(submittedConfiguration, timeout);

      LOG.log(Level.INFO, "TIME: Stop Client {0}", jobId);

    } catch (final BindException | InjectionException | IOException ex) {
      LOG.log(Level.SEVERE, "Job configuration error", ex);
    }
  }

  /**
   * Command line parameter: number of Evaluators to request.
   */
  @NamedParameter(doc = "Number of evaluators to request", short_name = "evaluators")
  public static final class NumEvaluators implements Name<Integer> {
  }

  /**
   * Command line parameter: number of Tasks to run.
   */
  @NamedParameter(doc = "Number of tasks to run", short_name = "tasks")
  public static final class NumTasks implements Name<Integer> {
  }

  /**
   * Command line parameter: number of seconds to sleep in each Task.
   */
  @NamedParameter(doc = "Number of seconds to sleep in each task", short_name = "delay")
  public static final class Delay implements Name<Integer> {
  }

  /**
   * Command line parameter = true to submit task and context in one request.
   */
  @NamedParameter(doc = "Submit task and context together",
      short_name = "piggyback", default_value = "true")
  public static final class Piggyback implements Name<Boolean> {
  }

  /**
   * Command line parameter = true to run locally, or false to run on YARN.
   */
  @NamedParameter(doc = "Whether or not to run on the local runtime",
      short_name = "local", default_value = "true")
  public static final class Local implements Name<Boolean> {
  }

  /**
   * Command line parameter = Numeric ID for the job.
   */
  @NamedParameter(doc = "Numeric ID for the job", short_name = "id", default_value = "-1")
  public static final class JobId implements Name<Integer> {
  }
}