001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.evaluator.failure; 020 021import org.apache.reef.client.DriverConfiguration; 022import org.apache.reef.client.DriverLauncher; 023import org.apache.reef.client.LauncherStatus; 024import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; 025import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; 026import org.apache.reef.tang.*; 027import org.apache.reef.tang.annotations.Name; 028import org.apache.reef.tang.annotations.NamedParameter; 029import org.apache.reef.tang.exceptions.BindException; 030import org.apache.reef.tang.exceptions.InjectionException; 031import org.apache.reef.tang.formats.CommandLine; 032import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail; 033import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit; 034import org.apache.reef.util.EnvironmentUtils; 035 036import java.io.IOException; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039 040/** 041 * Entry point class for REEF failure test. 042 */ 043public final class FailureREEF { 044 /** 045 * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently. 046 */ 047 public static final int MAX_NUMBER_OF_EVALUATORS = 16; 048 049 private static final Logger LOG = Logger.getLogger(FailureREEF.class.getName()); 050 051 private static Configuration parseCommandLine(final String[] aArgs) { 052 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); 053 try { 054 new CommandLine(cb) 055 .registerShortNameOfClass(Local.class) 056 .registerShortNameOfClass(TimeOut.class) 057 .processCommandLine(aArgs); 058 return cb.build(); 059 } catch (final BindException | IOException ex) { 060 final String msg = "Unable to parse command line"; 061 LOG.log(Level.SEVERE, msg, ex); 062 throw new RuntimeException(msg, ex); 063 } 064 } 065 066 /** 067 * @return (immutable) TANG Configuration object. 068 * @throws BindException if configuration injector fails. 069 * @throws InjectionException if the Local.class parameter is not injected. 070 */ 071 private static Configuration getRunTimeConfiguration(final boolean isLocal) throws BindException { 072 073 final Configuration runtimeConfiguration; 074 075 if (isLocal) { 076 LOG.log(Level.INFO, "Running Failure demo on the local runtime"); 077 runtimeConfiguration = LocalRuntimeConfiguration.CONF 078 .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) 079 .build(); 080 } else { 081 LOG.log(Level.INFO, "Running Failure demo on YARN"); 082 runtimeConfiguration = YarnClientConfiguration.CONF.build(); 083 } 084 085 return runtimeConfiguration; 086 } 087 088 public static LauncherStatus runFailureReef( 089 final Configuration runtimeConfig, final int timeout, final int numEvaluatorsToSubmit, 090 final int numEvaluatorsToFail) throws InjectionException { 091 092 final Configuration driverConf = DriverConfiguration.CONF 093 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailureDriver.class)) 094 .set(DriverConfiguration.DRIVER_IDENTIFIER, "FailureREEF") 095 .set(DriverConfiguration.ON_DRIVER_STARTED, FailureDriver.StartHandler.class) 096 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, FailureDriver.EvaluatorAllocatedHandler.class) 097 .set(DriverConfiguration.ON_EVALUATOR_FAILED, FailureDriver.EvaluatorFailedHandler.class) 098 .set(DriverConfiguration.ON_DRIVER_STOP, FailureDriver.StopHandler.class) 099 .build(); 100 101 final Configuration namedParamsConf = Tang.Factory.getTang().newConfigurationBuilder() 102 .bindNamedParameter(NumEvaluatorsToSubmit.class, Integer.toString(numEvaluatorsToSubmit)) 103 .bindNamedParameter(NumEvaluatorsToFail.class, Integer.toString(numEvaluatorsToFail)) 104 .build(); 105 106 final LauncherStatus state = DriverLauncher.getLauncher(runtimeConfig) 107 .run(Configurations.merge(driverConf, namedParamsConf), timeout); 108 109 LOG.log(Level.INFO, "REEF job completed: {0}", state); 110 return state; 111 } 112 113 public static void main(final String[] args) throws InjectionException { 114 final Configuration commandLineConf = parseCommandLine(args); 115 final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf); 116 final boolean isLocal = injector.getNamedInstance(Local.class); 117 final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000; 118 runFailureReef(getRunTimeConfiguration(isLocal), jobTimeout, 40, 10); 119 } 120 121 /** 122 * Empty private constructor to prohibit instantiation of utility class. 123 */ 124 private FailureREEF() { 125 } 126 127 /** 128 * Command line parameter = true to run locally, or false to run on YARN. 129 */ 130 @NamedParameter(doc = "Whether or not to run on the local runtime", 131 short_name = "local", default_value = "true") 132 public static final class Local implements Name<Boolean> { 133 } 134 135 /** 136 * Number of minutes before timeout. 137 */ 138 @NamedParameter(doc = "Number of minutes before timeout", 139 short_name = "timeout", default_value = "2") 140 public static final class TimeOut implements Name<Integer> { 141 } 142}