001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.launch; 020 021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; 022import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath; 023import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; 024import org.apache.reef.runtime.common.launch.parameters.LaunchID; 025import org.apache.reef.runtime.common.utils.RemoteManager; 026import org.apache.reef.tang.*; 027import org.apache.reef.tang.annotations.Name; 028import org.apache.reef.tang.annotations.NamedParameter; 029import org.apache.reef.tang.annotations.Parameter; 030import org.apache.reef.tang.formats.ConfigurationSerializer; 031import org.apache.reef.util.REEFVersion; 032import org.apache.reef.wake.profiler.WakeProfiler; 033import org.apache.reef.wake.remote.RemoteConfiguration; 034import org.apache.reef.wake.time.Clock; 035 036import javax.inject.Inject; 037import java.io.File; 038import java.io.FileNotFoundException; 039import java.io.IOException; 040import java.util.logging.Level; 041import java.util.logging.Logger; 042 043/** 044 * This encapsulates processes started by REEF. 045 */ 046public final class LaunchClass implements AutoCloseable, Runnable { 047 048 private static final Logger LOG = Logger.getLogger(LaunchClass.class.getName()); 049 private final RemoteManager remoteManager; 050 private final String launchID; 051 private final String errorHandlerID; 052 private final String evaluatorConfigurationPath; 053 private final boolean isProfilingEnabled; 054 private final REEFErrorHandler errorHandler; 055 private final ConfigurationSerializer configurationSerializer; 056 private WakeProfiler profiler; 057 058 @Inject 059 LaunchClass(final RemoteManager remoteManager, 060 final REEFUncaughtExceptionHandler uncaughtExceptionHandler, 061 final REEFErrorHandler errorHandler, 062 final @Parameter(LaunchID.class) String launchID, 063 final @Parameter(ErrorHandlerRID.class) String errorHandlerID, 064 final @Parameter(ClockConfigurationPath.class) String evaluatorConfigurationPath, 065 final @Parameter(ProfilingEnabled.class) boolean enableProfiling, 066 final ConfigurationSerializer configurationSerializer, 067 final REEFVersion reefVersion) { 068 reefVersion.logVersion(); 069 this.remoteManager = remoteManager; 070 this.launchID = launchID; 071 this.errorHandlerID = errorHandlerID; 072 this.evaluatorConfigurationPath = evaluatorConfigurationPath; 073 this.isProfilingEnabled = enableProfiling; 074 this.errorHandler = errorHandler; 075 this.configurationSerializer = configurationSerializer; 076 077 078 // Registering a default exception handler. It sends every exception to the upstream RemoteManager 079 Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); 080 081 082 if (isProfilingEnabled) { 083 this.profiler = new WakeProfiler(); 084 ProfilingStopHandler.setProfiler(profiler); // TODO: This probably should be bound via Tang. 085 } 086 } 087 088 /** 089 * Loads the client and resource manager configuration files from disk. 090 */ 091 private Configuration getClockConfiguration() { 092 return Configurations.merge(readConfigurationFromDisk(), getStaticClockConfiguration()); 093 } 094 095 096 private Configuration readConfigurationFromDisk() { 097 LOG.log(Level.FINEST, "Loading configuration file: {0}", this.evaluatorConfigurationPath); 098 099 final File evaluatorConfigFile = new File(this.evaluatorConfigurationPath); 100 101 if (!evaluatorConfigFile.exists()) { 102 final String message = "The configuration file " + this.evaluatorConfigurationPath + 103 "doesn't exist. This points to an issue in the job submission."; 104 fail(message, new FileNotFoundException()); 105 throw new RuntimeException(message); 106 } else if (!evaluatorConfigFile.canRead()) { 107 final String message = "The configuration file " + this.evaluatorConfigurationPath + " exists, but can't be read"; 108 fail(message, new IOException()); 109 throw new RuntimeException(message); 110 } else { 111 try { 112 return this.configurationSerializer.fromFile(evaluatorConfigFile); 113 } catch (final IOException e) { 114 final String message = "Unable to parse the configuration file " + this.evaluatorConfigurationPath; 115 fail(message, e); 116 throw new RuntimeException(message, e); 117 } 118 } 119 } 120 121 /** 122 * @return the part of the clock configuration *not* read from disk. 123 */ 124 private Configuration getStaticClockConfiguration() { 125 final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder() 126 .bindNamedParameter(LaunchID.class, this.launchID) 127 .bindNamedParameter(ErrorHandlerRID.class, this.errorHandlerID) 128 .bindSetEntry(Clock.StartHandler.class, PIDStoreStartHandler.class) 129 .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) 130 .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER") 131 .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class); 132 if (this.isProfilingEnabled) { 133 builder.bindSetEntry(Clock.StopHandler.class, ProfilingStopHandler.class); 134 } 135 return builder.build(); 136 } 137 138 /** 139 * Instantiates the clock. 140 * 141 * @return a clock object. 142 */ 143 private Clock getClock() { 144 try { 145 final Injector clockInjector = Tang.Factory.getTang().newInjector(this.getClockConfiguration()); 146 if (isProfilingEnabled) { 147 clockInjector.bindAspect(profiler); 148 } 149 clockInjector.bindVolatileInstance(RemoteManager.class, this.remoteManager); 150 return clockInjector.getInstance(Clock.class); 151 } catch (final Throwable ex) { 152 fail("Unable to instantiate the clock", ex); 153 throw new RuntimeException("Unable to instantiate the clock", ex); 154 } 155 } 156 157 /** 158 * Starts the Clock. 159 * This blocks until the clock returns. 160 */ 161 @Override 162 public void run() { 163 LOG.entering(this.getClass().getName(), "run", "Starting the clock"); 164 try { 165 this.getClock().run(); 166 } catch (final Throwable t) { 167 fail("Fatal exception while executing the clock", t); 168 } 169 LOG.exiting(this.getClass().getName(), "run", "Clock terminated"); 170 } 171 172 /** 173 * Closes the remote manager managed by this class. 174 * 175 * @throws Exception 176 */ 177 @Override 178 public void close() throws Exception { 179 LOG.entering(this.getClass().getName(), "close"); 180 this.errorHandler.close(); // Also closes the remoteManager 181 LOG.exiting(this.getClass().getName(), "close"); 182 } 183 184 private void fail(final String message, final Throwable throwable) { 185 this.errorHandler.onNext(new Exception(message, throwable)); 186 } 187 188 @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false") 189 public final static class ProfilingEnabled implements Name<Boolean> { 190 } 191}