001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.group.broadcast; 020 021import org.apache.reef.annotations.audience.ClientSide; 022import org.apache.reef.client.DriverConfiguration; 023import org.apache.reef.client.DriverLauncher; 024import org.apache.reef.client.LauncherStatus; 025import org.apache.reef.examples.group.bgd.parameters.ModelDimensions; 026import org.apache.reef.examples.group.broadcast.parameters.NumberOfReceivers; 027import org.apache.reef.io.network.group.impl.driver.GroupCommService; 028import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; 029import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; 030import org.apache.reef.tang.*; 031import org.apache.reef.tang.annotations.Name; 032import org.apache.reef.tang.annotations.NamedParameter; 033import org.apache.reef.tang.exceptions.InjectionException; 034import org.apache.reef.tang.formats.CommandLine; 035import org.apache.reef.util.EnvironmentUtils; 036 037import java.io.IOException; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * Client for broadcast example. 043 */ 044@ClientSide 045public final class BroadcastREEF { 046 047 private static final Logger LOG = Logger.getLogger(BroadcastREEF.class.getName()); 048 049 private static final String MAX_NUMBER_OF_EVALUATORS = "20"; 050 051 /** 052 * Number of milliseconds to wait for the job to complete. 053 */ 054 private static final int JOB_TIMEOUT = 2 * 60 * 1000; 055 056 /** 057 * Command line parameter = true to run locally, or false to run on YARN. 058 */ 059 @NamedParameter(doc = "Whether or not to run on the local runtime", short_name = "local", default_value = "true") 060 public static final class Local implements Name<Boolean> { 061 } 062 063 /** 064 * Input path. 065 */ 066 @NamedParameter(short_name = "input") 067 public static final class InputDir implements Name<String> { 068 } 069 070 private static boolean local; 071 private static int dimensions; 072 private static int numberOfReceivers; 073 074 private static Configuration parseCommandLine(final String[] aArgs) { 075 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); 076 try { 077 final CommandLine cl = new CommandLine(cb); 078 cl.registerShortNameOfClass(Local.class); 079 cl.registerShortNameOfClass(ModelDimensions.class); 080 cl.registerShortNameOfClass(NumberOfReceivers.class); 081 cl.processCommandLine(aArgs); 082 } catch (final IOException ex) { 083 final String msg = "Unable to parse command line"; 084 LOG.log(Level.SEVERE, msg, ex); 085 throw new RuntimeException(msg, ex); 086 } 087 return cb.build(); 088 } 089 090 /** 091 * copy the parameters from the command line required for the Client configuration. 092 */ 093 private static void storeCommandLineArgs( 094 final Configuration commandLineConf) throws InjectionException { 095 final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf); 096 local = injector.getNamedInstance(Local.class); 097 dimensions = injector.getNamedInstance(ModelDimensions.class); 098 numberOfReceivers = injector.getNamedInstance(NumberOfReceivers.class); 099 } 100 101 /** 102 * @return (immutable) TANG Configuration object. 103 */ 104 private static Configuration getRunTimeConfiguration() { 105 final Configuration runtimeConfiguration; 106 if (local) { 107 LOG.log(Level.INFO, "Running Broadcast example using group API on the local runtime"); 108 runtimeConfiguration = LocalRuntimeConfiguration.CONF 109 .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) 110 .build(); 111 } else { 112 LOG.log(Level.INFO, "Running Broadcast example using group API on YARN"); 113 runtimeConfiguration = YarnClientConfiguration.CONF.build(); 114 } 115 return runtimeConfiguration; 116 } 117 118 public static LauncherStatus runBGDReef( 119 final Configuration runtimeConfiguration) throws InjectionException { 120 121 final Configuration driverConfiguration = DriverConfiguration.CONF 122 .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getAllClasspathJars()) 123 .set(DriverConfiguration.ON_DRIVER_STARTED, BroadcastDriver.StartHandler.class) 124 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, BroadcastDriver.EvaluatorAllocatedHandler.class) 125 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, BroadcastDriver.ContextActiveHandler.class) 126 .set(DriverConfiguration.ON_CONTEXT_CLOSED, BroadcastDriver.ContextCloseHandler.class) 127 .set(DriverConfiguration.ON_TASK_FAILED, BroadcastDriver.FailedTaskHandler.class) 128 .set(DriverConfiguration.DRIVER_IDENTIFIER, "BroadcastDriver") 129 .build(); 130 131 final Configuration groupCommServConfiguration = GroupCommService.getConfiguration(); 132 133 final Configuration mergedDriverConfiguration = Tang.Factory.getTang() 134 .newConfigurationBuilder(groupCommServConfiguration, driverConfiguration) 135 .bindNamedParameter(ModelDimensions.class, Integer.toString(dimensions)) 136 .bindNamedParameter(NumberOfReceivers.class, Integer.toString(numberOfReceivers)) 137 .build(); 138 139 if (LOG.isLoggable(Level.FINE)) { 140 LOG.log(Level.FINE, "Merged driver configuration:\n{0}", 141 Configurations.toString(mergedDriverConfiguration)); 142 } 143 144 return DriverLauncher.getLauncher(runtimeConfiguration).run(mergedDriverConfiguration, JOB_TIMEOUT); 145 } 146 147 public static void main(final String[] args) throws InjectionException { 148 final Configuration commandLineConf = parseCommandLine(args); 149 storeCommandLineArgs(commandLineConf); 150 final Configuration runtimeConfiguration = getRunTimeConfiguration(); 151 final LauncherStatus state = runBGDReef(runtimeConfiguration); 152 LOG.log(Level.INFO, "REEF job completed: {0}", state); 153 } 154 155 /** 156 * Empty private constructor to prohibit instantiation of utility class. 157 */ 158 private BroadcastREEF() { 159 } 160}