001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common; 020 021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; 022import org.apache.reef.runtime.common.launch.ProfilingStopHandler; 023import org.apache.reef.runtime.common.launch.REEFErrorHandler; 024import org.apache.reef.runtime.common.launch.REEFMessageCodec; 025import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler; 026import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath; 027import org.apache.reef.tang.*; 028import org.apache.reef.tang.annotations.Name; 029import org.apache.reef.tang.annotations.NamedParameter; 030import org.apache.reef.tang.annotations.Parameter; 031import org.apache.reef.tang.exceptions.BindException; 032import org.apache.reef.tang.exceptions.InjectionException; 033import org.apache.reef.tang.formats.ConfigurationSerializer; 034import org.apache.reef.util.EnvironmentUtils; 035import org.apache.reef.util.REEFVersion; 036import org.apache.reef.util.ThreadLogger; 037import org.apache.reef.util.logging.LoggingSetup; 038import org.apache.reef.wake.profiler.WakeProfiler; 039import org.apache.reef.wake.remote.RemoteConfiguration; 040import org.apache.reef.wake.time.Clock; 041 042import javax.inject.Inject; 043import java.io.File; 044import java.io.FileNotFoundException; 045import java.io.IOException; 046import java.util.logging.Level; 047import java.util.logging.Logger; 048 049/** 050 * The main entrance point into any REEF process. It is mostly reading from the command line to instantiate 051 * the runtime clock and calling .run() on it. 052 */ 053public final class REEFLauncher { 054 055 /** 056 * Parameter which enables profiling. 057 */ 058 @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false") 059 public static final class ProfilingEnabled implements Name<Boolean> { 060 } 061 062 private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName()); 063 064 private static final Configuration LAUNCHER_STATIC_CONFIG = Tang.Factory.getTang().newConfigurationBuilder() 065 .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class) 066 .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) 067 .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER") 068 .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) 069 .build(); 070 071 static { 072 LoggingSetup.setupCommonsLogging(); 073 } 074 075 private final String configurationPath; 076 private final boolean isProfilingEnabled; 077 private final ConfigurationSerializer configurationSerializer; 078 private final REEFVersion reefVersion; 079 private final Configuration clockConfig; 080 081 @Inject 082 private REEFLauncher(@Parameter(ClockConfigurationPath.class) final String configurationPath, 083 @Parameter(ProfilingEnabled.class) final boolean enableProfiling, 084 final ConfigurationSerializer configurationSerializer, 085 final REEFVersion reefVersion) { 086 this.configurationPath = configurationPath; 087 this.configurationSerializer = configurationSerializer; 088 this.isProfilingEnabled = enableProfiling; 089 this.reefVersion = reefVersion; 090 this.clockConfig = Configurations.merge( 091 readConfigurationFromDisk(this.configurationPath, this.configurationSerializer), 092 LAUNCHER_STATIC_CONFIG); 093 } 094 095 private static REEFLauncher getREEFLauncher(final String clockConfigPath) { 096 final Injector injector; 097 try { 098 final Configuration clockArgConfig = Tang.Factory.getTang().newConfigurationBuilder() 099 .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath).build(); 100 injector = Tang.Factory.getTang().newInjector(clockArgConfig); 101 } catch (final BindException e) { 102 throw fatal("Error in parsing the command line", e); 103 } 104 105 try { 106 return injector.getInstance(REEFLauncher.class); 107 } catch (final InjectionException e) { 108 throw fatal("Unable to run REEFLauncher.", e); 109 } 110 } 111 112 private static RuntimeException fatal(final String msg, final Throwable t) { 113 LOG.log(Level.SEVERE, msg, t); 114 return new RuntimeException(msg, t); 115 } 116 117 private static RuntimeException fatal(final REEFErrorHandler errorHandler, final String msg, final Throwable t) { 118 errorHandler.onNext(t); 119 LOG.log(Level.SEVERE, msg, t); 120 return new RuntimeException(msg, t); 121 } 122 123 @SuppressWarnings("checkstyle:constructorwithoutparams") // avoids logging the same message twice in fatal() 124 private static Configuration readConfigurationFromDisk( 125 final String configPath, final ConfigurationSerializer serializer) { 126 LOG.log(Level.FINEST, "Loading configuration file: {0}", configPath); 127 128 final File evaluatorConfigFile = new File(configPath); 129 130 if (!evaluatorConfigFile.exists()) { 131 final String message = "The configuration file " + configPath + 132 "doesn't exist. This points to an issue in the job submission."; 133 throw fatal(message, new FileNotFoundException()); 134 } else if (!evaluatorConfigFile.canRead()) { 135 final String message = "The configuration file " + configPath + 136 " exists, but can't be read"; 137 throw fatal(message, new IOException()); 138 } else { 139 try { 140 return serializer.fromFile(evaluatorConfigFile); 141 } catch (final IOException e) { 142 final String message = "Unable to parse the configuration file " + configPath; 143 throw fatal(message, e); 144 } 145 } 146 } 147 148 /** 149 * Launches a REEF client process (Driver or Evaluator). 150 * 151 * @param args command-line args 152 */ 153 @SuppressWarnings("checkstyle:illegalcatch") 154 public static void main(final String[] args) { 155 LOG.log(Level.INFO, "Entering REEFLauncher.main()."); 156 LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name")); 157 158 LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.", 159 EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED"); 160 161 if (args.length != 1) { 162 final String message = "REEFLauncher have one and only one argument to specify the runtime clock " + 163 "configuration path"; 164 165 throw fatal(message, new IllegalArgumentException(message)); 166 } 167 168 final REEFLauncher launcher = getREEFLauncher(args[0]); 169 170 Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig)); 171 launcher.logVersion(); 172 173 try (final Clock clock = launcher.getClockFromConfig()) { 174 LOG.log(Level.FINE, "Clock starting"); 175 clock.run(); 176 LOG.log(Level.FINE, "Clock exiting"); 177 } catch (final Throwable ex) { 178 try (final REEFErrorHandler errorHandler = launcher.getErrorHandlerFromConfig()) { 179 throw fatal(errorHandler, "Unable to instantiate the clock", ex); 180 } catch (final InjectionException e) { 181 throw fatal("Unable to instantiate the clock and the ErrorHandler", e); 182 } 183 } 184 185 LOG.log(Level.INFO, "Exiting REEFLauncher.main()"); 186 if (LOG.isLoggable(Level.FINEST)) { 187 LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after REEFLauncher.close():")); 188 } 189 System.exit(0); 190 if (LOG.isLoggable(Level.FINEST)) { 191 LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after System.exit():")); 192 } 193 } 194 195 private void logVersion() { 196 this.reefVersion.logVersion(); 197 } 198 199 /** 200 * A new REEFErrorHandler is instantiated instead of lazy instantiation and saving the instantiated 201 * handler as a field since the ErrorHandler is closeable. 202 * @return A new REEFErrorHandler from clock config 203 * @throws InjectionException 204 */ 205 private REEFErrorHandler getErrorHandlerFromConfig() throws InjectionException { 206 return Tang.Factory.getTang().newInjector(this.clockConfig).getInstance(REEFErrorHandler.class); 207 } 208 209 /** 210 * A new Clock is instantiated instead of lazy instantiation and saving the instantiated 211 * handler as a field since the Clock is closeable. 212 * @return A new Clock from clock config 213 * @throws InjectionException 214 */ 215 private Clock getClockFromConfig() throws InjectionException { 216 final Injector clockInjector = Tang.Factory.getTang().newInjector(this.clockConfig); 217 if (this.isProfilingEnabled) { 218 final WakeProfiler profiler = new WakeProfiler(); 219 ProfilingStopHandler.setProfiler(profiler); 220 clockInjector.bindAspect(profiler); 221 } 222 223 return clockInjector.getInstance(Clock.class); 224 } 225}