This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.vortex.api;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.util.Optional;
023import org.apache.reef.vortex.driver.VortexMaster;
024
025import javax.inject.Inject;
026import java.util.List;
027
028/**
029 * Distributed thread pool.
030 */
031@Unstable
032public final class VortexThreadPool {
033  private final VortexMaster vortexMaster;
034
035  @Inject
036  private VortexThreadPool(final VortexMaster vortexMaster) {
037    this.vortexMaster = vortexMaster;
038  }
039
040  /**
041   * @param function to run on Vortex
042   * @param input of the function
043   * @param <TInput> input type
044   * @param <TOutput> output type
045   * @return VortexFuture for tracking execution progress
046   */
047  public <TInput, TOutput> VortexFuture<TOutput>
048      submit(final VortexFunction<TInput, TOutput> function, final TInput input) {
049    return vortexMaster.enqueueTasklet(function, input, Optional.<FutureCallback<TOutput>>empty());
050  }
051
052  /**
053   * @param function to run on Vortex
054   * @param input of the function
055   * @param callback of the function
056   * @param <TInput> input type
057   * @param <TOutput> output type
058   * @return VortexFuture for tracking execution progress
059   */
060  public <TInput, TOutput> VortexFuture<TOutput>
061      submit(final VortexFunction<TInput, TOutput> function, final TInput input,
062             final FutureCallback<TOutput> callback) {
063    return vortexMaster.enqueueTasklet(function, input, Optional.of(callback));
064  }
065
066  /**
067   * @param aggregateFunction to run on VortexFunction outputs
068   * @param function to run on Vortex
069   * @param policy on aggregation
070   * @param inputs of the function
071   * @param <TInput> input type
072   * @param <TOutput> output type
073   * @return VortexAggregationFuture for tracking execution progress of aggregate-able functions
074   */
075  public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
076      submit(final VortexAggregateFunction<TOutput> aggregateFunction,
077             final VortexFunction<TInput, TOutput> function,
078             final VortexAggregatePolicy policy,
079             final List<TInput> inputs) {
080    return vortexMaster.enqueueTasklets(
081        aggregateFunction, function, policy, inputs,
082        Optional.<FutureCallback<AggregateResult<TInput, TOutput>>>empty());
083  }
084
085  /**
086   * @param aggregateFunction to run on VortexFunction outputs
087   * @param function to run on Vortex
088   * @param policy on aggregation
089   * @param inputs of the function
090   * @param callback of the aggregation
091   * @param <TInput> input type
092   * @param <TOutput> output type
093   * @return VortexAggregationFuture for tracking execution progress of aggregate-able functions
094   */
095  public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
096      submit(final VortexAggregateFunction<TOutput> aggregateFunction,
097             final VortexFunction<TInput, TOutput> function,
098             final VortexAggregatePolicy policy,
099             final List<TInput> inputs,
100             final FutureCallback<AggregateResult<TInput, TOutput>> callback) {
101    return vortexMaster.enqueueTasklets(aggregateFunction, function, policy, inputs, Optional.of(callback));
102  }
103}