This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.vortex.api;
020
021import org.apache.commons.lang3.NotImplementedException;
022import org.apache.commons.lang3.tuple.ImmutablePair;
023import org.apache.commons.lang3.tuple.Pair;
024import org.apache.reef.annotations.Unstable;
025import org.apache.reef.annotations.audience.ClientSide;
026import org.apache.reef.annotations.audience.Private;
027import org.apache.reef.annotations.audience.Public;
028import org.apache.reef.vortex.driver.VortexFutureDelegate;
029
030import javax.annotation.concurrent.NotThreadSafe;
031import java.util.*;
032import java.util.concurrent.*;
033
034/**
035 * The interface between user code and aggregation Tasklets.
036 * Thread safety: This class is not meant to be used in a multi-threaded fashion.
037 * TODO[JIRA REEF-1131]: Create and run tests once functional.
038 */
039@Public
040@ClientSide
041@NotThreadSafe
042@Unstable
043public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate<TOutput> {
044  private final Executor executor;
045  private final BlockingQueue<Pair<List<Integer>, AggregateResult>> resultQueue;
046  private final ConcurrentMap<Integer, TInput> taskletIdInputMap;
047  private final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler;
048
049  @Private
050  public VortexAggregateFuture(final Executor executor,
051                               final Map<Integer, TInput> taskletIdInputMap,
052                               final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler) {
053    this.executor = executor;
054    this.taskletIdInputMap = new ConcurrentHashMap<>(taskletIdInputMap);
055    this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size());
056    this.callbackHandler = callbackHandler;
057  }
058
059  /**
060   * @return the next aggregation result for the future, null if no more results.
061   */
062  public synchronized AggregateResultSynchronous<TInput, TOutput> get() throws InterruptedException {
063    if (taskletIdInputMap.isEmpty()) {
064      return null;
065    }
066
067    final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.take();
068
069    removeFromTaskletIdInputMap(resultPair.getLeft());
070    return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty());
071  }
072
073  /**
074   * @param timeout the timeout for the operation.
075   * @param timeUnit the time unit of the timeout.
076   * @return the next aggregation result for the future, within the user specified timeout, null if no more results.
077   * @throws TimeoutException if time out hits.
078   */
079  public synchronized AggregateResultSynchronous<TInput, TOutput> get(final long timeout, final TimeUnit timeUnit)
080      throws InterruptedException, TimeoutException {
081    if (taskletIdInputMap.isEmpty()) {
082      return null;
083    }
084
085    final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.poll(timeout, timeUnit);
086
087    if (resultPair == null) {
088      throw new TimeoutException("Synchronous aggregation of the next future result timed out. Timeout = " + timeout
089              + " in time units: " + timeUnit);
090    }
091
092    removeFromTaskletIdInputMap(resultPair.getLeft());
093    return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty());
094  }
095
096  private void removeFromTaskletIdInputMap(final List<Integer> taskletIds) {
097    for (final int taskletId : taskletIds) {
098      taskletIdInputMap.remove(taskletId);
099    }
100  }
101
102  /**
103   * @return true if there are no more results to poll.
104   */
105  public boolean isDone() {
106    return taskletIdInputMap.isEmpty();
107  }
108
109  /**
110   * A Tasklet associated with the aggregation has completed.
111   */
112  @Private
113  @Override
114  public void completed(final int taskletId, final TOutput result) {
115    try {
116      completedTasklets(result, Collections.singletonList(taskletId));
117    } catch (final InterruptedException e) {
118      throw new RuntimeException(e);
119    }
120  }
121
122  /**
123   * Aggregation has completed for a list of Tasklets, with an aggregated result.
124   */
125  @Private
126  @Override
127  public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) {
128    try {
129      completedTasklets(result, taskletIds);
130    } catch (final InterruptedException e) {
131      throw new RuntimeException(e);
132    }
133  }
134
135  /**
136   * A Tasklet associated with the aggregation has failed.
137   */
138  @Private
139  @Override
140  public void threwException(final int taskletId, final Exception exception) {
141    try {
142      failedTasklets(exception, Collections.singletonList(taskletId));
143    } catch (final InterruptedException e) {
144      throw new RuntimeException(e);
145    }
146  }
147
148  /**
149   * A list of Tasklets has failed during aggregation phase.
150   */
151  @Private
152  @Override
153  public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) {
154    try {
155      failedTasklets(exception, taskletIds);
156    } catch (final InterruptedException e) {
157      throw new RuntimeException(e);
158    }
159  }
160
161  /**
162   * Not implemented for local aggregation.
163   */
164  @Private
165  @Override
166  public void cancelled(final int taskletId) {
167    throw new NotImplementedException("Tasklet cancellation not supported in aggregations.");
168  }
169
170  /**
171   * Create and queue result for Tasklets that are expected and invoke callback.
172   */
173  private void completedTasklets(final TOutput output, final List<Integer> taskletIds)
174      throws InterruptedException {
175    final List<TInput> inputs = getInputs(taskletIds);
176    final AggregateResult result = new AggregateResult(output, inputs);
177
178    if (callbackHandler != null) {
179      executor.execute(new Runnable() {
180        @Override
181        public void run() {
182          callbackHandler.onSuccess(result);
183        }
184      });
185    }
186
187    resultQueue.put(new ImmutablePair<>(taskletIds, result));
188  }
189
190  /**
191   * Create and queue result for failed Tasklets that are expected and invokes callback.
192   */
193  private void failedTasklets(final Exception exception, final List<Integer> taskletIds)
194      throws InterruptedException {
195
196    final List<TInput> inputs = getInputs(taskletIds);
197    final AggregateResult failure = new AggregateResult(exception, inputs);
198
199    if (callbackHandler != null) {
200      executor.execute(new Runnable() {
201        @Override
202        public void run() {
203          callbackHandler.onFailure(new VortexAggregateException(exception, inputs));
204        }
205      });
206    }
207
208    resultQueue.put(new ImmutablePair<>(taskletIds, failure));
209  }
210
211  /**
212   * Gets the inputs on Tasklet aggregation completion.
213   */
214  private List<TInput> getInputs(final List<Integer> taskletIds) {
215
216    final List<TInput> inputList = new ArrayList<>(taskletIds.size());
217
218    for(final int taskletId : taskletIds) {
219      inputList.add(taskletIdInputMap.get(taskletId));
220    }
221
222    return inputList;
223  }
224}