001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.vortex.api; 020 021import org.apache.commons.lang3.NotImplementedException; 022import org.apache.commons.lang3.tuple.ImmutablePair; 023import org.apache.commons.lang3.tuple.Pair; 024import org.apache.reef.annotations.Unstable; 025import org.apache.reef.annotations.audience.ClientSide; 026import org.apache.reef.annotations.audience.Private; 027import org.apache.reef.annotations.audience.Public; 028import org.apache.reef.io.serialization.Codec; 029import org.apache.reef.vortex.common.VortexFutureDelegate; 030 031import javax.annotation.concurrent.NotThreadSafe; 032import java.util.*; 033import java.util.concurrent.*; 034 035/** 036 * The interface between user code and aggregation Tasklets. 037 * Thread safety: This class is not meant to be used in a multi-threaded fashion. 038 * TODO[JIRA REEF-1131]: Create and run tests once functional. 039 */ 040@Public 041@ClientSide 042@NotThreadSafe 043@Unstable 044public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate { 045 private final Executor executor; 046 private final Codec<TOutput> aggOutputCodec; 047 private final BlockingQueue<Pair<List<Integer>, AggregateResult>> resultQueue; 048 private final ConcurrentMap<Integer, TInput> taskletIdInputMap; 049 private final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler; 050 051 @Private 052 public VortexAggregateFuture(final Executor executor, 053 final Map<Integer, TInput> taskletIdInputMap, 054 final Codec<TOutput> aggOutputCodec, 055 final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler) { 056 this.executor = executor; 057 this.taskletIdInputMap = new ConcurrentHashMap<>(taskletIdInputMap); 058 this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size()); 059 this.aggOutputCodec = aggOutputCodec; 060 this.callbackHandler = callbackHandler; 061 } 062 063 /** 064 * @return the next aggregation result for the future, null if no more results. 065 */ 066 public synchronized AggregateResultSynchronous<TInput, TOutput> get() throws InterruptedException { 067 if (taskletIdInputMap.isEmpty()) { 068 return null; 069 } 070 071 final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.take(); 072 073 removeFromTaskletIdInputMap(resultPair.getLeft()); 074 return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty()); 075 } 076 077 /** 078 * @param timeout the timeout for the operation. 079 * @param timeUnit the time unit of the timeout. 080 * @return the next aggregation result for the future, within the user specified timeout, null if no more results. 081 * @throws TimeoutException if time out hits. 082 */ 083 public synchronized AggregateResultSynchronous<TInput, TOutput> get(final long timeout, final TimeUnit timeUnit) 084 throws InterruptedException, TimeoutException { 085 if (taskletIdInputMap.isEmpty()) { 086 return null; 087 } 088 089 final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.poll(timeout, timeUnit); 090 091 if (resultPair == null) { 092 throw new TimeoutException(); 093 } 094 095 removeFromTaskletIdInputMap(resultPair.getLeft()); 096 return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty()); 097 } 098 099 private void removeFromTaskletIdInputMap(final List<Integer> taskletIds) { 100 for (final int taskletId : taskletIds) { 101 taskletIdInputMap.remove(taskletId); 102 } 103 } 104 105 /** 106 * @return true if there are no more results to poll. 107 */ 108 public boolean isDone() { 109 return taskletIdInputMap.isEmpty(); 110 } 111 112 /** 113 * A Tasklet associated with the aggregation has completed. 114 */ 115 @Private 116 @Override 117 public void completed(final int taskletId, final byte[] serializedResult) { 118 try { 119 // TODO[REEF-1113]: Handle serialization failure separately in Vortex 120 final TOutput result = aggOutputCodec.decode(serializedResult); 121 completedTasklets(result, Collections.singletonList(taskletId)); 122 } catch (final InterruptedException e) { 123 throw new RuntimeException(e); 124 } 125 } 126 127 /** 128 * Aggregation has completed for a list of Tasklets, with an aggregated result. 129 */ 130 @Private 131 @Override 132 public void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult) { 133 try { 134 // TODO[REEF-1113]: Handle serialization failure separately in Vortex 135 final TOutput result = aggOutputCodec.decode(serializedResult); 136 completedTasklets(result, taskletIds); 137 } catch (final InterruptedException e) { 138 throw new RuntimeException(e); 139 } 140 } 141 142 /** 143 * A Tasklet associated with the aggregation has failed. 144 */ 145 @Private 146 @Override 147 public void threwException(final int taskletId, final Exception exception) { 148 try { 149 failedTasklets(exception, Collections.singletonList(taskletId)); 150 } catch (final InterruptedException e) { 151 throw new RuntimeException(e); 152 } 153 } 154 155 /** 156 * A list of Tasklets has failed during aggregation phase. 157 */ 158 @Private 159 @Override 160 public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) { 161 try { 162 failedTasklets(exception, taskletIds); 163 } catch (final InterruptedException e) { 164 throw new RuntimeException(e); 165 } 166 } 167 168 /** 169 * Not implemented for local aggregation. 170 */ 171 @Private 172 @Override 173 public void cancelled(final int taskletId) { 174 throw new NotImplementedException("Tasklet cancellation not supported in aggregations."); 175 } 176 177 /** 178 * Create and queue result for Tasklets that are expected and invoke callback. 179 */ 180 private void completedTasklets(final TOutput output, final List<Integer> taskletIds) 181 throws InterruptedException { 182 final List<TInput> inputs = getInputs(taskletIds); 183 final AggregateResult result = new AggregateResult(output, inputs); 184 185 if (callbackHandler != null) { 186 executor.execute(new Runnable() { 187 @Override 188 public void run() { 189 callbackHandler.onSuccess(result); 190 } 191 }); 192 } 193 194 resultQueue.put(new ImmutablePair<>(taskletIds, result)); 195 } 196 197 /** 198 * Create and queue result for failed Tasklets that are expected and invokes callback. 199 */ 200 private void failedTasklets(final Exception exception, final List<Integer> taskletIds) 201 throws InterruptedException { 202 203 final List<TInput> inputs = getInputs(taskletIds); 204 final AggregateResult failure = new AggregateResult(exception, inputs); 205 206 if (callbackHandler != null) { 207 executor.execute(new Runnable() { 208 @Override 209 public void run() { 210 callbackHandler.onFailure(new VortexAggregateException(exception, inputs)); 211 } 212 }); 213 } 214 215 resultQueue.put(new ImmutablePair<>(taskletIds, failure)); 216 } 217 218 /** 219 * Gets the inputs on Tasklet aggregation completion. 220 */ 221 private List<TInput> getInputs(final List<Integer> taskletIds) { 222 223 final List<TInput> inputList = new ArrayList<>(taskletIds.size()); 224 225 for(final int taskletId : taskletIds) { 226 inputList.add(taskletIdInputMap.get(taskletId)); 227 } 228 229 return inputList; 230 } 231}