001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.suspend; 020 021import org.apache.reef.client.*; 022import org.apache.reef.tang.Configuration; 023import org.apache.reef.tang.JavaConfigurationBuilder; 024import org.apache.reef.tang.Tang; 025import org.apache.reef.tang.annotations.Parameter; 026import org.apache.reef.tang.annotations.Unit; 027import org.apache.reef.tang.exceptions.BindException; 028import org.apache.reef.util.EnvironmentUtils; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.io.IOException; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036/** 037 * Client for suspend example. 038 */ 039@Unit 040public class SuspendClient { 041 042 /** 043 * Standard java logger. 044 */ 045 private static final Logger LOG = Logger.getLogger(SuspendClient.class.getName()); 046 047 /** 048 * Job Driver configuration. 049 */ 050 private final Configuration driverConfig; 051 052 /** 053 * Reference to the REEF framework. 054 */ 055 private final REEF reef; 056 057 /** 058 * Controller that listens for suspend/resume commands on a specified port. 059 */ 060 private final SuspendClientControl controlListener; 061 062 /** 063 * @param reef reference to the REEF framework. 064 * @param controlListener suspend client control listener. 065 * @param numCycles number of cycles to run in the task. 066 * @param delay delay in seconds between cycles in the task. 067 */ 068 @Inject 069 SuspendClient( 070 final REEF reef, 071 final SuspendClientControl controlListener, 072 @Parameter(Launch.NumCycles.class) final int numCycles, 073 @Parameter(Launch.Delay.class) final int delay) throws BindException, IOException { 074 075 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder() 076 .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles)) 077 .bindNamedParameter(Launch.Delay.class, Integer.toString(delay)); 078 079 cb.addConfiguration(DriverConfiguration.CONF 080 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SuspendDriver.class)) 081 .set(DriverConfiguration.DRIVER_IDENTIFIER, "suspend-" + System.currentTimeMillis()) 082 .set(DriverConfiguration.ON_TASK_RUNNING, SuspendDriver.RunningTaskHandler.class) 083 .set(DriverConfiguration.ON_TASK_COMPLETED, SuspendDriver.CompletedTaskHandler.class) 084 .set(DriverConfiguration.ON_TASK_SUSPENDED, SuspendDriver.SuspendedTaskHandler.class) 085 .set(DriverConfiguration.ON_TASK_MESSAGE, SuspendDriver.TaskMessageHandler.class) 086 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SuspendDriver.AllocatedEvaluatorHandler.class) 087 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SuspendDriver.ActiveContextHandler.class) 088 .set(DriverConfiguration.ON_CLIENT_MESSAGE, SuspendDriver.ClientMessageHandler.class) 089 .set(DriverConfiguration.ON_DRIVER_STARTED, SuspendDriver.StartHandler.class) 090 .set(DriverConfiguration.ON_DRIVER_STOP, SuspendDriver.StopHandler.class) 091 .build()); 092 093 this.driverConfig = cb.build(); 094 this.reef = reef; 095 this.controlListener = controlListener; 096 } 097 098 /** 099 * Start the job driver. 100 */ 101 public void submit() { 102 LOG.info("Start the job driver"); 103 this.reef.submit(this.driverConfig); 104 } 105 106 /** 107 * Wait for the job to complete. 108 */ 109 public void waitForCompletion() throws Exception { 110 LOG.info("Waiting for the Job Driver to complete."); 111 try { 112 synchronized (this) { 113 this.wait(); 114 } 115 } catch (final InterruptedException ex) { 116 LOG.log(Level.WARNING, "Waiting for result interrupted.", ex); 117 } 118 this.reef.close(); 119 this.controlListener.close(); 120 } 121 122 /** 123 * Receive notification from the driver that the job is about to run. 124 * RunningJob object is a proxy to the running job driver that can be used for sending messages. 125 */ 126 final class RunningJobHandler implements EventHandler<RunningJob> { 127 @Override 128 public void onNext(final RunningJob job) { 129 LOG.log(Level.INFO, "Running job: {0}", job.getId()); 130 SuspendClient.this.controlListener.setRunningJob(job); 131 } 132 } 133 134 /** 135 * Receive notification from the driver that the job had failed. 136 * <p> 137 * FailedJob is a proxy for the failed job driver 138 * (contains job ID and exception thrown from the driver). 139 */ 140 final class FailedJobHandler implements EventHandler<FailedJob> { 141 @Override 142 public void onNext(final FailedJob job) { 143 LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null)); 144 synchronized (SuspendClient.this) { 145 SuspendClient.this.notify(); 146 } 147 } 148 } 149 150 /** 151 * Receive notification from the driver that the job had completed successfully. 152 */ 153 final class CompletedJobHandler implements EventHandler<CompletedJob> { 154 @Override 155 public void onNext(final CompletedJob job) { 156 LOG.log(Level.INFO, "Completed job: {0}", job.getId()); 157 synchronized (SuspendClient.this) { 158 SuspendClient.this.notify(); 159 } 160 } 161 } 162 163 /** 164 * Receive notification that there was an exception thrown from the job driver. 165 */ 166 final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { 167 @Override 168 public void onNext(final FailedRuntime error) { 169 LOG.log(Level.SEVERE, "ERROR: " + error, error.getReason().orElse(null)); 170 synchronized (SuspendClient.this) { 171 SuspendClient.this.notify(); 172 } 173 } 174 } 175}