001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.client; 020 021import org.apache.reef.annotations.Provided; 022import org.apache.reef.annotations.audience.ClientSide; 023import org.apache.reef.annotations.audience.Public; 024import org.apache.reef.tang.Configuration; 025import org.apache.reef.tang.Tang; 026import org.apache.reef.tang.annotations.Unit; 027import org.apache.reef.tang.exceptions.BindException; 028import org.apache.reef.tang.exceptions.InjectionException; 029import org.apache.reef.util.Optional; 030import org.apache.reef.wake.EventHandler; 031 032import javax.inject.Inject; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036/** 037 * A launcher for REEF Drivers. 038 * <p> 039 * It can be instantiated using a configuration that can create a REEF instance. 040 * For example, the local resourcemanager and the YARN resourcemanager can do this. 041 * <p> 042 * See {@link org.apache.reef.examples.hello} package for a demo use case. 043 */ 044@Public 045@Provided 046@ClientSide 047@Unit 048public final class DriverLauncher { 049 050 private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName()); 051 private final REEF reef; 052 private LauncherStatus status = LauncherStatus.INIT; 053 private RunningJob theJob = null; 054 055 @Inject 056 private DriverLauncher(final REEF reef) { 057 this.reef = reef; 058 } 059 060 /** 061 * Instantiate a launcher for the given Configuration. 062 * 063 * @param runtimeConfiguration the resourcemanager configuration to be used 064 * @return a DriverLauncher based on the given resourcemanager configuration 065 * @throws BindException on configuration errors 066 * @throws InjectionException on configuration errors 067 */ 068 public static DriverLauncher getLauncher( 069 final Configuration runtimeConfiguration) throws BindException, InjectionException { 070 071 final Configuration clientConfiguration = ClientConfiguration.CONF 072 .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class) 073 .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class) 074 .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class) 075 .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class) 076 .build(); 077 078 return Tang.Factory.getTang() 079 .newInjector(runtimeConfiguration, clientConfiguration) 080 .getInstance(DriverLauncher.class); 081 } 082 083 /** 084 * Kills the running job. 085 */ 086 public synchronized void close() { 087 if (this.status.isRunning()) { 088 this.status = LauncherStatus.FORCE_CLOSED; 089 } 090 if (null != this.theJob) { 091 this.theJob.close(); 092 } 093 this.notify(); 094 } 095 096 /** 097 * Run a job. Waits indefinitely for the job to complete. 098 * 099 * @param driverConfig the configuration for the driver. See DriverConfiguration for details. 100 * @return the state of the job after execution. 101 */ 102 public LauncherStatus run(final Configuration driverConfig) { 103 this.reef.submit(driverConfig); 104 synchronized (this) { 105 while (!this.status.isDone()) { 106 try { 107 LOG.log(Level.FINE, "Wait indefinitely"); 108 this.wait(); 109 } catch (final InterruptedException ex) { 110 LOG.log(Level.FINE, "Interrupted: {0}", ex); 111 } 112 } 113 } 114 this.reef.close(); 115 synchronized (this) { 116 return this.status; 117 } 118 } 119 120 /** 121 * Run a job with a waiting timeout after which it will be killed, if it did not complete yet. 122 * 123 * @param driverConfig the configuration for the driver. See DriverConfiguration for details. 124 * @param timeOut timeout on the job. 125 * @return the state of the job after execution. 126 */ 127 public LauncherStatus run(final Configuration driverConfig, final long timeOut) { 128 final long endTime = System.currentTimeMillis() + timeOut; 129 this.reef.submit(driverConfig); 130 synchronized (this) { 131 while (!this.status.isDone()) { 132 try { 133 final long waitTime = endTime - System.currentTimeMillis(); 134 if (waitTime <= 0) { 135 break; 136 } 137 LOG.log(Level.FINE, "Wait for {0} milliSeconds", waitTime); 138 this.wait(waitTime); 139 } catch (final InterruptedException ex) { 140 LOG.log(Level.FINE, "Interrupted: {0}", ex); 141 } 142 } 143 if (System.currentTimeMillis() >= endTime) { 144 LOG.log(Level.WARNING, "The Job timed out."); 145 this.status = LauncherStatus.FORCE_CLOSED; 146 } 147 } 148 149 this.reef.close(); 150 synchronized (this) { 151 return this.status; 152 } 153 } 154 155 /** 156 * @return the current status of the job. 157 */ 158 public LauncherStatus getStatus() { 159 synchronized (this) { 160 return this.status; 161 } 162 } 163 164 /** 165 * Update job status and notify the waiting thread. 166 */ 167 @SuppressWarnings("checkstyle:hiddenfield") 168 public synchronized void setStatusAndNotify(final LauncherStatus status) { 169 LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[]{this.status, status}); 170 this.status = status; 171 this.notify(); 172 } 173 174 @Override 175 public String toString() { 176 return this.status.toString(); 177 } 178 179 /** 180 * Job driver notifies us that the job is running. 181 */ 182 public final class RunningJobHandler implements EventHandler<RunningJob> { 183 @Override 184 public void onNext(final RunningJob job) { 185 LOG.log(Level.INFO, "The Job {0} is running.", job.getId()); 186 theJob = job; 187 setStatusAndNotify(LauncherStatus.RUNNING); 188 } 189 } 190 191 /** 192 * Job driver notifies us that the job had failed. 193 */ 194 public final class FailedJobHandler implements EventHandler<FailedJob> { 195 @Override 196 public void onNext(final FailedJob job) { 197 final Optional<Throwable> ex = job.getReason(); 198 LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex); 199 theJob = null; 200 setStatusAndNotify(LauncherStatus.failed(ex)); 201 } 202 } 203 204 /** 205 * Job driver notifies us that the job had completed successfully. 206 */ 207 public final class CompletedJobHandler implements EventHandler<CompletedJob> { 208 @Override 209 public void onNext(final CompletedJob job) { 210 LOG.log(Level.INFO, "The Job {0} is done.", job.getId()); 211 theJob = null; 212 setStatusAndNotify(LauncherStatus.COMPLETED); 213 } 214 } 215 216 /** 217 * Handler an error in the job driver. 218 */ 219 public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { 220 @Override 221 public void onNext(final FailedRuntime error) { 222 LOG.log(Level.SEVERE, "Received a resourcemanager error", error.getReason()); 223 theJob = null; 224 setStatusAndNotify(LauncherStatus.failed(error.getReason())); 225 } 226 } 227}