This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common;
020
021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
022import org.apache.reef.runtime.common.launch.REEFErrorHandler;
023import org.apache.reef.runtime.common.launch.REEFMessageCodec;
024import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
025import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
026import org.apache.reef.tang.*;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.tang.exceptions.BindException;
029import org.apache.reef.tang.exceptions.InjectionException;
030import org.apache.reef.tang.formats.ConfigurationSerializer;
031import org.apache.reef.util.EnvironmentUtils;
032import org.apache.reef.util.ThreadLogger;
033import org.apache.reef.util.logging.LoggingSetup;
034import org.apache.reef.wake.remote.RemoteConfiguration;
035import org.apache.reef.wake.time.Clock;
036
037import javax.inject.Inject;
038import java.io.File;
039import java.io.FileNotFoundException;
040import java.io.IOException;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * The main entry point into any REEF process (Driver and Evaluator).
046 * It is mostly reading from the command line to instantiate
047 * the runtime clock and calling .run() on it.
048 */
049public final class REEFLauncher {
050
051  private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName());
052
053  private static final Tang TANG = Tang.Factory.getTang();
054
055  private static final Configuration LAUNCHER_STATIC_CONFIG =
056      TANG.newConfigurationBuilder()
057          .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
058          .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
059          .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
060          .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class)
061          .build();
062
063  static {
064    LoggingSetup.setupCommonsLogging();
065  }
066
067  /**
068   * Main configuration object of the REEF component we are launching here.
069   * REEFEnvironment uses that configuration to instantiate the Clock object,
070   * and then call .run() on it.
071   */
072  private final Configuration envConfig;
073
074  /**
075   * REEFLauncher is instantiated in the main() method below using
076   * Tang configuration file provided as a command line argument.
077   * @param configurationPath Path to the serialized Tang configuration file.
078   * (The file must be in the local file system).
079   * @param configurationSerializer Serializer used to read the configuration file.
080   * We currently use Avro to serialize Tang configs.
081   */
082  @Inject
083  private REEFLauncher(
084      @Parameter(ClockConfigurationPath.class) final String configurationPath,
085      final ConfigurationSerializer configurationSerializer) {
086
087    this.envConfig = Configurations.merge(LAUNCHER_STATIC_CONFIG,
088        readConfigurationFromDisk(configurationPath, configurationSerializer));
089  }
090
091  /**
092   * Instantiate REEF Launcher. This method is called from REEFLauncher.main().
093   * @param clockConfigPath Path to the local file that contains serialized configuration
094   * of a REEF component to launch (can be either Driver or Evaluator).
095   * @return An instance of the configured REEFLauncher object.
096   */
097  private static REEFLauncher getREEFLauncher(final String clockConfigPath) {
098
099    try {
100
101      final Configuration clockArgConfig = TANG.newConfigurationBuilder()
102          .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath)
103          .build();
104
105      return TANG.newInjector(clockArgConfig).getInstance(REEFLauncher.class);
106
107    } catch (final BindException ex) {
108      throw fatal("Error in parsing the command line", ex);
109    } catch (final InjectionException ex) {
110      throw fatal("Unable to instantiate REEFLauncher.", ex);
111    }
112  }
113
114  /**
115   * Read configuration from a given file and deserialize it
116   * into Tang configuration object that can be used for injection.
117   * Configuration is currently serialized using Avro.
118   * This method also prints full deserialized configuration into log.
119   * @param configPath Path to the local file that contains serialized configuration
120   * of a REEF component to launch (can be either Driver or Evaluator).
121   * @param serializer An object to deserialize the configuration file.
122   * @return Tang configuration read and deserialized from a given file.
123   */
124  private static Configuration readConfigurationFromDisk(
125          final String configPath, final ConfigurationSerializer serializer) {
126
127    LOG.log(Level.FINER, "Loading configuration file: {0}", configPath);
128
129    final File evaluatorConfigFile = new File(configPath);
130
131    if (!evaluatorConfigFile.exists()) {
132      throw fatal(
133          "Configuration file " + configPath + " does not exist. Can be an issue in job submission.",
134          new FileNotFoundException(configPath));
135    }
136
137    if (!evaluatorConfigFile.canRead()) {
138      throw fatal(
139          "Configuration file " + configPath + " exists, but can't be read.",
140          new IOException(configPath));
141    }
142
143    try {
144
145      final Configuration config = serializer.fromFile(evaluatorConfigFile);
146      LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath);
147
148      return config;
149
150    } catch (final IOException e) {
151      throw fatal("Unable to parse the configuration file: " + configPath, e);
152    }
153  }
154
155  /**
156   * Launches a REEF client process (Driver or Evaluator).
157   * @param args Command-line arguments.
158   * Must be a single element containing local path to the configuration file.
159   */
160  @SuppressWarnings("checkstyle:illegalcatch")
161  public static void main(final String[] args) {
162
163    LOG.log(Level.INFO, "Entering REEFLauncher.main().");
164
165    LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name"));
166    LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.",
167            EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED");
168
169    if (args.length != 1) {
170      final String message = "REEFLauncher have one and only one argument to specify the runtime clock " +
171          "configuration path";
172
173      throw fatal(message, new IllegalArgumentException(message));
174    }
175
176    final REEFLauncher launcher = getREEFLauncher(args[0]);
177
178    Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig));
179
180    try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(launcher.envConfig)) {
181      reef.run();
182    } catch (final Throwable ex) {
183      throw fatal("Unable to configure and start REEFEnvironment.", ex);
184    }
185
186    ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after REEFEnvironment.close():");
187
188    LOG.log(Level.INFO, "Exiting REEFLauncher.main()");
189
190    System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main()
191  }
192
193  /**
194   * Wrap an exception into RuntimeException with a given message,
195   * and write the same message and exception to the log.
196   * @param msg an error message to log and pass into the RuntimeException.
197   * @param t A Throwable exception to log and wrap.
198   * @return a new Runtime exception wrapping a Throwable.
199   */
200  private static RuntimeException fatal(final String msg, final Throwable t) {
201    LOG.log(Level.SEVERE, msg, t);
202    return new RuntimeException(msg, t);
203  }
204}