001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.vortex.api; 020 021import org.apache.reef.annotations.Unstable; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.io.serialization.Codec; 024import org.apache.reef.util.Optional; 025import org.apache.reef.vortex.common.VortexFutureDelegate; 026import org.apache.reef.vortex.driver.VortexMaster; 027 028import java.util.List; 029import java.util.concurrent.*; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * The interface between user code and submitted task. 036 */ 037@Unstable 038public final class VortexFuture<TOutput> 039 implements Future<TOutput>, VortexFutureDelegate { 040 private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName()); 041 042 // userResult starts out as null. If not null => variable is set and tasklet returned. 043 // Otherwise tasklet has not completed. 044 private Optional<TOutput> userResult = null; 045 private Exception userException; 046 private AtomicBoolean cancelled = new AtomicBoolean(false); 047 private final CountDownLatch countDownLatch = new CountDownLatch(1); 048 private final FutureCallback<TOutput> callbackHandler; 049 private final Executor executor; 050 private final VortexMaster vortexMaster; 051 private final int taskletId; 052 private final Codec<TOutput> outputCodec; 053 054 /** 055 * Creates a {@link VortexFuture}. 056 */ 057 @Private 058 public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId, 059 final Codec<TOutput> outputCodec) { 060 this(executor, vortexMaster, taskletId, outputCodec, null); 061 } 062 063 /** 064 * Creates a {@link VortexFuture} with a callback. 065 */ 066 @Private 067 public VortexFuture(final Executor executor, 068 final VortexMaster vortexMaster, 069 final int taskletId, 070 final Codec<TOutput> outputCodec, 071 final FutureCallback<TOutput> callbackHandler) { 072 this.executor = executor; 073 this.vortexMaster = vortexMaster; 074 this.taskletId = taskletId; 075 this.outputCodec = outputCodec; 076 this.callbackHandler = callbackHandler; 077 } 078 079 /** 080 * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed. 081 * @return true if task did not start or was cancelled, false if task failed or completed 082 */ 083 @Override 084 public boolean cancel(final boolean mayInterruptIfRunning) { 085 try { 086 return cancel(mayInterruptIfRunning, Optional.<Long>empty(), Optional.<TimeUnit>empty()); 087 } catch (final TimeoutException e) { 088 // This should never happen. 089 LOG.log(Level.WARNING, "Received a TimeoutException in VortexFuture.cancel(). Should not have occurred."); 090 return false; 091 } 092 } 093 094 /** 095 * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed, or 096 * if the timeout has expired. 097 * @return true if task did not start or was cancelled, false if task failed or completed 098 */ 099 public boolean cancel(final boolean mayInterruptIfRunning, final long timeout, final TimeUnit unit) 100 throws TimeoutException { 101 return cancel(mayInterruptIfRunning, Optional.of(timeout), Optional.of(unit)); 102 } 103 104 private boolean cancel(final boolean mayInterruptIfRunning, 105 final Optional<Long> timeout, 106 final Optional<TimeUnit> unit) throws TimeoutException { 107 if (isDone()) { 108 return isCancelled(); 109 } 110 111 vortexMaster.cancelTasklet(mayInterruptIfRunning, taskletId); 112 113 try { 114 if (timeout.isPresent() && unit.isPresent()) { 115 if (!countDownLatch.await(timeout.get(), unit.get())) { 116 throw new TimeoutException(); 117 } 118 } else { 119 countDownLatch.await(); 120 } 121 } catch (InterruptedException e) { 122 e.printStackTrace(); 123 return false; 124 } 125 126 return isCancelled(); 127 } 128 129 /** 130 * @return true if the task is cancelled, false if not. 131 */ 132 @Override 133 public boolean isCancelled() { 134 return cancelled.get(); 135 } 136 137 /** 138 * @return true it the task completed, false if not. 139 */ 140 @Override 141 public boolean isDone() { 142 return countDownLatch.getCount() == 0; 143 } 144 145 /** 146 * Infinitely wait for the result of the task. 147 * @throws InterruptedException if the thread is interrupted. 148 * @throws ExecutionException if the Tasklet execution failed to complete. 149 * @throws CancellationException if the Tasklet was cancelled. 150 */ 151 @Override 152 public TOutput get() throws InterruptedException, ExecutionException, CancellationException { 153 countDownLatch.await(); 154 if (userResult != null) { 155 return userResult.get(); 156 } else { 157 assert this.cancelled.get() || userException != null; 158 if (userException != null) { 159 throw new ExecutionException(userException); 160 } 161 162 throw new CancellationException("Tasklet was cancelled."); 163 } 164 } 165 166 /** 167 * Wait a certain period of time for the result of the task. 168 * @throws TimeoutException if the timeout provided hits before the Tasklet is done. 169 * @throws InterruptedException if the thread is interrupted. 170 * @throws ExecutionException if the Tasklet execution failed to complete. 171 * @throws CancellationException if the Tasklet was cancelled. 172 */ 173 @Override 174 public TOutput get(final long timeout, final TimeUnit unit) 175 throws InterruptedException, ExecutionException, TimeoutException, CancellationException { 176 if (!countDownLatch.await(timeout, unit)) { 177 throw new TimeoutException(); 178 } 179 180 return get(); 181 } 182 183 /** 184 * Called by VortexMaster to let the user know that the Tasklet completed. 185 */ 186 @Private 187 @Override 188 public void completed(final int pTaskletId, final byte[] serializedResult) { 189 assert taskletId == pTaskletId; 190 191 // TODO[REEF-1113]: Handle serialization failure separately in Vortex 192 final TOutput result = outputCodec.decode(serializedResult); 193 this.userResult = Optional.ofNullable(result); 194 if (callbackHandler != null) { 195 executor.execute(new Runnable() { 196 @Override 197 public void run() { 198 callbackHandler.onSuccess(userResult.get()); 199 } 200 }); 201 } 202 this.countDownLatch.countDown(); 203 } 204 205 /** 206 * VortexMaster should never call this. 207 */ 208 @Private 209 @Override 210 public void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult) { 211 throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated."); 212 } 213 214 /** 215 * Called by VortexMaster to let the user know that the Tasklet threw an exception. 216 */ 217 @Private 218 @Override 219 public void threwException(final int pTaskletId, final Exception exception) { 220 assert taskletId == pTaskletId; 221 222 this.userException = exception; 223 if (callbackHandler != null) { 224 executor.execute(new Runnable() { 225 @Override 226 public void run() { 227 callbackHandler.onFailure(exception); 228 } 229 }); 230 } 231 this.countDownLatch.countDown(); 232 } 233 234 /** 235 * VortexMaster should never call this. 236 */ 237 @Private 238 @Override 239 public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) { 240 throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated"); 241 } 242 243 /** 244 * Called by VortexMaster to let the user know that the Tasklet was cancelled. 245 */ 246 @Private 247 @Override 248 public void cancelled(final int pTaskletId) { 249 assert taskletId == pTaskletId; 250 251 this.cancelled.set(true); 252 if (callbackHandler != null) { 253 executor.execute(new Runnable() { 254 @Override 255 public void run() { 256 callbackHandler.onFailure(new InterruptedException("VortexFuture has been cancelled on request.")); 257 } 258 }); 259 } 260 this.countDownLatch.countDown(); 261 } 262}