This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.operators;
020
021import org.apache.reef.driver.task.TaskConfigurationOptions;
022import org.apache.reef.exception.evaluator.NetworkException;
023import org.apache.reef.io.network.exception.ParentDeadException;
024import org.apache.reef.io.network.group.api.operators.Reduce;
025import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
026import org.apache.reef.io.network.impl.NetworkService;
027import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
028import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
029import org.apache.reef.io.network.group.api.task.OperatorTopology;
030import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
031import org.apache.reef.io.network.group.impl.config.parameters.*;
032import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
033import org.apache.reef.io.network.group.impl.utils.Utils;
034import org.apache.reef.io.serialization.Codec;
035import org.apache.reef.tang.annotations.Name;
036import org.apache.reef.tang.annotations.Parameter;
037import org.apache.reef.wake.EventHandler;
038import org.apache.reef.wake.Identifier;
039
040import javax.inject.Inject;
041
042import java.util.List;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Logger;
045
046public class ReduceReceiver<T> implements Reduce.Receiver<T>, EventHandler<GroupCommunicationMessage> {
047
048  private static final Logger LOG = Logger.getLogger(ReduceReceiver.class.getName());
049
050  private final Class<? extends Name<String>> groupName;
051  private final Class<? extends Name<String>> operName;
052  private final CommGroupNetworkHandler commGroupNetworkHandler;
053  private final Codec<T> dataCodec;
054  private final NetworkService<GroupCommunicationMessage> netService;
055  private final Sender sender;
056  private final ReduceFunction<T> reduceFunction;
057
058  private final OperatorTopology topology;
059
060  private final CommunicationGroupServiceClient commGroupClient;
061
062  private final AtomicBoolean init = new AtomicBoolean(false);
063
064  private final int version;
065
066  @Inject
067  public ReduceReceiver(@Parameter(CommunicationGroupName.class) final String groupName,
068                        @Parameter(OperatorName.class) final String operName,
069                        @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
070                        @Parameter(DataCodec.class) final Codec<T> dataCodec,
071                        @Parameter(ReduceFunctionParam.class) final ReduceFunction<T> reduceFunction,
072                        @Parameter(DriverIdentifierGroupComm.class) final String driverId,
073                        @Parameter(TaskVersion.class) final int version,
074                        final CommGroupNetworkHandler commGroupNetworkHandler,
075                        final NetworkService<GroupCommunicationMessage> netService,
076                        final CommunicationGroupServiceClient commGroupClient) {
077    super();
078    this.version = version;
079    LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
080    this.groupName = Utils.getClass(groupName);
081    this.operName = Utils.getClass(operName);
082    this.dataCodec = dataCodec;
083    this.reduceFunction = reduceFunction;
084    this.commGroupNetworkHandler = commGroupNetworkHandler;
085    this.netService = netService;
086    this.sender = new Sender(this.netService);
087    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
088    this.commGroupNetworkHandler.register(this.operName, this);
089    this.commGroupClient = commGroupClient;
090  }
091
092  @Override
093  public int getVersion() {
094    return version;
095  }
096
097  @Override
098  public void initialize() throws ParentDeadException {
099    topology.initialize();
100  }
101
102  @Override
103  public Class<? extends Name<String>> getOperName() {
104    return operName;
105  }
106
107  @Override
108  public Class<? extends Name<String>> getGroupName() {
109    return groupName;
110  }
111
112  @Override
113  public String toString() {
114    return "ReduceReceiver:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
115  }
116
117  @Override
118  public void onNext(final GroupCommunicationMessage msg) {
119    topology.handle(msg);
120  }
121
122  @Override
123  public T reduce() throws InterruptedException, NetworkException {
124    LOG.entering("ReduceReceiver", "reduce", this);
125    LOG.fine("I am " + this);
126
127    if (init.compareAndSet(false, true)) {
128      commGroupClient.initialize();
129    }
130    // I am root
131    LOG.fine(this + " Waiting to receive reduced value");
132    // Wait for children to send
133    final T redVal;
134    try {
135      redVal = topology.recvFromChildren(reduceFunction, dataCodec);
136    } catch (final ParentDeadException e) {
137      throw new RuntimeException("ParentDeadException", e);
138    }
139    LOG.exiting("ReduceReceiver", "reduce", this);
140    return redVal;
141  }
142
143  @Override
144  public T reduce(final List<? extends Identifier> order) throws InterruptedException, NetworkException {
145    throw new UnsupportedOperationException();
146  }
147
148  @Override
149  public ReduceFunction<T> getReduceFunction() {
150    return reduceFunction;
151  }
152
153}