/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common;

import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.client.JobStatusHandler;
import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
import org.apache.reef.runtime.common.launch.REEFErrorHandler;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.util.EnvironmentUtils;
import org.apache.reef.util.REEFVersion;
import org.apache.reef.wake.profiler.WakeProfiler;
import org.apache.reef.wake.profiler.ProfilerState;
import org.apache.reef.wake.time.Clock;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The main entry point into any REEF process (Driver and Evaluator).
 * An instance holds the component's {@link Clock} together with the error handler
 * and the job status handler: {@link #run()} executes the clock, {@link #close()}
 * closes the clock and the error handler, and {@link #getLastStatus()} reports
 * the most recent job status.
 * Instances are obtained through the static {@code fromConfiguration()} factories.
 */
public final class REEFEnvironment implements Runnable, AutoCloseable {

  private static final Logger LOG = Logger.getLogger(REEFEnvironment.class.getName());

  /** Class name passed to the entering/exiting trace calls of the logger. */
  private static final String CLASS_NAME = REEFEnvironment.class.getCanonicalName();

  private static final Tang TANG = Tang.Factory.getTang();

  /** Main event loop of current REEF component (Driver or Evaluator). */
  private final Clock clock;

  /** Error handler that receives the exceptions caught by this class. */
  private final REEFErrorHandler errorHandler;

  /** Handler that is queried for the last received job status. */
  private final JobStatusHandler jobStatusHandler;

  /**
   * Create a new REEF environment without host user credentials.
   * Delegates to {@link #fromConfiguration(UserCredentials, Configuration...)}
   * with a null user.
   * @param configurations REEF component (Driver or Evaluator) configuration.
   * If multiple configurations are provided, they will be merged before use.
   * Main part of the configuration is usually read from config file by REEFLauncher.
   * @return a new REEF environment built from the merged configuration.
   * @throws InjectionException Thrown on configuration error.
   */
  public static REEFEnvironment fromConfiguration(final Configuration... configurations) throws InjectionException {
    final UserCredentials noHostUser = null;
    return fromConfiguration(noHostUser, configurations);
  }

  /**
   * Create a new REEF environment.
   * The configurations are merged and used to create an injector; the profiler aspect
   * is bound if profiling is enabled, the REEF version is logged, and the error handler,
   * the job status handler and finally the clock are instantiated from that injector.
   * Any failure while instantiating the clock is logged, passed to the error handler,
   * and then rethrown to the caller.
   * @param hostUser User credentials to use when registering REEF app with the Resource Manager.
   * This parameter may be required for Unmanaged AM mode. Can be null, in which case
   * no credentials are copied.
   * @param configurations REEF component (Driver or Evaluator) configuration.
   * If multiple configurations are provided, they will be merged before use.
   * Main part of the configuration is usually read from config file by REEFLauncher.
   * @return a new REEF environment wrapping the injected clock and handlers.
   * @throws InjectionException Thrown on configuration error.
   * @throws RuntimeException if the host user credentials cannot be copied.
   */
  @SuppressWarnings("checkstyle:illegalcatch") // Throwable is caught so that it can be fed to the error handler
  public static REEFEnvironment fromConfiguration(
      final UserCredentials hostUser, final Configuration... configurations) throws InjectionException {

    final Configuration mergedConfig = Configurations.merge(configurations);

    // Rendering the configuration is only worth doing when the message will actually be logged.
    if (LOG.isLoggable(Level.FINEST)) {
      LOG.log(Level.FINEST, "Configuration:\n--\n{0}\n--", Configurations.toString(mergedConfig, true));
    }

    final Injector injector = TANG.newInjector(mergedConfig);

    if (ProfilerState.isProfilingEnabled(injector)) {
      final WakeProfiler wakeProfiler = new WakeProfiler();
      ProfilingStopHandler.setProfiler(wakeProfiler);
      injector.bindAspect(wakeProfiler);
    }

    final REEFVersion reefVersion = injector.getInstance(REEFVersion.class);
    reefVersion.logVersion();

    final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class);
    final JobStatusHandler jobStatusHandler = injector.getInstance(JobStatusHandler.class);

    if (hostUser != null) {
      try {
        final UserCredentials proxyUser = injector.getInstance(UserCredentials.class);
        proxyUser.set("reef-proxy", hostUser);
      } catch (final IOException ioError) {
        final String message = "Cannot copy user credentials: " + hostUser;
        LOG.log(Level.SEVERE, message, ioError);
        throw new RuntimeException(message, ioError);
      }
    }

    try {

      return new REEFEnvironment(injector.getInstance(Clock.class), errorHandler, jobStatusHandler);

    } catch (final Throwable failure) {
      LOG.log(Level.SEVERE, "Error while instantiating the clock", failure);
      try {
        errorHandler.onNext(failure);
      } catch (final Throwable handlerFailure) {
        // A failing error handler must not mask the original failure, which is rethrown below.
        LOG.log(Level.SEVERE, "Error while handling the exception " + failure, handlerFailure);
      }
      throw failure;
    }
  }

  /**
   * Use .fromConfiguration() method to create new REEF environment.
   * @param clock main event loop.
   * @param errorHandler error handler.
   * @param jobStatusHandler an object that receives notifications on job status changes
   * and can be queried for the last received job status.
   */
  private REEFEnvironment(
      final Clock clock, final REEFErrorHandler errorHandler, final JobStatusHandler jobStatusHandler) {

    this.clock = clock;
    this.errorHandler = errorHandler;
    this.jobStatusHandler = jobStatusHandler;
  }

  /**
   * Close and cleanup the environment.
   * Closes the clock first and the error handler afterwards. A failure to close the clock
   * is logged and passed to the error handler; the error handler is closed in either case,
   * and a failure to close it is only logged.
   */
  @Override
  @SuppressWarnings("checkstyle:illegalcatch") // Throwable is caught so that it can be fed to the error handler
  public void close() {

    LOG.entering(CLASS_NAME, "close");

    try {
      clock.close();
    } catch (final Throwable clockFailure) {
      LOG.log(Level.SEVERE, "Error while closing the clock", clockFailure);
      try {
        errorHandler.onNext(clockFailure);
      } catch (final Throwable handlerFailure) {
        LOG.log(Level.SEVERE, "Error while handling the exception " + clockFailure, handlerFailure);
      }
    } finally {
      // The error handler is closed last so that it can still receive a clock failure above.
      try {
        errorHandler.close();
      } catch (final Throwable closeFailure) {
        LOG.log(Level.SEVERE, "Error while closing the error handler", closeFailure);
      }
    }

    LOG.exiting(CLASS_NAME, "close");
  }

  /**
   * Launch REEF component (Driver or Evaluator).
   * Logs the current user name and whether assertions are enabled, then runs the clock.
   * Anything thrown by the clock is logged and passed to the error handler.
   * Check the status of the run via .getLastStatus() method.
   */
  @Override
  @SuppressWarnings("checkstyle:illegalcatch") // Throwable is caught so that it can be fed to the error handler
  public void run() {

    LOG.log(Level.FINE, "REEF started with user name [{0}]", System.getProperty("user.name"));

    final String assertionState;
    if (EnvironmentUtils.areAssertionsEnabled()) {
      assertionState = "ENABLED";
    } else {
      assertionState = "DISABLED";
    }
    LOG.log(Level.FINE, "REEF started. Assertions are {0} in this process.", assertionState);

    try {

      LOG.log(Level.FINEST, "Clock: start");
      clock.run();
      LOG.log(Level.FINEST, "Clock: exit normally: {0}", getLastStatus());

    } catch (final Throwable failure) {
      LOG.log(Level.SEVERE, "Clock: Error in main event loop", failure);
      errorHandler.onNext(failure);
    }
  }

  /**
   * Get the last known status of REEF job. Can return null if job has not started yet.
   * The value is obtained from the job status handler of this environment.
   * @return Status of the REEF job launched in this environment.
   */
  public ReefServiceProtos.JobStatusProto getLastStatus() {
    return jobStatusHandler.getLastStatus();
  }
}