001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common; 020 021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; 022import org.apache.reef.runtime.common.launch.ProfilingStopHandler; 023import org.apache.reef.runtime.common.launch.REEFErrorHandler; 024import org.apache.reef.runtime.common.launch.REEFMessageCodec; 025import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler; 026import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath; 027import org.apache.reef.tang.*; 028import org.apache.reef.tang.annotations.Name; 029import org.apache.reef.tang.annotations.NamedParameter; 030import org.apache.reef.tang.annotations.Parameter; 031import org.apache.reef.tang.exceptions.BindException; 032import org.apache.reef.tang.exceptions.InjectionException; 033import org.apache.reef.tang.formats.ConfigurationSerializer; 034import org.apache.reef.util.EnvironmentUtils; 035import org.apache.reef.util.REEFVersion; 036import org.apache.reef.util.ThreadLogger; 037import org.apache.reef.util.logging.LoggingSetup; 038import org.apache.reef.wake.profiler.WakeProfiler; 039import org.apache.reef.wake.remote.RemoteConfiguration; 040import org.apache.reef.wake.time.Clock; 041 042import javax.inject.Inject; 043import java.io.File; 044import java.io.FileNotFoundException; 045import java.io.IOException; 046import java.util.logging.Level; 047import java.util.logging.Logger; 048 049/** 050 * The main entrance point into any REEF process. It is mostly reading from the command line to instantiate 051 * the runtime clock and calling .run() on it. 052 */ 053public final class REEFLauncher { 054 055 /** 056 * Parameter which enables profiling. 057 */ 058 @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false") 059 public static final class ProfilingEnabled implements Name<Boolean> { 060 } 061 062 private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName()); 063 064 private static final Configuration LAUNCHER_STATIC_CONFIG = Tang.Factory.getTang().newConfigurationBuilder() 065 .bindSetEntry(Clock.StartHandler.class, PIDStoreStartHandler.class) 066 .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) 067 .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER") 068 .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) 069 .build(); 070 071 static { 072 LoggingSetup.setupCommonsLogging(); 073 } 074 075 private final String configurationPath; 076 private final boolean isProfilingEnabled; 077 private final ConfigurationSerializer configurationSerializer; 078 private final REEFVersion reefVersion; 079 private final Configuration clockConfig; 080 081 @Inject 082 private REEFLauncher(@Parameter(ClockConfigurationPath.class) final String configurationPath, 083 @Parameter(ProfilingEnabled.class) final boolean enableProfiling, 084 final ConfigurationSerializer configurationSerializer, 085 final REEFVersion reefVersion) { 086 this.configurationPath = configurationPath; 087 this.configurationSerializer = configurationSerializer; 088 this.isProfilingEnabled = enableProfiling; 089 this.reefVersion = reefVersion; 090 this.clockConfig = Configurations.merge( 091 readConfigurationFromDisk(this.configurationPath, this.configurationSerializer), 092 LAUNCHER_STATIC_CONFIG); 093 } 094 095 private static REEFLauncher getREEFLauncher(final String clockConfigPath) { 096 final Injector injector; 097 try { 098 final Configuration clockArgConfig = Tang.Factory.getTang().newConfigurationBuilder() 099 .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath).build(); 100 injector = Tang.Factory.getTang().newInjector(clockArgConfig); 101 } catch (final BindException e) { 102 throw fatal("Error in parsing the command line", e); 103 } 104 105 try { 106 return injector.getInstance(REEFLauncher.class); 107 } catch (final InjectionException e) { 108 throw fatal("Unable to run REEFLauncher.", e); 109 } 110 } 111 112 private static RuntimeException fatal(final String msg, final Throwable t) { 113 LOG.log(Level.SEVERE, msg, t); 114 return new RuntimeException(msg, t); 115 } 116 117 private static RuntimeException fatal(final REEFErrorHandler errorHandler, final String msg, final Throwable t) { 118 errorHandler.onNext(t); 119 LOG.log(Level.SEVERE, msg, t); 120 return new RuntimeException(msg, t); 121 } 122 123 private static Configuration readConfigurationFromDisk( 124 final String configPath, final ConfigurationSerializer serializer) { 125 LOG.log(Level.FINEST, "Loading configuration file: {0}", configPath); 126 127 final File evaluatorConfigFile = new File(configPath); 128 129 if (!evaluatorConfigFile.exists()) { 130 final String message = "The configuration file " + configPath + 131 "doesn't exist. This points to an issue in the job submission."; 132 throw fatal(message, new FileNotFoundException()); 133 } else if (!evaluatorConfigFile.canRead()) { 134 final String message = "The configuration file " + configPath + 135 " exists, but can't be read"; 136 throw fatal(message, new IOException()); 137 } else { 138 try { 139 return serializer.fromFile(evaluatorConfigFile); 140 } catch (final IOException e) { 141 final String message = "Unable to parse the configuration file " + configPath; 142 throw fatal(message, e); 143 } 144 } 145 } 146 147 /** 148 * Launches a REEF client process (Driver or Evaluator). 149 * 150 * @param args command-line args 151 */ 152 @SuppressWarnings("checkstyle:illegalcatch") 153 public static void main(final String[] args) { 154 LOG.log(Level.INFO, "Entering REEFLauncher.main()."); 155 LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name")); 156 157 LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.", 158 EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED"); 159 160 if (args.length != 1) { 161 final String message = "REEFLauncher have one and only one argument to specify the runtime clock " + 162 "configuration path"; 163 164 throw fatal(message, new IllegalArgumentException(message)); 165 } 166 167 final REEFLauncher launcher = getREEFLauncher(args[0]); 168 169 Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig)); 170 launcher.logVersion(); 171 172 try (final Clock clock = launcher.getClockFromConfig()) { 173 LOG.log(Level.FINE, "Clock starting"); 174 clock.run(); 175 LOG.log(Level.FINE, "Clock exiting"); 176 } catch (final Throwable ex) { 177 try (final REEFErrorHandler errorHandler = launcher.getErrorHandlerFromConfig()) { 178 throw fatal(errorHandler, "Unable to instantiate the clock", ex); 179 } catch (final InjectionException e) { 180 throw fatal("Unable to instantiate the clock and the ErrorHandler", e); 181 } 182 } 183 184 LOG.log(Level.INFO, "Exiting REEFLauncher.main()"); 185 if (LOG.isLoggable(Level.FINEST)) { 186 LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after REEFLauncher.close():")); 187 } 188 System.exit(0); 189 if (LOG.isLoggable(Level.FINEST)) { 190 LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after System.exit():")); 191 } 192 } 193 194 private void logVersion() { 195 this.reefVersion.logVersion(); 196 } 197 198 /** 199 * A new REEFErrorHandler is instantiated instead of lazy instantiation and saving the instantiated 200 * handler as a field since the ErrorHandler is closeable. 201 * @return A new REEFErrorHandler from clock config 202 * @throws InjectionException 203 */ 204 private REEFErrorHandler getErrorHandlerFromConfig() throws InjectionException { 205 return Tang.Factory.getTang().newInjector(this.clockConfig).getInstance(REEFErrorHandler.class); 206 } 207 208 /** 209 * A new Clock is instantiated instead of lazy instantiation and saving the instantiated 210 * handler as a field since the Clock is closeable. 211 * @return A new Clock from clock config 212 * @throws InjectionException 213 */ 214 private Clock getClockFromConfig() throws InjectionException { 215 final Injector clockInjector = Tang.Factory.getTang().newInjector(this.clockConfig); 216 if (this.isProfilingEnabled) { 217 final WakeProfiler profiler = new WakeProfiler(); 218 ProfilingStopHandler.setProfiler(profiler); 219 clockInjector.bindAspect(profiler); 220 } 221 222 return clockInjector.getInstance(Clock.class); 223 } 224}