/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.suspend;

import org.apache.reef.client.ClientConfiguration;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.*;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.CommandLine;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Entry point of the Suspend/Resume example.
 * Builds a TANG configuration from the command line, instantiates
 * {@link SuspendClient} from it, submits the job and waits for it to finish.
 */
public final class Launch {

  /**
   * Logger shared by all static methods of this class.
   */
  private static final Logger LOG = Logger.getLogger(Launch.class.getName());

  /**
   * Value bound to {@code LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS}
   * when the example runs on the local runtime.
   */
  private static final int MAX_NUMBER_OF_EVALUATORS = 4;

  /**
   * Not meant to be instantiated: always throws.
   */
  private Launch() {
    throw new RuntimeException("Do not instantiate this class!");
  }

  /**
   * Turn the raw command line into a TANG configuration.
   * The short names of {@link Local}, {@link NumCycles}, {@link Delay} and
   * {@code SuspendClientControl.Port} are registered with the parser before
   * the arguments are processed.
   *
   * @param args command line arguments, as passed to main()
   * @return configuration built from the parsed arguments.
   * @throws IOException declared by the command line parser.
   * @throws BindException declared by the command line parser / builder.
   */
  private static Configuration parseCommandLine(final String[] args)
      throws IOException, BindException {
    final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
    final CommandLine commandLine = new CommandLine(builder);
    commandLine.registerShortNameOfClass(Local.class);
    commandLine.registerShortNameOfClass(NumCycles.class);
    commandLine.registerShortNameOfClass(Delay.class);
    commandLine.registerShortNameOfClass(SuspendClientControl.Port.class);
    commandLine.processCommandLine(args);
    return builder.build();
  }

  /**
   * Build a new configuration that carries only the {@link NumCycles},
   * {@link Delay} and {@code SuspendClientControl.Port} values of the given
   * command line configuration. {@link Local} is not copied.
   *
   * @param commandLineConf configuration produced from the command line.
   * @return a fresh configuration with the three named parameters bound
   * to the string form of their injected values.
   * @throws InjectionException if a named parameter cannot be injected.
   * @throws BindException if a named parameter cannot be bound.
   */
  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
      throws InjectionException, BindException {
    final Injector source = Tang.Factory.getTang().newInjector(commandLineConf);
    final JavaConfigurationBuilder target = Tang.Factory.getTang().newConfigurationBuilder();

    // Each value is read right before it is bound, one parameter at a time.
    final String cycles = String.valueOf(source.getNamedInstance(NumCycles.class));
    target.bindNamedParameter(NumCycles.class, cycles);

    final String delay = String.valueOf(source.getNamedInstance(Delay.class));
    target.bindNamedParameter(Delay.class, delay);

    final String port = String.valueOf(source.getNamedInstance(SuspendClientControl.Port.class));
    target.bindNamedParameter(SuspendClientControl.Port.class, port);

    return target.build();
  }

  /**
   * Parse command line arguments and assemble the TANG configuration used to
   * create the client: the runtime configuration (local or YARN, selected by
   * {@link Local}), the client event handlers, and the copied command line
   * parameters, merged in that order.
   *
   * @param args Command line arguments, as passed into main().
   * @return merged TANG Configuration object.
   * @throws BindException if binding a configuration parameter fails.
   * @throws InjectionException if injecting a command line parameter fails.
   * @throws IOException error reading the configuration.
   */
  private static Configuration getClientConfiguration(final String[] args)
      throws BindException, InjectionException, IOException {
    final Configuration parsedConf = parseCommandLine(args);

    final Configuration handlersConf = ClientConfiguration.CONF
        .set(ClientConfiguration.ON_JOB_RUNNING, SuspendClient.RunningJobHandler.class)
        .set(ClientConfiguration.ON_JOB_FAILED, SuspendClient.FailedJobHandler.class)
        .set(ClientConfiguration.ON_JOB_COMPLETED, SuspendClient.CompletedJobHandler.class)
        .set(ClientConfiguration.ON_RUNTIME_ERROR, SuspendClient.RuntimeErrorHandler.class)
        .build();

    final boolean runLocally =
        Tang.Factory.getTang().newInjector(parsedConf).getNamedInstance(Local.class);

    final Configuration runtimeConf;
    if (!runLocally) {
      LOG.log(Level.INFO, "Running on YARN");
      runtimeConf = YarnClientConfiguration.CONF.build();
    } else {
      LOG.log(Level.INFO, "Running on the local runtime");
      runtimeConf = LocalRuntimeConfiguration.CONF
          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
          .build();
    }

    final Configuration parametersConf = cloneCommandLineConfiguration(parsedConf);
    return Configurations.merge(runtimeConf, handlersConf, parametersConf);
  }

  /**
   * Main method that runs the example: builds the configuration, logs it,
   * creates the {@link SuspendClient}, submits the job and blocks until
   * {@code waitForCompletion()} returns. Any exception is logged at
   * SEVERE level and not rethrown.
   *
   * @param args command line parameters.
   */
  public static void main(final String[] args) {
    try {
      final Configuration launchConf = getClientConfiguration(args);

      LOG.log(Level.INFO, "Configuration:\n--\n{0}--", Configurations.toString(launchConf, true));

      final SuspendClient suspendClient =
          Tang.Factory.getTang().newInjector(launchConf).getInstance(SuspendClient.class);

      suspendClient.submit();
      suspendClient.waitForCompletion();
      LOG.log(Level.INFO, "Done!");

    } catch (final IOException | BindException | InjectionException ex) {
      LOG.log(Level.SEVERE, "Cannot launch: configuration error", ex);
    } catch (final Exception ex) {
      LOG.log(Level.SEVERE, "Cleanup error", ex);
    }
  }

  /**
   * Command line switch: true selects the local runtime, false selects YARN.
   * Defaults to true.
   */
  @NamedParameter(doc = "Whether or not to run on the local runtime",
      short_name = "local", default_value = "true")
  public static final class Local implements Name<Boolean> {
  }

  /**
   * Command line parameter: number of iterations to run. Defaults to 20.
   */
  @NamedParameter(doc = "Number of iterations to run", short_name = "cycles", default_value = "20")
  public static final class NumCycles implements Name<Integer> {
  }

  /**
   * Command line parameter: delay in seconds for each cycle. Defaults to 1.
   */
  @NamedParameter(doc = "Delay in seconds between the cycles", short_name = "delay", default_value = "1")
  public static final class Delay implements Name<Integer> {
  }
}