001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.client; 020 021import org.apache.reef.annotations.Provided; 022import org.apache.reef.annotations.audience.ClientSide; 023import org.apache.reef.annotations.audience.Public; 024import org.apache.reef.runtime.common.UserCredentials; 025import org.apache.reef.tang.Configuration; 026import org.apache.reef.tang.Tang; 027import org.apache.reef.tang.annotations.Unit; 028import org.apache.reef.tang.exceptions.InjectionException; 029import org.apache.reef.util.Optional; 030import org.apache.reef.wake.EventHandler; 031 032import javax.inject.Inject; 033import java.util.Collections; 034import java.util.HashSet; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037 038/** 039 * A launcher for REEF Drivers. 040 * <p> 041 * It can be instantiated using a configuration that can create a REEF instance. 042 * For example, the local resource manager and the YARN resource manager can do this. 043 * <p> 044 * See {@link org.apache.reef.examples.hello} package for a demo use case. 045 */ 046@Public 047@Provided 048@ClientSide 049@Unit 050public final class DriverLauncher implements AutoCloseable { 051 052 private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName()); 053 054 private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF 055 .set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class) 056 .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class) 057 .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class) 058 .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class) 059 .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class) 060 .build(); 061 062 private final REEF reef; 063 private final UserCredentials user; 064 065 private LauncherStatus status = LauncherStatus.INIT; 066 067 private String jobId; 068 private RunningJob theJob; 069 070 @Inject 071 private DriverLauncher(final REEF reef, final UserCredentials user) { 072 this.reef = reef; 073 this.user = user; 074 } 075 076 public UserCredentials getUser() { 077 return this.user; 078 } 079 080 /** 081 * Instantiate a launcher for the given Configuration. 082 * 083 * @param runtimeConfiguration the resourcemanager configuration to be used 084 * @return a DriverLauncher based on the given resourcemanager configuration 085 * @throws InjectionException on configuration errors 086 */ 087 public static DriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException { 088 return Tang.Factory.getTang() 089 .newInjector(runtimeConfiguration, CLIENT_CONFIG) 090 .getInstance(DriverLauncher.class); 091 } 092 093 /** 094 * Kills the running job. 095 */ 096 @Override 097 public void close() { 098 synchronized (this) { 099 LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status}); 100 if (this.status.isRunning()) { 101 this.status = LauncherStatus.FORCE_CLOSED; 102 } 103 if (null != this.theJob) { 104 this.theJob.close(); 105 } 106 this.notify(); 107 } 108 LOG.log(Level.FINEST, "Close launcher: shutdown REEF"); 109 this.reef.close(); 110 LOG.log(Level.FINEST, "Close launcher: done"); 111 } 112 113 /** 114 * Run a job. Waits indefinitely for the job to complete. 115 * 116 * @param driverConfig the configuration for the driver. See DriverConfiguration for details. 117 * @return the state of the job after execution. 118 */ 119 public LauncherStatus run(final Configuration driverConfig) { 120 this.reef.submit(driverConfig); 121 synchronized (this) { 122 while (!this.status.isDone()) { 123 try { 124 LOG.log(Level.FINE, "Wait indefinitely"); 125 this.wait(); 126 } catch (final InterruptedException ex) { 127 LOG.log(Level.FINE, "Interrupted: {0}", ex); 128 } 129 } 130 } 131 this.reef.close(); 132 return this.getStatus(); 133 } 134 135 /** 136 * Submit REEF job asynchronously and do not wait for its completion. 137 * 138 * @param driverConfig configuration of hte driver to submit to the RM. 139 * @return ID of the new application. 140 */ 141 public String submit(final Configuration driverConfig, final long waitTime) { 142 this.reef.submit(driverConfig); 143 this.waitForStatus(waitTime, LauncherStatus.SUBMITTED); 144 return this.jobId; 145 } 146 147 /** 148 * Wait for one of the specified statuses of the REEF job. 149 * This method is called after the job is submitted to the RM via submit(). 150 * @param waitTime wait time in milliseconds. 151 * @param statuses array of statuses to wait for. 152 * @return the state of the job after the wait. 153 */ 154 public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... statuses) { 155 156 final long endTime = System.currentTimeMillis() + waitTime; 157 158 final HashSet<LauncherStatus> statSet = new HashSet<>(statuses.length * 2); 159 Collections.addAll(statSet, statuses); 160 Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED); 161 162 LOG.log(Level.FINEST, "Wait for status: {0}", statSet); 163 final LauncherStatus finalStatus; 164 165 synchronized (this) { 166 while (!statSet.contains(this.status)) { 167 try { 168 final long delay = endTime - System.currentTimeMillis(); 169 if (delay <= 0) { 170 break; 171 } 172 LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay); 173 this.wait(delay); 174 } catch (final InterruptedException ex) { 175 LOG.log(Level.FINE, "Interrupted: {0}", ex); 176 } 177 } 178 179 finalStatus = this.status; 180 } 181 182 LOG.log(Level.FINEST, "Final status: {0}", finalStatus); 183 return finalStatus; 184 } 185 186 /** 187 * Run a job with a waiting timeout after which it will be killed, if it did not complete yet. 188 * 189 * @param driverConfig the configuration for the driver. See DriverConfiguration for details. 190 * @param timeOut timeout on the job. 191 * @return the state of the job after execution. 192 */ 193 public LauncherStatus run(final Configuration driverConfig, final long timeOut) { 194 195 final long startTime = System.currentTimeMillis(); 196 197 this.reef.submit(driverConfig); 198 this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED); 199 200 if (System.currentTimeMillis() - startTime >= timeOut) { 201 LOG.log(Level.WARNING, "The Job timed out."); 202 synchronized (this) { 203 this.status = LauncherStatus.FORCE_CLOSED; 204 } 205 } 206 207 this.reef.close(); 208 return this.getStatus(); 209 } 210 211 /** 212 * @return the current status of the job. 213 */ 214 public synchronized LauncherStatus getStatus() { 215 return this.status; 216 } 217 218 /** Update job status and notify the waiting thread. */ 219 public synchronized void setStatusAndNotify(final LauncherStatus newStatus) { 220 LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus}); 221 this.status = newStatus; 222 this.notify(); 223 } 224 225 @Override 226 public String toString() { 227 return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status); 228 } 229 230 /** 231 * Job driver notifies us that the job has been submitted to the Resource Manager. 232 */ 233 public final class SubmittedJobHandler implements EventHandler<SubmittedJob> { 234 @Override 235 public void onNext(final SubmittedJob job) { 236 LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId()); 237 jobId = job.getId(); 238 setStatusAndNotify(LauncherStatus.SUBMITTED); 239 } 240 } 241 242 /** 243 * Job driver notifies us that the job is running. 244 */ 245 public final class RunningJobHandler implements EventHandler<RunningJob> { 246 @Override 247 public void onNext(final RunningJob job) { 248 LOG.log(Level.INFO, "The Job {0} is running.", job.getId()); 249 theJob = job; 250 setStatusAndNotify(LauncherStatus.RUNNING); 251 } 252 } 253 254 /** 255 * Job driver notifies us that the job had failed. 256 */ 257 public final class FailedJobHandler implements EventHandler<FailedJob> { 258 @Override 259 public void onNext(final FailedJob job) { 260 final Optional<Throwable> ex = job.getReason(); 261 LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex); 262 theJob = null; 263 setStatusAndNotify(LauncherStatus.failed(ex)); 264 } 265 } 266 267 /** 268 * Job driver notifies us that the job had completed successfully. 269 */ 270 public final class CompletedJobHandler implements EventHandler<CompletedJob> { 271 @Override 272 public void onNext(final CompletedJob job) { 273 LOG.log(Level.INFO, "The Job {0} is done.", job.getId()); 274 theJob = null; 275 setStatusAndNotify(LauncherStatus.COMPLETED); 276 } 277 } 278 279 /** 280 * Handler an error in the job driver. 281 */ 282 public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { 283 @Override 284 public void onNext(final FailedRuntime error) { 285 LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason()); 286 theJob = null; 287 setStatusAndNotify(LauncherStatus.failed(error.getReason())); 288 } 289 } 290}