This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.vortex.api;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.io.serialization.Codec;
024import org.apache.reef.util.Optional;
025import org.apache.reef.vortex.common.VortexFutureDelegate;
026import org.apache.reef.vortex.driver.VortexMaster;
027
028import java.util.List;
029import java.util.concurrent.*;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * The interface between user code and submitted task.
036 */
037@Unstable
038public final class VortexFuture<TOutput>
039    implements Future<TOutput>, VortexFutureDelegate {
040  private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName());
041
042  // userResult starts out as null. If not null => variable is set and tasklet returned.
043  // Otherwise tasklet has not completed.
044  private Optional<TOutput> userResult = null;
045  private Exception userException;
046  private AtomicBoolean cancelled = new AtomicBoolean(false);
047  private final CountDownLatch countDownLatch = new CountDownLatch(1);
048  private final FutureCallback<TOutput> callbackHandler;
049  private final Executor executor;
050  private final VortexMaster vortexMaster;
051  private final int taskletId;
052  private final Codec<TOutput> outputCodec;
053
054  /**
055   * Creates a {@link VortexFuture}.
056   */
057  @Private
058  public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId,
059                      final Codec<TOutput> outputCodec) {
060    this(executor, vortexMaster, taskletId, outputCodec, null);
061  }
062
063  /**
064   * Creates a {@link VortexFuture} with a callback.
065   */
066  @Private
067  public VortexFuture(final Executor executor,
068                      final VortexMaster vortexMaster,
069                      final int taskletId,
070                      final Codec<TOutput> outputCodec,
071                      final FutureCallback<TOutput> callbackHandler) {
072    this.executor = executor;
073    this.vortexMaster = vortexMaster;
074    this.taskletId = taskletId;
075    this.outputCodec = outputCodec;
076    this.callbackHandler = callbackHandler;
077  }
078
079  /**
080   * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed.
081   * @return true if task did not start or was cancelled, false if task failed or completed
082   */
083  @Override
084  public boolean cancel(final boolean mayInterruptIfRunning) {
085    try {
086      return cancel(mayInterruptIfRunning, Optional.<Long>empty(), Optional.<TimeUnit>empty());
087    } catch (final TimeoutException e) {
088      // This should never happen.
089      LOG.log(Level.WARNING, "Received a TimeoutException in VortexFuture.cancel(). Should not have occurred.");
090      return false;
091    }
092  }
093
094  /**
095   * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed, or
096   * if the timeout has expired.
097   * @return true if task did not start or was cancelled, false if task failed or completed
098   */
099  public boolean cancel(final boolean mayInterruptIfRunning, final long timeout, final TimeUnit unit)
100      throws TimeoutException {
101    return cancel(mayInterruptIfRunning, Optional.of(timeout), Optional.of(unit));
102  }
103
104  private boolean cancel(final boolean mayInterruptIfRunning,
105                         final Optional<Long> timeout,
106                         final Optional<TimeUnit> unit) throws TimeoutException {
107    if (isDone()) {
108      return isCancelled();
109    }
110
111    vortexMaster.cancelTasklet(mayInterruptIfRunning, taskletId);
112
113    try {
114      if (timeout.isPresent() && unit.isPresent()) {
115        if (!countDownLatch.await(timeout.get(), unit.get())) {
116          throw new TimeoutException();
117        }
118      } else {
119        countDownLatch.await();
120      }
121    } catch (InterruptedException e) {
122      e.printStackTrace();
123      return false;
124    }
125
126    return isCancelled();
127  }
128
129  /**
130   * @return true if the task is cancelled, false if not.
131   */
132  @Override
133  public boolean isCancelled() {
134    return cancelled.get();
135  }
136
137  /**
138   * @return true it the task completed, false if not.
139   */
140  @Override
141  public boolean isDone() {
142    return countDownLatch.getCount() == 0;
143  }
144
145  /**
146   * Infinitely wait for the result of the task.
147   * @throws InterruptedException if the thread is interrupted.
148   * @throws ExecutionException if the Tasklet execution failed to complete.
149   * @throws CancellationException if the Tasklet was cancelled.
150   */
151  @Override
152  public TOutput get() throws InterruptedException, ExecutionException, CancellationException {
153    countDownLatch.await();
154    if (userResult != null) {
155      return userResult.get();
156    } else {
157      assert this.cancelled.get() || userException != null;
158      if (userException != null) {
159        throw new ExecutionException(userException);
160      }
161
162      throw new CancellationException("Tasklet was cancelled.");
163    }
164  }
165
166  /**
167   * Wait a certain period of time for the result of the task.
168   * @throws TimeoutException if the timeout provided hits before the Tasklet is done.
169   * @throws InterruptedException if the thread is interrupted.
170   * @throws ExecutionException if the Tasklet execution failed to complete.
171   * @throws CancellationException if the Tasklet was cancelled.
172   */
173  @Override
174  public TOutput get(final long timeout, final TimeUnit unit)
175      throws InterruptedException, ExecutionException, TimeoutException, CancellationException {
176    if (!countDownLatch.await(timeout, unit)) {
177      throw new TimeoutException();
178    }
179
180    return get();
181  }
182
183  /**
184   * Called by VortexMaster to let the user know that the Tasklet completed.
185   */
186  @Private
187  @Override
188  public void completed(final int pTaskletId, final byte[] serializedResult) {
189    assert taskletId == pTaskletId;
190
191    // TODO[REEF-1113]: Handle serialization failure separately in Vortex
192    final TOutput result = outputCodec.decode(serializedResult);
193    this.userResult = Optional.ofNullable(result);
194    if (callbackHandler != null) {
195      executor.execute(new Runnable() {
196        @Override
197        public void run() {
198          callbackHandler.onSuccess(userResult.get());
199        }
200      });
201    }
202    this.countDownLatch.countDown();
203  }
204
205  /**
206   * VortexMaster should never call this.
207   */
208  @Private
209  @Override
210  public void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult) {
211    throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated.");
212  }
213
214  /**
215   * Called by VortexMaster to let the user know that the Tasklet threw an exception.
216   */
217  @Private
218  @Override
219  public void threwException(final int pTaskletId, final Exception exception) {
220    assert taskletId == pTaskletId;
221
222    this.userException = exception;
223    if (callbackHandler != null) {
224      executor.execute(new Runnable() {
225        @Override
226        public void run() {
227          callbackHandler.onFailure(exception);
228        }
229      });
230    }
231    this.countDownLatch.countDown();
232  }
233
234  /**
235   * VortexMaster should never call this.
236   */
237  @Private
238  @Override
239  public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) {
240    throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated");
241  }
242
243  /**
244   * Called by VortexMaster to let the user know that the Tasklet was cancelled.
245   */
246  @Private
247  @Override
248  public void cancelled(final int pTaskletId) {
249    assert taskletId == pTaskletId;
250
251    this.cancelled.set(true);
252    if (callbackHandler != null) {
253      executor.execute(new Runnable() {
254        @Override
255        public void run() {
256          callbackHandler.onFailure(new InterruptedException("VortexFuture has been cancelled on request."));
257        }
258      });
259    }
260    this.countDownLatch.countDown();
261  }
262}