001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.suspend; 020 021import org.apache.reef.client.*; 022import org.apache.reef.tang.Configuration; 023import org.apache.reef.tang.JavaConfigurationBuilder; 024import org.apache.reef.tang.Tang; 025import org.apache.reef.tang.annotations.Parameter; 026import org.apache.reef.tang.annotations.Unit; 027import org.apache.reef.tang.exceptions.BindException; 028import org.apache.reef.util.EnvironmentUtils; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.io.IOException; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036@Unit 037public class SuspendClient { 038 039 /** 040 * Standard java logger. 041 */ 042 private final static Logger LOG = Logger.getLogger(SuspendClient.class.getName()); 043 044 /** 045 * Job Driver configuration. 046 */ 047 private final Configuration driverConfig; 048 049 /** 050 * Reference to the REEF framework. 051 */ 052 private final REEF reef; 053 054 /** 055 * Controller that listens for suspend/resume commands on a specified port. 056 */ 057 private final SuspendClientControl controlListener; 058 059 /** 060 * @param reef reference to the REEF framework. 061 * @param port port to listen to for suspend/resume commands. 062 * @param numCycles number of cycles to run in the task. 063 * @param delay delay in seconds between cycles in the task. 064 */ 065 @Inject 066 SuspendClient( 067 final REEF reef, 068 final @Parameter(SuspendClientControl.Port.class) int port, 069 final @Parameter(Launch.NumCycles.class) int numCycles, 070 final @Parameter(Launch.Delay.class) int delay) throws BindException, IOException { 071 072 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder() 073 .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles)) 074 .bindNamedParameter(Launch.Delay.class, Integer.toString(delay)); 075 076 cb.addConfiguration(DriverConfiguration.CONF 077 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SuspendDriver.class)) 078 .set(DriverConfiguration.DRIVER_IDENTIFIER, "suspend-" + System.currentTimeMillis()) 079 .set(DriverConfiguration.ON_TASK_RUNNING, SuspendDriver.RunningTaskHandler.class) 080 .set(DriverConfiguration.ON_TASK_COMPLETED, SuspendDriver.CompletedTaskHandler.class) 081 .set(DriverConfiguration.ON_TASK_SUSPENDED, SuspendDriver.SuspendedTaskHandler.class) 082 .set(DriverConfiguration.ON_TASK_MESSAGE, SuspendDriver.TaskMessageHandler.class) 083 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SuspendDriver.AllocatedEvaluatorHandler.class) 084 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SuspendDriver.ActiveContextHandler.class) 085 .set(DriverConfiguration.ON_CLIENT_MESSAGE, SuspendDriver.ClientMessageHandler.class) 086 .set(DriverConfiguration.ON_DRIVER_STARTED, SuspendDriver.StartHandler.class) 087 .set(DriverConfiguration.ON_DRIVER_STOP, SuspendDriver.StopHandler.class) 088 .build()); 089 090 this.driverConfig = cb.build(); 091 this.reef = reef; 092 this.controlListener = new SuspendClientControl(port); 093 } 094 095 /** 096 * Start the job driver. 097 */ 098 public void submit() { 099 LOG.info("Start the job driver"); 100 this.reef.submit(this.driverConfig); 101 } 102 103 /** 104 * Wait for the job to complete. 105 */ 106 public void waitForCompletion() throws Exception { 107 LOG.info("Waiting for the Job Driver to complete."); 108 try { 109 synchronized (this) { 110 this.wait(); 111 } 112 } catch (final InterruptedException ex) { 113 LOG.log(Level.WARNING, "Waiting for result interrupted.", ex); 114 } 115 this.reef.close(); 116 this.controlListener.close(); 117 } 118 119 /** 120 * Receive notification from the driver that the job is about to run. 121 * RunningJob object is a proxy to the running job driver that can be used for sending messages. 122 */ 123 final class RunningJobHandler implements EventHandler<RunningJob> { 124 @Override 125 public void onNext(final RunningJob job) { 126 LOG.log(Level.INFO, "Running job: {0}", job.getId()); 127 SuspendClient.this.controlListener.setRunningJob(job); 128 } 129 } 130 131 /** 132 * Receive notification from the driver that the job had failed. 133 * <p/> 134 * FailedJob is a proxy for the failed job driver 135 * (contains job ID and exception thrown from the driver). 136 */ 137 final class FailedJobHandler implements EventHandler<FailedJob> { 138 @Override 139 public void onNext(final FailedJob job) { 140 LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null)); 141 synchronized (SuspendClient.this) { 142 SuspendClient.this.notify(); 143 } 144 } 145 } 146 147 /** 148 * Receive notification from the driver that the job had completed successfully. 149 */ 150 final class CompletedJobHandler implements EventHandler<CompletedJob> { 151 @Override 152 public void onNext(final CompletedJob job) { 153 LOG.log(Level.INFO, "Completed job: {0}", job.getId()); 154 synchronized (SuspendClient.this) { 155 SuspendClient.this.notify(); 156 } 157 } 158 } 159 160 /** 161 * Receive notification that there was an exception thrown from the job driver. 162 */ 163 final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { 164 @Override 165 public void onNext(final FailedRuntime error) { 166 LOG.log(Level.SEVERE, "ERROR: " + error, error.getReason().orElse(null)); 167 synchronized (SuspendClient.class) { 168 SuspendClient.this.notify(); 169 } 170 } 171 } 172}