001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common; 020 021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; 022import org.apache.reef.runtime.common.launch.REEFErrorHandler; 023import org.apache.reef.runtime.common.launch.REEFMessageCodec; 024import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler; 025import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath; 026import org.apache.reef.tang.*; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.tang.exceptions.BindException; 029import org.apache.reef.tang.exceptions.InjectionException; 030import org.apache.reef.tang.formats.ConfigurationSerializer; 031import org.apache.reef.util.EnvironmentUtils; 032import org.apache.reef.util.ThreadLogger; 033import org.apache.reef.util.logging.LoggingSetup; 034import org.apache.reef.wake.remote.RemoteConfiguration; 035import org.apache.reef.wake.time.Clock; 036 037import javax.inject.Inject; 038import java.io.File; 039import java.io.FileNotFoundException; 040import java.io.IOException; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * The main entry point into any REEF process (Driver and Evaluator). 046 * It is mostly reading from the command line to instantiate 047 * the runtime clock and calling .run() on it. 048 */ 049public final class REEFLauncher { 050 051 private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName()); 052 053 private static final Tang TANG = Tang.Factory.getTang(); 054 055 private static final Configuration LAUNCHER_STATIC_CONFIG = 056 TANG.newConfigurationBuilder() 057 .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER") 058 .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) 059 .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) 060 .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class) 061 .build(); 062 063 static { 064 LoggingSetup.setupCommonsLogging(); 065 } 066 067 /** 068 * Main configuration object of the REEF component we are launching here. 069 * REEFEnvironment uses that configuration to instantiate the Clock object, 070 * and then call .run() on it. 071 */ 072 private final Configuration envConfig; 073 074 /** 075 * REEFLauncher is instantiated in the main() method below using 076 * Tang configuration file provided as a command line argument. 077 * @param configurationPath Path to the serialized Tang configuration file. 078 * (The file must be in the local file system). 079 * @param configurationSerializer Serializer used to read the configuration file. 080 * We currently use Avro to serialize Tang configs. 081 */ 082 @Inject 083 private REEFLauncher( 084 @Parameter(ClockConfigurationPath.class) final String configurationPath, 085 final ConfigurationSerializer configurationSerializer) { 086 087 this.envConfig = Configurations.merge(LAUNCHER_STATIC_CONFIG, 088 readConfigurationFromDisk(configurationPath, configurationSerializer)); 089 } 090 091 /** 092 * Instantiate REEF Launcher. This method is called from REEFLauncher.main(). 093 * @param clockConfigPath Path to the local file that contains serialized configuration 094 * of a REEF component to launch (can be either Driver or Evaluator). 095 * @return An instance of the configured REEFLauncher object. 096 */ 097 private static REEFLauncher getREEFLauncher(final String clockConfigPath) { 098 099 try { 100 101 final Configuration clockArgConfig = TANG.newConfigurationBuilder() 102 .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath) 103 .build(); 104 105 return TANG.newInjector(clockArgConfig).getInstance(REEFLauncher.class); 106 107 } catch (final BindException ex) { 108 throw fatal("Error in parsing the command line", ex); 109 } catch (final InjectionException ex) { 110 throw fatal("Unable to instantiate REEFLauncher.", ex); 111 } 112 } 113 114 /** 115 * Read configuration from a given file and deserialize it 116 * into Tang configuration object that can be used for injection. 117 * Configuration is currently serialized using Avro. 118 * This method also prints full deserialized configuration into log. 119 * @param configPath Path to the local file that contains serialized configuration 120 * of a REEF component to launch (can be either Driver or Evaluator). 121 * @param serializer An object to deserialize the configuration file. 122 * @return Tang configuration read and deserialized from a given file. 123 */ 124 private static Configuration readConfigurationFromDisk( 125 final String configPath, final ConfigurationSerializer serializer) { 126 127 LOG.log(Level.FINER, "Loading configuration file: {0}", configPath); 128 129 final File evaluatorConfigFile = new File(configPath); 130 131 if (!evaluatorConfigFile.exists()) { 132 throw fatal( 133 "Configuration file " + configPath + " does not exist. Can be an issue in job submission.", 134 new FileNotFoundException(configPath)); 135 } 136 137 if (!evaluatorConfigFile.canRead()) { 138 throw fatal( 139 "Configuration file " + configPath + " exists, but can't be read.", 140 new IOException(configPath)); 141 } 142 143 try { 144 145 final Configuration config = serializer.fromFile(evaluatorConfigFile); 146 LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath); 147 148 return config; 149 150 } catch (final IOException e) { 151 throw fatal("Unable to parse the configuration file: " + configPath, e); 152 } 153 } 154 155 /** 156 * Launches a REEF client process (Driver or Evaluator). 157 * @param args Command-line arguments. 158 * Must be a single element containing local path to the configuration file. 159 */ 160 @SuppressWarnings("checkstyle:illegalcatch") 161 public static void main(final String[] args) { 162 163 LOG.log(Level.INFO, "Entering REEFLauncher.main()."); 164 165 LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name")); 166 LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.", 167 EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED"); 168 169 if (args.length != 1) { 170 final String message = "REEFLauncher have one and only one argument to specify the runtime clock " + 171 "configuration path"; 172 173 throw fatal(message, new IllegalArgumentException(message)); 174 } 175 176 final REEFLauncher launcher = getREEFLauncher(args[0]); 177 178 Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig)); 179 180 try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(launcher.envConfig)) { 181 reef.run(); 182 } catch (final Throwable ex) { 183 throw fatal("Unable to configure and start REEFEnvironment.", ex); 184 } 185 186 ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after REEFEnvironment.close():"); 187 188 LOG.log(Level.INFO, "Exiting REEFLauncher.main()"); 189 190 System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main() 191 } 192 193 /** 194 * Wrap an exception into RuntimeException with a given message, 195 * and write the same message and exception to the log. 196 * @param msg an error message to log and pass into the RuntimeException. 197 * @param t A Throwable exception to log and wrap. 198 * @return a new Runtime exception wrapping a Throwable. 199 */ 200 private static RuntimeException fatal(final String msg, final Throwable t) { 201 LOG.log(Level.SEVERE, msg, t); 202 return new RuntimeException(msg, t); 203 } 204}