001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.vortex.api; 020 021import org.apache.commons.lang3.NotImplementedException; 022import org.apache.commons.lang3.tuple.ImmutablePair; 023import org.apache.commons.lang3.tuple.Pair; 024import org.apache.reef.annotations.Unstable; 025import org.apache.reef.annotations.audience.ClientSide; 026import org.apache.reef.annotations.audience.Private; 027import org.apache.reef.annotations.audience.Public; 028import org.apache.reef.vortex.driver.VortexFutureDelegate; 029 030import javax.annotation.concurrent.NotThreadSafe; 031import java.util.*; 032import java.util.concurrent.*; 033 034/** 035 * The interface between user code and aggregation Tasklets. 036 * Thread safety: This class is not meant to be used in a multi-threaded fashion. 037 * TODO[JIRA REEF-1131]: Create and run tests once functional. 038 */ 039@Public 040@ClientSide 041@NotThreadSafe 042@Unstable 043public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate<TOutput> { 044 private final Executor executor; 045 private final BlockingQueue<Pair<List<Integer>, AggregateResult>> resultQueue; 046 private final ConcurrentMap<Integer, TInput> taskletIdInputMap; 047 private final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler; 048 049 @Private 050 public VortexAggregateFuture(final Executor executor, 051 final Map<Integer, TInput> taskletIdInputMap, 052 final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler) { 053 this.executor = executor; 054 this.taskletIdInputMap = new ConcurrentHashMap<>(taskletIdInputMap); 055 this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size()); 056 this.callbackHandler = callbackHandler; 057 } 058 059 /** 060 * @return the next aggregation result for the future, null if no more results. 061 */ 062 public synchronized AggregateResultSynchronous<TInput, TOutput> get() throws InterruptedException { 063 if (taskletIdInputMap.isEmpty()) { 064 return null; 065 } 066 067 final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.take(); 068 069 removeFromTaskletIdInputMap(resultPair.getLeft()); 070 return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty()); 071 } 072 073 /** 074 * @param timeout the timeout for the operation. 075 * @param timeUnit the time unit of the timeout. 076 * @return the next aggregation result for the future, within the user specified timeout, null if no more results. 077 * @throws TimeoutException if time out hits. 078 */ 079 public synchronized AggregateResultSynchronous<TInput, TOutput> get(final long timeout, final TimeUnit timeUnit) 080 throws InterruptedException, TimeoutException { 081 if (taskletIdInputMap.isEmpty()) { 082 return null; 083 } 084 085 final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.poll(timeout, timeUnit); 086 087 if (resultPair == null) { 088 throw new TimeoutException("Synchronous aggregation of the next future result timed out. Timeout = " + timeout 089 + " in time units: " + timeUnit); 090 } 091 092 removeFromTaskletIdInputMap(resultPair.getLeft()); 093 return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty()); 094 } 095 096 private void removeFromTaskletIdInputMap(final List<Integer> taskletIds) { 097 for (final int taskletId : taskletIds) { 098 taskletIdInputMap.remove(taskletId); 099 } 100 } 101 102 /** 103 * @return true if there are no more results to poll. 104 */ 105 public boolean isDone() { 106 return taskletIdInputMap.isEmpty(); 107 } 108 109 /** 110 * A Tasklet associated with the aggregation has completed. 111 */ 112 @Private 113 @Override 114 public void completed(final int taskletId, final TOutput result) { 115 try { 116 completedTasklets(result, Collections.singletonList(taskletId)); 117 } catch (final InterruptedException e) { 118 throw new RuntimeException(e); 119 } 120 } 121 122 /** 123 * Aggregation has completed for a list of Tasklets, with an aggregated result. 124 */ 125 @Private 126 @Override 127 public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) { 128 try { 129 completedTasklets(result, taskletIds); 130 } catch (final InterruptedException e) { 131 throw new RuntimeException(e); 132 } 133 } 134 135 /** 136 * A Tasklet associated with the aggregation has failed. 137 */ 138 @Private 139 @Override 140 public void threwException(final int taskletId, final Exception exception) { 141 try { 142 failedTasklets(exception, Collections.singletonList(taskletId)); 143 } catch (final InterruptedException e) { 144 throw new RuntimeException(e); 145 } 146 } 147 148 /** 149 * A list of Tasklets has failed during aggregation phase. 150 */ 151 @Private 152 @Override 153 public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) { 154 try { 155 failedTasklets(exception, taskletIds); 156 } catch (final InterruptedException e) { 157 throw new RuntimeException(e); 158 } 159 } 160 161 /** 162 * Not implemented for local aggregation. 163 */ 164 @Private 165 @Override 166 public void cancelled(final int taskletId) { 167 throw new NotImplementedException("Tasklet cancellation not supported in aggregations."); 168 } 169 170 /** 171 * Create and queue result for Tasklets that are expected and invoke callback. 172 */ 173 private void completedTasklets(final TOutput output, final List<Integer> taskletIds) 174 throws InterruptedException { 175 final List<TInput> inputs = getInputs(taskletIds); 176 final AggregateResult result = new AggregateResult(output, inputs); 177 178 if (callbackHandler != null) { 179 executor.execute(new Runnable() { 180 @Override 181 public void run() { 182 callbackHandler.onSuccess(result); 183 } 184 }); 185 } 186 187 resultQueue.put(new ImmutablePair<>(taskletIds, result)); 188 } 189 190 /** 191 * Create and queue result for failed Tasklets that are expected and invokes callback. 192 */ 193 private void failedTasklets(final Exception exception, final List<Integer> taskletIds) 194 throws InterruptedException { 195 196 final List<TInput> inputs = getInputs(taskletIds); 197 final AggregateResult failure = new AggregateResult(exception, inputs); 198 199 if (callbackHandler != null) { 200 executor.execute(new Runnable() { 201 @Override 202 public void run() { 203 callbackHandler.onFailure(new VortexAggregateException(exception, inputs)); 204 } 205 }); 206 } 207 208 resultQueue.put(new ImmutablePair<>(taskletIds, failure)); 209 } 210 211 /** 212 * Gets the inputs on Tasklet aggregation completion. 213 */ 214 private List<TInput> getInputs(final List<Integer> taskletIds) { 215 216 final List<TInput> inputList = new ArrayList<>(taskletIds.size()); 217 218 for(final int taskletId : taskletIds) { 219 inputList.add(taskletIdInputMap.get(taskletId)); 220 } 221 222 return inputList; 223 } 224}