This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common;
020
021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
022import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
023import org.apache.reef.runtime.common.launch.REEFErrorHandler;
024import org.apache.reef.runtime.common.launch.REEFMessageCodec;
025import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
026import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
027import org.apache.reef.tang.*;
028import org.apache.reef.tang.annotations.Name;
029import org.apache.reef.tang.annotations.NamedParameter;
030import org.apache.reef.tang.annotations.Parameter;
031import org.apache.reef.tang.exceptions.BindException;
032import org.apache.reef.tang.exceptions.InjectionException;
033import org.apache.reef.tang.formats.ConfigurationSerializer;
034import org.apache.reef.util.EnvironmentUtils;
035import org.apache.reef.util.REEFVersion;
036import org.apache.reef.util.ThreadLogger;
037import org.apache.reef.util.logging.LoggingSetup;
038import org.apache.reef.wake.profiler.WakeProfiler;
039import org.apache.reef.wake.remote.RemoteConfiguration;
040import org.apache.reef.wake.time.Clock;
041
042import javax.inject.Inject;
043import java.io.File;
044import java.io.FileNotFoundException;
045import java.io.IOException;
046import java.util.logging.Level;
047import java.util.logging.Logger;
048
049/**
050 * The main entrance point into any REEF process. It is mostly reading from the command line to instantiate
051 * the runtime clock and calling .run() on it.
052 */
053public final class REEFLauncher {
054
055  /**
056   * Parameter which enables profiling.
057   */
058  @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
059  public static final class ProfilingEnabled implements Name<Boolean> {
060  }
061
062  private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName());
063
064  private static final Configuration LAUNCHER_STATIC_CONFIG = Tang.Factory.getTang().newConfigurationBuilder()
065          .bindSetEntry(Clock.StartHandler.class, PIDStoreStartHandler.class)
066          .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
067          .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
068          .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
069          .build();
070
071  static {
072    LoggingSetup.setupCommonsLogging();
073  }
074
075  private final String configurationPath;
076  private final boolean isProfilingEnabled;
077  private final ConfigurationSerializer configurationSerializer;
078  private final REEFVersion reefVersion;
079  private final Configuration clockConfig;
080
081  @Inject
082  private REEFLauncher(@Parameter(ClockConfigurationPath.class) final String configurationPath,
083               @Parameter(ProfilingEnabled.class) final boolean enableProfiling,
084               final ConfigurationSerializer configurationSerializer,
085               final REEFVersion reefVersion) {
086    this.configurationPath = configurationPath;
087    this.configurationSerializer = configurationSerializer;
088    this.isProfilingEnabled = enableProfiling;
089    this.reefVersion = reefVersion;
090    this.clockConfig = Configurations.merge(
091            readConfigurationFromDisk(this.configurationPath, this.configurationSerializer),
092            LAUNCHER_STATIC_CONFIG);
093  }
094
095  private static REEFLauncher getREEFLauncher(final String clockConfigPath) {
096    final Injector injector;
097    try {
098      final Configuration clockArgConfig = Tang.Factory.getTang().newConfigurationBuilder()
099              .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath).build();
100      injector = Tang.Factory.getTang().newInjector(clockArgConfig);
101    } catch (final BindException e) {
102      throw fatal("Error in parsing the command line", e);
103    }
104
105    try {
106      return injector.getInstance(REEFLauncher.class);
107    } catch (final InjectionException e) {
108      throw fatal("Unable to run REEFLauncher.", e);
109    }
110  }
111
112  private static RuntimeException fatal(final String msg, final Throwable t) {
113    LOG.log(Level.SEVERE, msg, t);
114    return new RuntimeException(msg, t);
115  }
116
117  private static RuntimeException fatal(final REEFErrorHandler errorHandler, final String msg, final Throwable t) {
118    errorHandler.onNext(t);
119    LOG.log(Level.SEVERE, msg, t);
120    return new RuntimeException(msg, t);
121  }
122
123  private static Configuration readConfigurationFromDisk(
124          final String configPath, final ConfigurationSerializer serializer) {
125    LOG.log(Level.FINEST, "Loading configuration file: {0}", configPath);
126
127    final File evaluatorConfigFile = new File(configPath);
128
129    if (!evaluatorConfigFile.exists()) {
130      final String message = "The configuration file " + configPath +
131          "doesn't exist. This points to an issue in the job submission.";
132      throw fatal(message, new FileNotFoundException());
133    } else if (!evaluatorConfigFile.canRead()) {
134      final String message = "The configuration file " + configPath +
135          " exists, but can't be read";
136      throw fatal(message, new IOException());
137    } else {
138      try {
139        return serializer.fromFile(evaluatorConfigFile);
140      } catch (final IOException e) {
141        final String message = "Unable to parse the configuration file " + configPath;
142        throw fatal(message, e);
143      }
144    }
145  }
146
147  /**
148   * Launches a REEF client process (Driver or Evaluator).
149   *
150   * @param args command-line args
151   */
152  @SuppressWarnings("checkstyle:illegalcatch")
153  public static void main(final String[] args) {
154    LOG.log(Level.INFO, "Entering REEFLauncher.main().");
155    LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name"));
156
157    LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.",
158            EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
159
160    if (args.length != 1) {
161      final String message = "REEFLauncher have one and only one argument to specify the runtime clock " +
162          "configuration path";
163
164      throw fatal(message, new IllegalArgumentException(message));
165    }
166
167    final REEFLauncher launcher = getREEFLauncher(args[0]);
168
169    Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig));
170    launcher.logVersion();
171
172    try (final Clock clock = launcher.getClockFromConfig()) {
173      LOG.log(Level.FINE, "Clock starting");
174      clock.run();
175      LOG.log(Level.FINE, "Clock exiting");
176    } catch (final Throwable ex) {
177      try (final REEFErrorHandler errorHandler = launcher.getErrorHandlerFromConfig()) {
178        throw fatal(errorHandler, "Unable to instantiate the clock", ex);
179      } catch (final InjectionException e) {
180        throw fatal("Unable to instantiate the clock and the ErrorHandler", e);
181      }
182    }
183
184    LOG.log(Level.INFO, "Exiting REEFLauncher.main()");
185    if (LOG.isLoggable(Level.FINEST)) {
186      LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after REEFLauncher.close():"));
187    }
188    System.exit(0);
189    if (LOG.isLoggable(Level.FINEST)) {
190      LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after System.exit():"));
191    }
192  }
193
194  private void logVersion() {
195    this.reefVersion.logVersion();
196  }
197
198  /**
199   * A new REEFErrorHandler is instantiated instead of lazy instantiation and saving the instantiated
200   * handler as a field since the ErrorHandler is closeable.
201   * @return A new REEFErrorHandler from clock config
202   * @throws InjectionException
203   */
204  private REEFErrorHandler getErrorHandlerFromConfig() throws InjectionException {
205    return Tang.Factory.getTang().newInjector(this.clockConfig).getInstance(REEFErrorHandler.class);
206  }
207
208  /**
209   * A new Clock is instantiated instead of lazy instantiation and saving the instantiated
210   * handler as a field since the Clock is closeable.
211   * @return A new Clock from clock config
212   * @throws InjectionException
213   */
214  private Clock getClockFromConfig() throws InjectionException {
215    final Injector clockInjector = Tang.Factory.getTang().newInjector(this.clockConfig);
216    if (this.isProfilingEnabled) {
217      final WakeProfiler profiler = new WakeProfiler();
218      ProfilingStopHandler.setProfiler(profiler);
219      clockInjector.bindAspect(profiler);
220    }
221
222    return clockInjector.getInstance(Clock.class);
223  }
224}