/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.client;

import org.apache.reef.annotations.Provided;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A launcher for REEF Drivers.
 * <p>
 * It can be instantiated using a configuration that can create a REEF instance.
 * For example, the local resourcemanager and the YARN resourcemanager can do this.
 * <p>
 * See {@code org.apache.reef.examples.hello.HelloREEF} for a demo use case.
 * <p>
 * The launcher's own monitor is used to block the thread calling {@code run(...)}
 * until one of the event handlers declared below (or {@link #close()}) updates the status.
 */
@Public
@Provided
@ClientSide
@Unit
public final class DriverLauncher {

  private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());

  private final REEF reef;

  // Both fields are written by the event handlers and read by getStatus()/toString()/close()
  // without a common lock, hence volatile to make the latest value visible to every thread.
  private volatile LauncherStatus status = LauncherStatus.INIT;
  private volatile RunningJob theJob = null;

  @Inject
  private DriverLauncher(final REEF reef) {
    this.reef = reef;
  }

  /**
   * Instantiate a launcher for the given Configuration.
   * <p>
   * The inner handler classes of this launcher are bound to the job and runtime
   * events of {@code ClientConfiguration} before the launcher is injected.
   *
   * @param runtimeConfiguration the resourcemanager configuration to be used
   * @return a DriverLauncher based on the given resourcemanager configuration
   * @throws BindException      on configuration errors
   * @throws InjectionException on configuration errors
   */
  public static DriverLauncher getLauncher(
      final Configuration runtimeConfiguration) throws BindException, InjectionException {

    final Configuration clientConfiguration = ClientConfiguration.CONF
        .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
        .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
        .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
        .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class)
        .build();

    return Tang.Factory.getTang()
        .newInjector(runtimeConfiguration, clientConfiguration)
        .getInstance(DriverLauncher.class);
  }

  /**
   * Kills the running job.
   * <p>
   * If the status reports that the job is running it becomes FORCE_CLOSED, the running
   * job (if one has been reported) is closed, and all threads waiting in
   * {@code run(...)} are woken up.
   */
  public synchronized void close() {
    if (this.status.isRunning()) {
      this.status = LauncherStatus.FORCE_CLOSED;
    }
    // Snapshot the field: handlers may reset it to null concurrently, and a second
    // read after the null check could otherwise dereference null.
    final RunningJob job = this.theJob;
    if (null != job) {
      job.close();
    }
    this.notifyAll();
  }

  /**
   * Run a job. Waits indefinitely for the job to complete.
   * <p>
   * An interrupt received while waiting does not abort the wait; the interrupt status
   * of the calling thread is restored before this method returns.
   *
   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
   * @return the state of the job after execution.
   */
  public LauncherStatus run(final Configuration driverConfig) {
    this.reef.submit(driverConfig);
    boolean interrupted = false;
    synchronized (this) {
      while (!this.status.isDone()) {
        try {
          LOG.log(Level.FINE, "Wait indefinitely");
          this.wait();
        } catch (final InterruptedException ex) {
          LOG.log(Level.FINE, "Interrupted while waiting for the job to complete", ex);
          interrupted = true;
        }
      }
    }
    this.closeReef(interrupted);
    return this.status;
  }

  /**
   * Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
   * <p>
   * The status is only forced to FORCE_CLOSED when the job has not reached a final state
   * by the deadline; a job that finished in time keeps the status reported by its handler.
   * An interrupt received while waiting does not abort the wait; the interrupt status
   * of the calling thread is restored before this method returns.
   *
   * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
   * @param timeOut      timeout on the job, in milliseconds.
   * @return the state of the job after execution.
   */
  public LauncherStatus run(final Configuration driverConfig, final long timeOut) {
    final long startTime = System.currentTimeMillis();
    // Saturate instead of overflowing into the past for very large timeouts.
    final long endTime = timeOut > Long.MAX_VALUE - startTime ? Long.MAX_VALUE : startTime + timeOut;
    this.reef.submit(driverConfig);
    boolean interrupted = false;
    synchronized (this) {
      while (!this.status.isDone()) {
        final long waitTime = endTime - System.currentTimeMillis();
        if (waitTime <= 0) {
          break;
        }
        try {
          LOG.log(Level.FINE, "Wait for {0} milliSeconds", waitTime);
          this.wait(waitTime);
        } catch (final InterruptedException ex) {
          LOG.log(Level.FINE, "Interrupted while waiting for the job to complete", ex);
          interrupted = true;
        }
      }
      // The loop is left either because the job is done or because the deadline passed.
      // Checking the status (not the clock) keeps a result that arrived just in time.
      if (!this.status.isDone()) {
        LOG.log(Level.WARNING, "The Job timed out.");
        this.status = LauncherStatus.FORCE_CLOSED;
      }
    }
    this.closeReef(interrupted);
    return this.status;
  }

  /**
   * Closes the REEF client and afterwards re-asserts the caller's interrupt status
   * if an interrupt was consumed while waiting for the job.
   *
   * @param interrupted whether an InterruptedException was caught while waiting.
   */
  private void closeReef(final boolean interrupted) {
    try {
      this.reef.close();
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * @return the current status of the job.
   */
  public LauncherStatus getStatus() {
    return this.status;
  }

  /**
   * Update job status and notify the waiting threads.
   *
   * @param status the new status of the job.
   */
  public synchronized void setStatusAndNotify(final LauncherStatus status) {
    LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[]{this.status, status});
    this.status = status;
    this.notifyAll();
  }

  /**
   * @return the string form of the current job status.
   */
  @Override
  public String toString() {
    return this.status.toString();
  }

  /**
   * Job driver notifies us that the job is running.
   */
  public final class RunningJobHandler implements EventHandler<RunningJob> {
    @Override
    public void onNext(final RunningJob job) {
      LOG.log(Level.INFO, "The Job {0} is running.", job.getId());
      theJob = job;
      setStatusAndNotify(LauncherStatus.RUNNING);
    }
  }

  /**
   * Job driver notifies us that the job had failed.
   */
  public final class FailedJobHandler implements EventHandler<FailedJob> {
    @Override
    public void onNext(final FailedJob job) {
      final Optional<Throwable> ex = job.getReason();
      // Pass the Throwable itself (or null) so that the logger records the failure cause;
      // an Optional would be treated as an unused message parameter and silently dropped.
      LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex.orElse(null));
      theJob = null;
      setStatusAndNotify(LauncherStatus.FAILED(ex));
    }
  }

  /**
   * Job driver notifies us that the job had completed successfully.
   */
  public final class CompletedJobHandler implements EventHandler<CompletedJob> {
    @Override
    public void onNext(final CompletedJob job) {
      LOG.log(Level.INFO, "The Job {0} is done.", job.getId());
      theJob = null;
      setStatusAndNotify(LauncherStatus.COMPLETED);
    }
  }

  /**
   * Handles an error in the job driver.
   */
  public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
    @Override
    public void onNext(final FailedRuntime error) {
      // NOTE(review): if getReason() returns an Optional rather than a Throwable, the reason
      // is not rendered by this log call (the message has no placeholder) - confirm.
      LOG.log(Level.SEVERE, "Received a resourcemanager error", error.getReason());
      theJob = null;
      setStatusAndNotify(LauncherStatus.FAILED(error.getReason()));
    }
  }
}