This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.vortex.protocol.mastertoworker;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.annotations.audience.DriverSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.vortex.api.VortexAggregateFunction;
025import org.apache.reef.vortex.api.VortexAggregatePolicy;
026import org.apache.reef.vortex.api.VortexFunction;
027
028import java.util.List;
029
030/**
031 * A request from the Vortex Driver for the {@link org.apache.reef.vortex.evaluator.VortexWorker} to
032 * record aggregate functions for later execution.
033 */
034@Unstable
035@Private
036@DriverSide
037public final class TaskletAggregationRequest<TInput, TOutput> implements MasterToWorkerRequest {
038  private int aggregateFunctionId;
039  private VortexAggregateFunction<TOutput> userAggregateFunction;
040  private VortexFunction<TInput, TOutput> function;
041  private VortexAggregatePolicy policy;
042
043  /**
044   * No-arg constructor required for Kryo to serialize/deserialize.
045   */
046  TaskletAggregationRequest() {
047  }
048
049  public TaskletAggregationRequest(final int aggregateFunctionId,
050                                   final VortexAggregateFunction<TOutput> aggregateFunction,
051                                   final VortexFunction<TInput, TOutput> function,
052                                   final VortexAggregatePolicy policy) {
053    this.aggregateFunctionId = aggregateFunctionId;
054    this.userAggregateFunction = aggregateFunction;
055    this.function = function;
056    this.policy = policy;
057  }
058
059  @Override
060  public Type getType() {
061    return Type.AggregateTasklets;
062  }
063
064  /**
065   * @return the AggregateFunctionID of the aggregate function.
066   */
067  public int getAggregateFunctionId() {
068    return aggregateFunctionId;
069  }
070
071  /**
072   * @return the aggregate function as specified by the user.
073   */
074  public VortexAggregateFunction getAggregateFunction() {
075    return userAggregateFunction;
076  }
077
078  /**
079   * @return the user specified function.
080   */
081  public VortexFunction getFunction() {
082    return function;
083  }
084
085  /**
086   * @return the aggregation policy.
087   */
088  public VortexAggregatePolicy getPolicy() {
089    return policy;
090  }
091
092  /**
093   * Execute the aggregate function using the list of outputs.
094   * @return Output of the function.
095   */
096  public TOutput executeAggregation(final List<TOutput> outputs) throws Exception {
097    return userAggregateFunction.call(outputs);
098  }
099
100  /**
101   * Execute the user specified function.
102   */
103  public TOutput executeFunction(final TInput input) throws Exception {
104    return function.call(input);
105  }
106}