001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.vortex.api; 020 021import org.apache.reef.annotations.Unstable; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.util.Optional; 024import org.apache.reef.vortex.driver.VortexFutureDelegate; 025import org.apache.reef.vortex.driver.VortexMaster; 026 027import java.util.List; 028import java.util.concurrent.*; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.logging.Level; 031import java.util.logging.Logger; 032 033/** 034 * The interface between user code and submitted task. 035 */ 036@Unstable 037public final class VortexFuture<TOutput> implements Future<TOutput>, VortexFutureDelegate<TOutput> { 038 private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName()); 039 040 // userResult starts out as null. If not null => variable is set and tasklet returned. 041 // Otherwise tasklet has not completed. 042 private Optional<TOutput> userResult = null; 043 private Exception userException; 044 private AtomicBoolean cancelled = new AtomicBoolean(false); 045 private final CountDownLatch countDownLatch = new CountDownLatch(1); 046 private final FutureCallback<TOutput> callbackHandler; 047 private final Executor executor; 048 private final VortexMaster vortexMaster; 049 private final int taskletId; 050 051 /** 052 * Creates a {@link VortexFuture}. 053 */ 054 @Private 055 public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId) { 056 this(executor, vortexMaster, taskletId, null); 057 } 058 059 /** 060 * Creates a {@link VortexFuture} with a callback. 061 */ 062 @Private 063 public VortexFuture(final Executor executor, 064 final VortexMaster vortexMaster, 065 final int taskletId, 066 final FutureCallback<TOutput> callbackHandler) { 067 this.executor = executor; 068 this.vortexMaster = vortexMaster; 069 this.taskletId = taskletId; 070 this.callbackHandler = callbackHandler; 071 } 072 073 /** 074 * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed. 075 * @return true if task did not start or was cancelled, false if task failed or completed 076 */ 077 @Override 078 public boolean cancel(final boolean mayInterruptIfRunning) { 079 try { 080 return cancel(mayInterruptIfRunning, Optional.<Long>empty(), Optional.<TimeUnit>empty()); 081 } catch (final TimeoutException e) { 082 // This should never happen. 083 LOG.log(Level.WARNING, "Received a TimeoutException in VortexFuture.cancel(). Should not have occurred."); 084 return false; 085 } 086 } 087 088 /** 089 * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed, or 090 * if the timeout has expired. 091 * @return true if task did not start or was cancelled, false if task failed or completed 092 */ 093 public boolean cancel(final boolean mayInterruptIfRunning, final long timeout, final TimeUnit unit) 094 throws TimeoutException { 095 return cancel(mayInterruptIfRunning, Optional.of(timeout), Optional.of(unit)); 096 } 097 098 private boolean cancel(final boolean mayInterruptIfRunning, 099 final Optional<Long> timeout, 100 final Optional<TimeUnit> unit) throws TimeoutException { 101 if (isDone()) { 102 return isCancelled(); 103 } 104 105 vortexMaster.cancelTasklet(mayInterruptIfRunning, taskletId); 106 107 try { 108 if (timeout.isPresent() && unit.isPresent()) { 109 if (!countDownLatch.await(timeout.get(), unit.get())) { 110 throw new TimeoutException("Cancellation of the VortexFuture timed out. Timeout = " + timeout.get() 111 + " in time units: " + unit.get()); 112 } 113 } else { 114 countDownLatch.await(); 115 } 116 } catch (InterruptedException e) { 117 e.printStackTrace(); 118 return false; 119 } 120 121 return isCancelled(); 122 } 123 124 /** 125 * @return true if the task is cancelled, false if not. 126 */ 127 @Override 128 public boolean isCancelled() { 129 return cancelled.get(); 130 } 131 132 /** 133 * @return true it the task completed, false if not. 134 */ 135 @Override 136 public boolean isDone() { 137 return countDownLatch.getCount() == 0; 138 } 139 140 /** 141 * Infinitely wait for the result of the task. 142 * @throws InterruptedException if the thread is interrupted. 143 * @throws ExecutionException if the Tasklet execution failed to complete. 144 * @throws CancellationException if the Tasklet was cancelled. 145 */ 146 @Override 147 public TOutput get() throws InterruptedException, ExecutionException, CancellationException { 148 countDownLatch.await(); 149 if (userResult != null) { 150 return userResult.get(); 151 } else { 152 assert this.cancelled.get() || userException != null; 153 if (userException != null) { 154 throw new ExecutionException(userException); 155 } 156 157 throw new CancellationException("Tasklet was cancelled."); 158 } 159 } 160 161 /** 162 * Wait a certain period of time for the result of the task. 163 * @throws TimeoutException if the timeout provided hits before the Tasklet is done. 164 * @throws InterruptedException if the thread is interrupted. 165 * @throws ExecutionException if the Tasklet execution failed to complete. 166 * @throws CancellationException if the Tasklet was cancelled. 167 */ 168 @Override 169 public TOutput get(final long timeout, final TimeUnit unit) 170 throws InterruptedException, ExecutionException, TimeoutException, CancellationException { 171 if (!countDownLatch.await(timeout, unit)) { 172 throw new TimeoutException("Waiting for the results of the task timed out. Timeout = " + timeout 173 + " in time units: " + unit); 174 } 175 176 return get(); 177 } 178 179 /** 180 * Called by VortexMaster to let the user know that the Tasklet completed. 181 */ 182 @Private 183 @Override 184 public void completed(final int pTaskletId, final TOutput result) { 185 assert taskletId == pTaskletId; 186 this.userResult = Optional.ofNullable(result); 187 if (callbackHandler != null) { 188 executor.execute(new Runnable() { 189 @Override 190 public void run() { 191 callbackHandler.onSuccess(userResult.get()); 192 } 193 }); 194 } 195 this.countDownLatch.countDown(); 196 } 197 198 /** 199 * VortexMaster should never call this. 200 */ 201 @Private 202 @Override 203 public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) { 204 throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated."); 205 } 206 207 /** 208 * Called by VortexMaster to let the user know that the Tasklet threw an exception. 209 */ 210 @Private 211 @Override 212 public void threwException(final int pTaskletId, final Exception exception) { 213 assert taskletId == pTaskletId; 214 215 this.userException = exception; 216 if (callbackHandler != null) { 217 executor.execute(new Runnable() { 218 @Override 219 public void run() { 220 callbackHandler.onFailure(exception); 221 } 222 }); 223 } 224 this.countDownLatch.countDown(); 225 } 226 227 /** 228 * VortexMaster should never call this. 229 */ 230 @Private 231 @Override 232 public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) { 233 throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated"); 234 } 235 236 /** 237 * Called by VortexMaster to let the user know that the Tasklet was cancelled. 238 */ 239 @Private 240 @Override 241 public void cancelled(final int pTaskletId) { 242 assert taskletId == pTaskletId; 243 244 this.cancelled.set(true); 245 if (callbackHandler != null) { 246 executor.execute(new Runnable() { 247 @Override 248 public void run() { 249 callbackHandler.onFailure(new InterruptedException("VortexFuture has been cancelled on request.")); 250 } 251 }); 252 } 253 this.countDownLatch.countDown(); 254 } 255}