001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.suspend; 020 021import org.apache.reef.client.ClientConfiguration; 022import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; 023import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; 024import org.apache.reef.tang.Configuration; 025import org.apache.reef.tang.Injector; 026import org.apache.reef.tang.JavaConfigurationBuilder; 027import org.apache.reef.tang.Tang; 028import org.apache.reef.tang.annotations.Name; 029import org.apache.reef.tang.annotations.NamedParameter; 030import org.apache.reef.tang.exceptions.BindException; 031import org.apache.reef.tang.exceptions.InjectionException; 032import org.apache.reef.tang.formats.AvroConfigurationSerializer; 033import org.apache.reef.tang.formats.CommandLine; 034 035import java.io.IOException; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038 039/** 040 * Suspend/Resume example - main class. 041 */ 042public final class Launch { 043 044 /** 045 * Standard Java logger. 046 */ 047 private static final Logger LOG = Logger.getLogger(Launch.class.getName()); 048 /** 049 * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently. 050 */ 051 private static final int MAX_NUMBER_OF_EVALUATORS = 4; 052 053 /** 054 * This class should not be instantiated. 055 */ 056 private Launch() { 057 throw new RuntimeException("Do not instantiate this class!"); 058 } 059 060 /** 061 * @param args command line arguments, as passed to main() 062 * @return Configuration object. 063 */ 064 private static Configuration parseCommandLine(final String[] args) 065 throws IOException, BindException { 066 final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder(); 067 final CommandLine cl = new CommandLine(confBuilder); 068 cl.registerShortNameOfClass(Local.class); 069 cl.registerShortNameOfClass(NumCycles.class); 070 cl.registerShortNameOfClass(Delay.class); 071 cl.registerShortNameOfClass(SuspendClientControl.Port.class); 072 cl.processCommandLine(args); 073 return confBuilder.build(); 074 } 075 076 private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf) 077 throws InjectionException, BindException { 078 final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf); 079 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); 080 cb.bindNamedParameter(NumCycles.class, String.valueOf(injector.getNamedInstance(NumCycles.class))); 081 cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class))); 082 cb.bindNamedParameter(SuspendClientControl.Port.class, 083 String.valueOf(injector.getNamedInstance(SuspendClientControl.Port.class))); 084 return cb.build(); 085 } 086 087 /** 088 * Parse command line arguments and create TANG configuration ready to be submitted to REEF. 089 * 090 * @param args Command line arguments, as passed into main(). 091 * @return (immutable) TANG Configuration object. 092 * @throws BindException if configuration commandLineInjector fails. 093 * @throws InjectionException if configuration commandLineInjector fails. 094 * @throws IOException error reading the configuration. 095 */ 096 private static Configuration getClientConfiguration(final String[] args) 097 throws BindException, InjectionException, IOException { 098 final Configuration commandLineConf = parseCommandLine(args); 099 100 final Configuration clientConfiguration = ClientConfiguration.CONF 101 .set(ClientConfiguration.ON_JOB_RUNNING, SuspendClient.RunningJobHandler.class) 102 .set(ClientConfiguration.ON_JOB_FAILED, SuspendClient.FailedJobHandler.class) 103 .set(ClientConfiguration.ON_JOB_COMPLETED, SuspendClient.CompletedJobHandler.class) 104 .set(ClientConfiguration.ON_RUNTIME_ERROR, SuspendClient.RuntimeErrorHandler.class) 105 .build(); 106 107 final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf); 108 final boolean isLocal = commandLineInjector.getNamedInstance(Local.class); 109 final Configuration runtimeConfiguration; 110 if (isLocal) { 111 LOG.log(Level.INFO, "Running on the local runtime"); 112 runtimeConfiguration = LocalRuntimeConfiguration.CONF 113 .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) 114 .build(); 115 } else { 116 LOG.log(Level.INFO, "Running on YARN"); 117 runtimeConfiguration = YarnClientConfiguration.CONF.build(); 118 } 119 120 return Tang.Factory.getTang() 121 .newConfigurationBuilder(runtimeConfiguration, clientConfiguration, 122 cloneCommandLineConfiguration(commandLineConf)) 123 .build(); 124 } 125 126 /** 127 * Main method that runs the example. 128 * 129 * @param args command line parameters. 130 */ 131 public static void main(final String[] args) { 132 try { 133 final Configuration config = getClientConfiguration(args); 134 135 LOG.log(Level.INFO, "Configuration:\n--\n{0}--", 136 new AvroConfigurationSerializer().toString(config)); 137 138 final Injector injector = Tang.Factory.getTang().newInjector(config); 139 final SuspendClient client = injector.getInstance(SuspendClient.class); 140 141 client.submit(); 142 client.waitForCompletion(); 143 LOG.info("Done!"); 144 145 } catch (final BindException | IOException | InjectionException ex) { 146 LOG.log(Level.SEVERE, "Cannot launch: configuration error", ex); 147 } catch (final Exception ex) { 148 LOG.log(Level.SEVERE, "Cleanup error", ex); 149 } 150 } 151 152 /** 153 * Command line parameter = true to run locally, or false to run on YARN. 154 */ 155 @NamedParameter(doc = "Whether or not to run on the local runtime", 156 short_name = "local", default_value = "true") 157 public static final class Local implements Name<Boolean> { 158 } 159 160 /** 161 * Command line parameter: number of iterations to run. 162 */ 163 @NamedParameter(doc = "Number of iterations to run", short_name = "cycles", default_value = "20") 164 public static final class NumCycles implements Name<Integer> { 165 } 166 167 /** 168 * Command line parameter: delay in seconds for each cycle. 169 */ 170 @NamedParameter(doc = "Delay in seconds between the cycles", short_name = "delay", default_value = "1") 171 public static final class Delay implements Name<Integer> { 172 } 173}