/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.group.broadcast;

import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.client.LauncherStatus;
import org.apache.reef.examples.group.bgd.parameters.ModelDimensions;
import org.apache.reef.examples.group.broadcast.parameters.NumberOfReceivers;
import org.apache.reef.io.network.group.impl.driver.GroupCommService;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import org.apache.reef.tang.formats.CommandLine;
import org.apache.reef.util.EnvironmentUtils;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Client-side launcher for the group-communication broadcast example.
 *
 * <p>The flow implemented by {@link #main(String[])} is: parse the command line into a Tang
 * configuration, copy the values this client needs into static fields, pick a runtime
 * (local or YARN) and finally submit the driver and wait for it to finish.
 */
@ClientSide
public final class BroadcastREEF {
  private static final Logger LOG = Logger.getLogger(BroadcastREEF.class.getName());

  /**
   * Value passed to the local runtime as its maximum number of evaluators.
   */
  private static final String MAX_NUMBER_OF_EVALUATORS = "20";

  /**
   * Number of milliseconds to wait for the job to complete.
   */
  private static final int JOB_TIMEOUT = 2 * 60 * 1000;

  /**
   * Command line parameter: true to run on the local runtime, false to run on YARN.
   * Defaults to true.
   */
  @NamedParameter(doc = "Whether or not to run on the local runtime", short_name = "local", default_value = "true")
  public static final class Local implements Name<Boolean> {
  }

  /**
   * Input path.
   * Note: this parameter is declared here but is not registered with the command line
   * parser anywhere in this class.
   */
  @NamedParameter(short_name = "input")
  public static final class InputDir implements Name<String> {
  }

  // Values extracted from the command line by storeCommandLineArgs().
  private static boolean local;
  private static int dimensions;
  private static int numberOfReceivers;

  /**
   * Empty private constructor to prohibit instantiation of utility class.
   */
  private BroadcastREEF() {
  }

  /**
   * Parses the program arguments into a Tang configuration.
   *
   * <p>The short names of {@link Local}, {@link ModelDimensions} and {@link NumberOfReceivers}
   * are registered before the arguments are processed.
   *
   * @param aArgs raw command line arguments.
   * @return the configuration built from the parsed arguments.
   * @throws RuntimeException wrapping the {@link IOException} if parsing fails; the failure
   *     is also logged at SEVERE level.
   */
  private static Configuration parseCommandLine(final String[] aArgs) {
    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
    try {
      final CommandLine commandLine = new CommandLine(confBuilder);
      commandLine.registerShortNameOfClass(Local.class);
      commandLine.registerShortNameOfClass(ModelDimensions.class);
      commandLine.registerShortNameOfClass(NumberOfReceivers.class);
      commandLine.processCommandLine(aArgs);
    } catch (final IOException e) {
      LOG.log(Level.SEVERE, "Unable to parse command line", e);
      throw new RuntimeException("Unable to parse command line", e);
    }
    return confBuilder.build();
  }

  /**
   * Reads the parameters the client needs out of the command line configuration and keeps
   * them in this class's static fields.
   *
   * @param commandLineConf configuration produced from the command line.
   * @throws InjectionException if one of the named parameters cannot be injected.
   */
  private static void storeCommandLineArgs(
      final Configuration commandLineConf) throws InjectionException {
    final Injector argInjector = Tang.Factory.getTang().newInjector(commandLineConf);
    local = argInjector.getNamedInstance(Local.class);
    dimensions = argInjector.getNamedInstance(ModelDimensions.class);
    numberOfReceivers = argInjector.getNamedInstance(NumberOfReceivers.class);
  }

  /**
   * Chooses the runtime configuration according to the {@code local} flag.
   *
   * @return the YARN client configuration when {@code local} is false, otherwise a local
   *     runtime configuration with its maximum number of evaluators set.
   */
  private static Configuration getRunTimeConfiguration() {
    if (!local) {
      LOG.log(Level.INFO, "Running Broadcast example using group API on YARN");
      return YarnClientConfiguration.CONF.build();
    }

    LOG.log(Level.INFO, "Running Broadcast example using group API on the local runtime");
    return LocalRuntimeConfiguration.CONF
        .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
        .build();
  }

  /**
   * Assembles the driver configuration: classpath jars as global libraries, the
   * {@link BroadcastDriver} event handlers and the driver identifier.
   *
   * @return the driver configuration, before it is merged with the group communication
   *     service configuration.
   */
  private static Configuration buildDriverConfiguration() {
    return DriverConfiguration.CONF
        .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getAllClasspathJars())
        .set(DriverConfiguration.ON_DRIVER_STARTED, BroadcastDriver.StartHandler.class)
        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, BroadcastDriver.EvaluatorAllocatedHandler.class)
        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, BroadcastDriver.ContextActiveHandler.class)
        .set(DriverConfiguration.ON_CONTEXT_CLOSED, BroadcastDriver.ContextCloseHandler.class)
        .set(DriverConfiguration.ON_TASK_FAILED, BroadcastDriver.FailedTaskHandler.class)
        .set(DriverConfiguration.DRIVER_IDENTIFIER, "BroadcastDriver")
        .build();
  }

  /**
   * Submits the broadcast driver to the given runtime and waits for it to finish.
   *
   * <p>The driver configuration is merged with the group communication service configuration
   * and with the {@link ModelDimensions} and {@link NumberOfReceivers} values captured from
   * the command line. The merged configuration is logged at INFO level before launch.
   *
   * @param runtimeConfiguration configuration of the runtime to launch on.
   * @return the status reported by the launcher once the run returns; the launcher is given
   *     {@code JOB_TIMEOUT} as its timeout.
   * @throws InjectionException propagated from the underlying Tang/launcher calls.
   */
  public static LauncherStatus runBGDReef(
      final Configuration runtimeConfiguration) throws InjectionException {

    final Configuration driverConf = buildDriverConfiguration();
    final Configuration groupCommConf = GroupCommService.getConfiguration();

    final JavaConfigurationBuilder mergeBuilder = Tang.Factory.getTang()
        .newConfigurationBuilder(groupCommConf, driverConf);
    mergeBuilder.bindNamedParameter(ModelDimensions.class, Integer.toString(dimensions));
    mergeBuilder.bindNamedParameter(NumberOfReceivers.class, Integer.toString(numberOfReceivers));
    final Configuration mergedConf = mergeBuilder.build();

    final String serializedConf = new AvroConfigurationSerializer().toString(mergedConf);
    LOG.info(serializedConf);

    final DriverLauncher launcher = DriverLauncher.getLauncher(runtimeConfiguration);
    return launcher.run(mergedConf, JOB_TIMEOUT);
  }

  /**
   * Program entry point: parses the arguments, launches the job and logs its final status.
   *
   * @param args command line arguments.
   * @throws InjectionException if the command line values cannot be injected or the job
   *     cannot be launched.
   */
  public static void main(final String[] args) throws InjectionException {
    storeCommandLineArgs(parseCommandLine(args));
    final LauncherStatus finalStatus = runBGDReef(getRunTimeConfiguration());
    LOG.log(Level.INFO, "REEF job completed: {0}", finalStatus);
  }
}