This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common;
020
021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
022import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
023import org.apache.reef.runtime.common.launch.REEFErrorHandler;
024import org.apache.reef.runtime.common.launch.REEFMessageCodec;
025import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
026import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
027import org.apache.reef.tang.*;
028import org.apache.reef.tang.annotations.Name;
029import org.apache.reef.tang.annotations.NamedParameter;
030import org.apache.reef.tang.annotations.Parameter;
031import org.apache.reef.tang.exceptions.BindException;
032import org.apache.reef.tang.exceptions.InjectionException;
033import org.apache.reef.tang.formats.ConfigurationSerializer;
034import org.apache.reef.util.EnvironmentUtils;
035import org.apache.reef.util.REEFVersion;
036import org.apache.reef.util.ThreadLogger;
037import org.apache.reef.util.logging.LoggingSetup;
038import org.apache.reef.wake.profiler.WakeProfiler;
039import org.apache.reef.wake.remote.RemoteConfiguration;
040import org.apache.reef.wake.time.Clock;
041
042import javax.inject.Inject;
043import java.io.File;
044import java.io.FileNotFoundException;
045import java.io.IOException;
046import java.util.logging.Level;
047import java.util.logging.Logger;
048
049/**
050 * The main entrance point into any REEF process. It is mostly reading from the command line to instantiate
051 * the runtime clock and calling .run() on it.
052 */
053public final class REEFLauncher {
054
055  /**
056   * Parameter which enables profiling.
057   */
058  @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
059  public static final class ProfilingEnabled implements Name<Boolean> {
060  }
061
062  private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName());
063
064  private static final Configuration LAUNCHER_STATIC_CONFIG = Tang.Factory.getTang().newConfigurationBuilder()
065          .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class)
066          .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
067          .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
068          .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
069          .build();
070
071  static {
072    LoggingSetup.setupCommonsLogging();
073  }
074
075  private final String configurationPath;
076  private final boolean isProfilingEnabled;
077  private final ConfigurationSerializer configurationSerializer;
078  private final REEFVersion reefVersion;
079  private final Configuration clockConfig;
080
081  @Inject
082  private REEFLauncher(@Parameter(ClockConfigurationPath.class) final String configurationPath,
083               @Parameter(ProfilingEnabled.class) final boolean enableProfiling,
084               final ConfigurationSerializer configurationSerializer,
085               final REEFVersion reefVersion) {
086    this.configurationPath = configurationPath;
087    this.configurationSerializer = configurationSerializer;
088    this.isProfilingEnabled = enableProfiling;
089    this.reefVersion = reefVersion;
090    this.clockConfig = Configurations.merge(
091            readConfigurationFromDisk(this.configurationPath, this.configurationSerializer),
092            LAUNCHER_STATIC_CONFIG);
093  }
094
095  private static REEFLauncher getREEFLauncher(final String clockConfigPath) {
096    final Injector injector;
097    try {
098      final Configuration clockArgConfig = Tang.Factory.getTang().newConfigurationBuilder()
099              .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath).build();
100      injector = Tang.Factory.getTang().newInjector(clockArgConfig);
101    } catch (final BindException e) {
102      throw fatal("Error in parsing the command line", e);
103    }
104
105    try {
106      return injector.getInstance(REEFLauncher.class);
107    } catch (final InjectionException e) {
108      throw fatal("Unable to run REEFLauncher.", e);
109    }
110  }
111
112  private static RuntimeException fatal(final String msg, final Throwable t) {
113    LOG.log(Level.SEVERE, msg, t);
114    return new RuntimeException(msg, t);
115  }
116
117  private static RuntimeException fatal(final REEFErrorHandler errorHandler, final String msg, final Throwable t) {
118    errorHandler.onNext(t);
119    LOG.log(Level.SEVERE, msg, t);
120    return new RuntimeException(msg, t);
121  }
122
123  @SuppressWarnings("checkstyle:constructorwithoutparams") // avoids logging the same message twice in fatal()
124  private static Configuration readConfigurationFromDisk(
125          final String configPath, final ConfigurationSerializer serializer) {
126    LOG.log(Level.FINEST, "Loading configuration file: {0}", configPath);
127
128    final File evaluatorConfigFile = new File(configPath);
129
130    if (!evaluatorConfigFile.exists()) {
131      final String message = "The configuration file " + configPath +
132          "doesn't exist. This points to an issue in the job submission.";
133      throw fatal(message, new FileNotFoundException());
134    } else if (!evaluatorConfigFile.canRead()) {
135      final String message = "The configuration file " + configPath +
136          " exists, but can't be read";
137      throw fatal(message, new IOException());
138    } else {
139      try {
140        return serializer.fromFile(evaluatorConfigFile);
141      } catch (final IOException e) {
142        final String message = "Unable to parse the configuration file " + configPath;
143        throw fatal(message, e);
144      }
145    }
146  }
147
148  /**
149   * Launches a REEF client process (Driver or Evaluator).
150   *
151   * @param args command-line args
152   */
153  @SuppressWarnings("checkstyle:illegalcatch")
154  public static void main(final String[] args) {
155    LOG.log(Level.INFO, "Entering REEFLauncher.main().");
156    LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name"));
157
158    LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.",
159            EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
160
161    if (args.length != 1) {
162      final String message = "REEFLauncher have one and only one argument to specify the runtime clock " +
163          "configuration path";
164
165      throw fatal(message, new IllegalArgumentException(message));
166    }
167
168    final REEFLauncher launcher = getREEFLauncher(args[0]);
169
170    Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig));
171    launcher.logVersion();
172
173    try (final Clock clock = launcher.getClockFromConfig()) {
174      LOG.log(Level.FINE, "Clock starting");
175      clock.run();
176      LOG.log(Level.FINE, "Clock exiting");
177    } catch (final Throwable ex) {
178      try (final REEFErrorHandler errorHandler = launcher.getErrorHandlerFromConfig()) {
179        throw fatal(errorHandler, "Unable to instantiate the clock", ex);
180      } catch (final InjectionException e) {
181        throw fatal("Unable to instantiate the clock and the ErrorHandler", e);
182      }
183    }
184
185    LOG.log(Level.INFO, "Exiting REEFLauncher.main()");
186    if (LOG.isLoggable(Level.FINEST)) {
187      LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after REEFLauncher.close():"));
188    }
189    System.exit(0);
190    if (LOG.isLoggable(Level.FINEST)) {
191      LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after System.exit():"));
192    }
193  }
194
195  private void logVersion() {
196    this.reefVersion.logVersion();
197  }
198
199  /**
200   * A new REEFErrorHandler is instantiated instead of lazy instantiation and saving the instantiated
201   * handler as a field since the ErrorHandler is closeable.
202   * @return A new REEFErrorHandler from clock config
203   * @throws InjectionException
204   */
205  private REEFErrorHandler getErrorHandlerFromConfig() throws InjectionException {
206    return Tang.Factory.getTang().newInjector(this.clockConfig).getInstance(REEFErrorHandler.class);
207  }
208
209  /**
210   * A new Clock is instantiated instead of lazy instantiation and saving the instantiated
211   * handler as a field since the Clock is closeable.
212   * @return A new Clock from clock config
213   * @throws InjectionException
214   */
215  private Clock getClockFromConfig() throws InjectionException {
216    final Injector clockInjector = Tang.Factory.getTang().newInjector(this.clockConfig);
217    if (this.isProfilingEnabled) {
218      final WakeProfiler profiler = new WakeProfiler();
219      ProfilingStopHandler.setProfiler(profiler);
220      clockInjector.bindAspect(profiler);
221    }
222
223    return clockInjector.getInstance(Clock.class);
224  }
225}