This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.vortex.api;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.util.Optional;
024import org.apache.reef.vortex.driver.VortexFutureDelegate;
025import org.apache.reef.vortex.driver.VortexMaster;
026
027import java.util.List;
028import java.util.concurrent.*;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.logging.Level;
031import java.util.logging.Logger;
032
033/**
034 * The interface between user code and submitted task.
035 */
036@Unstable
037public final class VortexFuture<TOutput> implements Future<TOutput>, VortexFutureDelegate<TOutput> {
038  private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName());
039
040  // userResult starts out as null. If not null => variable is set and tasklet returned.
041  // Otherwise tasklet has not completed.
042  private Optional<TOutput> userResult = null;
043  private Exception userException;
044  private AtomicBoolean cancelled = new AtomicBoolean(false);
045  private final CountDownLatch countDownLatch = new CountDownLatch(1);
046  private final FutureCallback<TOutput> callbackHandler;
047  private final Executor executor;
048  private final VortexMaster vortexMaster;
049  private final int taskletId;
050
051  /**
052   * Creates a {@link VortexFuture}.
053   */
054  @Private
055  public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId) {
056    this(executor, vortexMaster, taskletId, null);
057  }
058
059  /**
060   * Creates a {@link VortexFuture} with a callback.
061   */
062  @Private
063  public VortexFuture(final Executor executor,
064                      final VortexMaster vortexMaster,
065                      final int taskletId,
066                      final FutureCallback<TOutput> callbackHandler) {
067    this.executor = executor;
068    this.vortexMaster = vortexMaster;
069    this.taskletId = taskletId;
070    this.callbackHandler = callbackHandler;
071  }
072
073  /**
074   * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed.
075   * @return true if task did not start or was cancelled, false if task failed or completed
076   */
077  @Override
078  public boolean cancel(final boolean mayInterruptIfRunning) {
079    try {
080      return cancel(mayInterruptIfRunning, Optional.<Long>empty(), Optional.<TimeUnit>empty());
081    } catch (final TimeoutException e) {
082      // This should never happen.
083      LOG.log(Level.WARNING, "Received a TimeoutException in VortexFuture.cancel(). Should not have occurred.");
084      return false;
085    }
086  }
087
088  /**
089   * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed, or
090   * if the timeout has expired.
091   * @return true if task did not start or was cancelled, false if task failed or completed
092   */
093  public boolean cancel(final boolean mayInterruptIfRunning, final long timeout, final TimeUnit unit)
094      throws TimeoutException {
095    return cancel(mayInterruptIfRunning, Optional.of(timeout), Optional.of(unit));
096  }
097
098  private boolean cancel(final boolean mayInterruptIfRunning,
099                         final Optional<Long> timeout,
100                         final Optional<TimeUnit> unit) throws TimeoutException {
101    if (isDone()) {
102      return isCancelled();
103    }
104
105    vortexMaster.cancelTasklet(mayInterruptIfRunning, taskletId);
106
107    try {
108      if (timeout.isPresent() && unit.isPresent()) {
109        if (!countDownLatch.await(timeout.get(), unit.get())) {
110          throw new TimeoutException("Cancellation of the VortexFuture timed out. Timeout = " + timeout.get()
111                  + " in time units: " + unit.get());
112        }
113      } else {
114        countDownLatch.await();
115      }
116    } catch (InterruptedException e) {
117      e.printStackTrace();
118      return false;
119    }
120
121    return isCancelled();
122  }
123
124  /**
125   * @return true if the task is cancelled, false if not.
126   */
127  @Override
128  public boolean isCancelled() {
129    return cancelled.get();
130  }
131
132  /**
133   * @return true it the task completed, false if not.
134   */
135  @Override
136  public boolean isDone() {
137    return countDownLatch.getCount() == 0;
138  }
139
140  /**
141   * Infinitely wait for the result of the task.
142   * @throws InterruptedException if the thread is interrupted.
143   * @throws ExecutionException if the Tasklet execution failed to complete.
144   * @throws CancellationException if the Tasklet was cancelled.
145   */
146  @Override
147  public TOutput get() throws InterruptedException, ExecutionException, CancellationException {
148    countDownLatch.await();
149    if (userResult != null) {
150      return userResult.get();
151    } else {
152      assert this.cancelled.get() || userException != null;
153      if (userException != null) {
154        throw new ExecutionException(userException);
155      }
156
157      throw new CancellationException("Tasklet was cancelled.");
158    }
159  }
160
161  /**
162   * Wait a certain period of time for the result of the task.
163   * @throws TimeoutException if the timeout provided hits before the Tasklet is done.
164   * @throws InterruptedException if the thread is interrupted.
165   * @throws ExecutionException if the Tasklet execution failed to complete.
166   * @throws CancellationException if the Tasklet was cancelled.
167   */
168  @Override
169  public TOutput get(final long timeout, final TimeUnit unit)
170      throws InterruptedException, ExecutionException, TimeoutException, CancellationException {
171    if (!countDownLatch.await(timeout, unit)) {
172      throw new TimeoutException("Waiting for the results of the task timed out. Timeout = " + timeout
173              + " in time units: " + unit);
174    }
175
176    return get();
177  }
178
179  /**
180   * Called by VortexMaster to let the user know that the Tasklet completed.
181   */
182  @Private
183  @Override
184  public void completed(final int pTaskletId, final TOutput result) {
185    assert taskletId == pTaskletId;
186    this.userResult = Optional.ofNullable(result);
187    if (callbackHandler != null) {
188      executor.execute(new Runnable() {
189        @Override
190        public void run() {
191          callbackHandler.onSuccess(userResult.get());
192        }
193      });
194    }
195    this.countDownLatch.countDown();
196  }
197
198  /**
199   * VortexMaster should never call this.
200   */
201  @Private
202  @Override
203  public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) {
204    throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated.");
205  }
206
207  /**
208   * Called by VortexMaster to let the user know that the Tasklet threw an exception.
209   */
210  @Private
211  @Override
212  public void threwException(final int pTaskletId, final Exception exception) {
213    assert taskletId == pTaskletId;
214
215    this.userException = exception;
216    if (callbackHandler != null) {
217      executor.execute(new Runnable() {
218        @Override
219        public void run() {
220          callbackHandler.onFailure(exception);
221        }
222      });
223    }
224    this.countDownLatch.countDown();
225  }
226
227  /**
228   * VortexMaster should never call this.
229   */
230  @Private
231  @Override
232  public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) {
233    throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated");
234  }
235
236  /**
237   * Called by VortexMaster to let the user know that the Tasklet was cancelled.
238   */
239  @Private
240  @Override
241  public void cancelled(final int pTaskletId) {
242    assert taskletId == pTaskletId;
243
244    this.cancelled.set(true);
245    if (callbackHandler != null) {
246      executor.execute(new Runnable() {
247        @Override
248        public void run() {
249          callbackHandler.onFailure(new InterruptedException("VortexFuture has been cancelled on request."));
250        }
251      });
252    }
253    this.countDownLatch.countDown();
254  }
255}