This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.launch;
020
021import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
022import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
023import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
024import org.apache.reef.runtime.common.launch.parameters.LaunchID;
025import org.apache.reef.runtime.common.utils.RemoteManager;
026import org.apache.reef.tang.*;
027import org.apache.reef.tang.annotations.Name;
028import org.apache.reef.tang.annotations.NamedParameter;
029import org.apache.reef.tang.annotations.Parameter;
030import org.apache.reef.tang.formats.ConfigurationSerializer;
031import org.apache.reef.util.REEFVersion;
032import org.apache.reef.wake.profiler.WakeProfiler;
033import org.apache.reef.wake.remote.RemoteConfiguration;
034import org.apache.reef.wake.time.Clock;
035
036import javax.inject.Inject;
037import java.io.File;
038import java.io.FileNotFoundException;
039import java.io.IOException;
040import java.util.logging.Level;
041import java.util.logging.Logger;
042
043/**
044 * This encapsulates processes started by REEF.
045 */
046public final class LaunchClass implements AutoCloseable, Runnable {
047
048  private static final Logger LOG = Logger.getLogger(LaunchClass.class.getName());
049  private final RemoteManager remoteManager;
050  private final String launchID;
051  private final String errorHandlerID;
052  private final String evaluatorConfigurationPath;
053  private final boolean isProfilingEnabled;
054  private final REEFErrorHandler errorHandler;
055  private final ConfigurationSerializer configurationSerializer;
056  private WakeProfiler profiler;
057
058  @Inject
059  LaunchClass(final RemoteManager remoteManager,
060              final REEFUncaughtExceptionHandler uncaughtExceptionHandler,
061              final REEFErrorHandler errorHandler,
062              final @Parameter(LaunchID.class) String launchID,
063              final @Parameter(ErrorHandlerRID.class) String errorHandlerID,
064              final @Parameter(ClockConfigurationPath.class) String evaluatorConfigurationPath,
065              final @Parameter(ProfilingEnabled.class) boolean enableProfiling,
066              final ConfigurationSerializer configurationSerializer,
067              final REEFVersion reefVersion) {
068    reefVersion.logVersion();
069    this.remoteManager = remoteManager;
070    this.launchID = launchID;
071    this.errorHandlerID = errorHandlerID;
072    this.evaluatorConfigurationPath = evaluatorConfigurationPath;
073    this.isProfilingEnabled = enableProfiling;
074    this.errorHandler = errorHandler;
075    this.configurationSerializer = configurationSerializer;
076
077
078    // Registering a default exception handler. It sends every exception to the upstream RemoteManager
079    Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
080
081
082    if (isProfilingEnabled) {
083      this.profiler = new WakeProfiler();
084      ProfilingStopHandler.setProfiler(profiler); // TODO: This probably should be bound via Tang.
085    }
086  }
087
088  /**
089   * Loads the client and resource manager configuration files from disk.
090   */
091  private Configuration getClockConfiguration() {
092    return Configurations.merge(readConfigurationFromDisk(), getStaticClockConfiguration());
093  }
094
095
096  private Configuration readConfigurationFromDisk() {
097    LOG.log(Level.FINEST, "Loading configuration file: {0}", this.evaluatorConfigurationPath);
098
099    final File evaluatorConfigFile = new File(this.evaluatorConfigurationPath);
100
101    if (!evaluatorConfigFile.exists()) {
102      final String message = "The configuration file " + this.evaluatorConfigurationPath +
103          "doesn't exist. This points to an issue in the job submission.";
104      fail(message, new FileNotFoundException());
105      throw new RuntimeException(message);
106    } else if (!evaluatorConfigFile.canRead()) {
107      final String message = "The configuration file " + this.evaluatorConfigurationPath + " exists, but can't be read";
108      fail(message, new IOException());
109      throw new RuntimeException(message);
110    } else {
111      try {
112        return this.configurationSerializer.fromFile(evaluatorConfigFile);
113      } catch (final IOException e) {
114        final String message = "Unable to parse the configuration file " + this.evaluatorConfigurationPath;
115        fail(message, e);
116        throw new RuntimeException(message, e);
117      }
118    }
119  }
120
121  /**
122   * @return the part of the clock configuration *not* read from disk.
123   */
124  private Configuration getStaticClockConfiguration() {
125    final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder()
126        .bindNamedParameter(LaunchID.class, this.launchID)
127        .bindNamedParameter(ErrorHandlerRID.class, this.errorHandlerID)
128        .bindSetEntry(Clock.StartHandler.class, PIDStoreStartHandler.class)
129        .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
130        .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
131        .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class);
132    if (this.isProfilingEnabled) {
133      builder.bindSetEntry(Clock.StopHandler.class, ProfilingStopHandler.class);
134    }
135    return builder.build();
136  }
137
138  /**
139   * Instantiates the clock.
140   *
141   * @return a clock object.
142   */
143  private Clock getClock() {
144    try {
145      final Injector clockInjector = Tang.Factory.getTang().newInjector(this.getClockConfiguration());
146      if (isProfilingEnabled) {
147        clockInjector.bindAspect(profiler);
148      }
149      clockInjector.bindVolatileInstance(RemoteManager.class, this.remoteManager);
150      return clockInjector.getInstance(Clock.class);
151    } catch (final Throwable ex) {
152      fail("Unable to instantiate the clock", ex);
153      throw new RuntimeException("Unable to instantiate the clock", ex);
154    }
155  }
156
157  /**
158   * Starts the Clock.
159   * This blocks until the clock returns.
160   */
161  @Override
162  public void run() {
163    LOG.entering(this.getClass().getName(), "run", "Starting the clock");
164    try {
165      this.getClock().run();
166    } catch (final Throwable t) {
167      fail("Fatal exception while executing the clock", t);
168    }
169    LOG.exiting(this.getClass().getName(), "run", "Clock terminated");
170  }
171
172  /**
173   * Closes the remote manager managed by this class.
174   *
175   * @throws Exception
176   */
177  @Override
178  public void close() throws Exception {
179    LOG.entering(this.getClass().getName(), "close");
180    this.errorHandler.close(); // Also closes the remoteManager
181    LOG.exiting(this.getClass().getName(), "close");
182  }
183
184  private void fail(final String message, final Throwable throwable) {
185    this.errorHandler.onNext(new Exception(message, throwable));
186  }
187
188  @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
189  public final static class ProfilingEnabled implements Name<Boolean> {
190  }
191}