This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.vortex.api;
020
021import org.apache.commons.lang3.NotImplementedException;
022import org.apache.commons.lang3.tuple.ImmutablePair;
023import org.apache.commons.lang3.tuple.Pair;
024import org.apache.reef.annotations.Unstable;
025import org.apache.reef.annotations.audience.ClientSide;
026import org.apache.reef.annotations.audience.Private;
027import org.apache.reef.annotations.audience.Public;
028import org.apache.reef.io.serialization.Codec;
029import org.apache.reef.vortex.common.VortexFutureDelegate;
030
031import javax.annotation.concurrent.NotThreadSafe;
032import java.util.*;
033import java.util.concurrent.*;
034
035/**
036 * The interface between user code and aggregation Tasklets.
037 * Thread safety: This class is not meant to be used in a multi-threaded fashion.
038 * TODO[JIRA REEF-1131]: Create and run tests once functional.
039 */
040@Public
041@ClientSide
042@NotThreadSafe
043@Unstable
044public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate {
045  private final Executor executor;
046  private final Codec<TOutput> aggOutputCodec;
047  private final BlockingQueue<Pair<List<Integer>, AggregateResult>> resultQueue;
048  private final ConcurrentMap<Integer, TInput> taskletIdInputMap;
049  private final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler;
050
051  @Private
052  public VortexAggregateFuture(final Executor executor,
053                               final Map<Integer, TInput> taskletIdInputMap,
054                               final Codec<TOutput> aggOutputCodec,
055                               final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler) {
056    this.executor = executor;
057    this.taskletIdInputMap = new ConcurrentHashMap<>(taskletIdInputMap);
058    this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size());
059    this.aggOutputCodec = aggOutputCodec;
060    this.callbackHandler = callbackHandler;
061  }
062
063  /**
064   * @return the next aggregation result for the future, null if no more results.
065   */
066  public synchronized AggregateResultSynchronous<TInput, TOutput> get() throws InterruptedException {
067    if (taskletIdInputMap.isEmpty()) {
068      return null;
069    }
070
071    final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.take();
072
073    removeFromTaskletIdInputMap(resultPair.getLeft());
074    return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty());
075  }
076
077  /**
078   * @param timeout the timeout for the operation.
079   * @param timeUnit the time unit of the timeout.
080   * @return the next aggregation result for the future, within the user specified timeout, null if no more results.
081   * @throws TimeoutException if time out hits.
082   */
083  public synchronized AggregateResultSynchronous<TInput, TOutput> get(final long timeout, final TimeUnit timeUnit)
084      throws InterruptedException, TimeoutException {
085    if (taskletIdInputMap.isEmpty()) {
086      return null;
087    }
088
089    final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.poll(timeout, timeUnit);
090
091    if (resultPair == null) {
092      throw new TimeoutException();
093    }
094
095    removeFromTaskletIdInputMap(resultPair.getLeft());
096    return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty());
097  }
098
099  private void removeFromTaskletIdInputMap(final List<Integer> taskletIds) {
100    for (final int taskletId : taskletIds) {
101      taskletIdInputMap.remove(taskletId);
102    }
103  }
104
105  /**
106   * @return true if there are no more results to poll.
107   */
108  public boolean isDone() {
109    return taskletIdInputMap.isEmpty();
110  }
111
112  /**
113   * A Tasklet associated with the aggregation has completed.
114   */
115  @Private
116  @Override
117  public void completed(final int taskletId, final byte[] serializedResult) {
118    try {
119      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
120      final TOutput result = aggOutputCodec.decode(serializedResult);
121      completedTasklets(result, Collections.singletonList(taskletId));
122    } catch (final InterruptedException e) {
123      throw new RuntimeException(e);
124    }
125  }
126
127  /**
128   * Aggregation has completed for a list of Tasklets, with an aggregated result.
129   */
130  @Private
131  @Override
132  public void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult) {
133    try {
134      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
135      final TOutput result = aggOutputCodec.decode(serializedResult);
136      completedTasklets(result, taskletIds);
137    } catch (final InterruptedException e) {
138      throw new RuntimeException(e);
139    }
140  }
141
142  /**
143   * A Tasklet associated with the aggregation has failed.
144   */
145  @Private
146  @Override
147  public void threwException(final int taskletId, final Exception exception) {
148    try {
149      failedTasklets(exception, Collections.singletonList(taskletId));
150    } catch (final InterruptedException e) {
151      throw new RuntimeException(e);
152    }
153  }
154
155  /**
156   * A list of Tasklets has failed during aggregation phase.
157   */
158  @Private
159  @Override
160  public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) {
161    try {
162      failedTasklets(exception, taskletIds);
163    } catch (final InterruptedException e) {
164      throw new RuntimeException(e);
165    }
166  }
167
168  /**
169   * Not implemented for local aggregation.
170   */
171  @Private
172  @Override
173  public void cancelled(final int taskletId) {
174    throw new NotImplementedException("Tasklet cancellation not supported in aggregations.");
175  }
176
177  /**
178   * Create and queue result for Tasklets that are expected and invoke callback.
179   */
180  private void completedTasklets(final TOutput output, final List<Integer> taskletIds)
181      throws InterruptedException {
182    final List<TInput> inputs = getInputs(taskletIds);
183    final AggregateResult result = new AggregateResult(output, inputs);
184
185    if (callbackHandler != null) {
186      executor.execute(new Runnable() {
187        @Override
188        public void run() {
189          callbackHandler.onSuccess(result);
190        }
191      });
192    }
193
194    resultQueue.put(new ImmutablePair<>(taskletIds, result));
195  }
196
197  /**
198   * Create and queue result for failed Tasklets that are expected and invokes callback.
199   */
200  private void failedTasklets(final Exception exception, final List<Integer> taskletIds)
201      throws InterruptedException {
202
203    final List<TInput> inputs = getInputs(taskletIds);
204    final AggregateResult failure = new AggregateResult(exception, inputs);
205
206    if (callbackHandler != null) {
207      executor.execute(new Runnable() {
208        @Override
209        public void run() {
210          callbackHandler.onFailure(new VortexAggregateException(exception, inputs));
211        }
212      });
213    }
214
215    resultQueue.put(new ImmutablePair<>(taskletIds, failure));
216  }
217
218  /**
219   * Gets the inputs on Tasklet aggregation completion.
220   */
221  private List<TInput> getInputs(final List<Integer> taskletIds) {
222
223    final List<TInput> inputList = new ArrayList<>(taskletIds.size());
224
225    for(final int taskletId : taskletIds) {
226      inputList.add(taskletIdInputMap.get(taskletId));
227    }
228
229    return inputList;
230  }
231}