This project has retired. For details please refer to its Attic page.
Source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.reef.io.network.group.impl.operators;
020
021import org.apache.reef.driver.task.TaskConfigurationOptions;
022import org.apache.reef.exception.evaluator.NetworkException;
023import org.apache.reef.io.network.exception.ParentDeadException;
024import org.apache.reef.io.network.group.api.operators.Reduce;
025import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
026import org.apache.reef.io.network.impl.NetworkService;
027import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
028import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
029import org.apache.reef.io.network.group.api.task.OperatorTopology;
030import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
031import org.apache.reef.io.network.group.impl.config.parameters.*;
032import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
033import org.apache.reef.io.network.group.impl.utils.Utils;
034import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
035import org.apache.reef.io.serialization.Codec;
036import org.apache.reef.tang.annotations.Name;
037import org.apache.reef.tang.annotations.Parameter;
038import org.apache.reef.wake.EventHandler;
039
040import javax.inject.Inject;
041import java.util.ArrayList;
042import java.util.List;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047public class ReduceSender<T> implements Reduce.Sender<T>, EventHandler<GroupCommunicationMessage> {
048
049  private static final Logger LOG = Logger.getLogger(ReduceSender.class.getName());
050
051  private final Class<? extends Name<String>> groupName;
052  private final Class<? extends Name<String>> operName;
053  private final CommGroupNetworkHandler commGroupNetworkHandler;
054  private final Codec<T> dataCodec;
055  private final NetworkService<GroupCommunicationMessage> netService;
056  private final Sender sender;
057  private final ReduceFunction<T> reduceFunction;
058
059  private final OperatorTopology topology;
060
061  private final CommunicationGroupServiceClient commGroupClient;
062
063  private final AtomicBoolean init = new AtomicBoolean(false);
064
065  private final int version;
066
067  @Inject
068  public ReduceSender(
069      @Parameter(CommunicationGroupName.class) final String groupName,
070      @Parameter(OperatorName.class) final String operName,
071      @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
072      @Parameter(DataCodec.class) final Codec<T> dataCodec,
073      @Parameter(ReduceFunctionParam.class) final ReduceFunction<T> reduceFunction,
074      @Parameter(DriverIdentifierGroupComm.class) final String driverId,
075      @Parameter(TaskVersion.class) final int version,
076      final CommGroupNetworkHandler commGroupNetworkHandler,
077      final NetworkService<GroupCommunicationMessage> netService,
078      final CommunicationGroupServiceClient commGroupClient) {
079
080    super();
081
082    LOG.log(Level.FINEST, "{0} has CommGroupHandler-{1}",
083        new Object[]{operName, commGroupNetworkHandler});
084
085    this.version = version;
086    this.groupName = Utils.getClass(groupName);
087    this.operName = Utils.getClass(operName);
088    this.dataCodec = dataCodec;
089    this.reduceFunction = reduceFunction;
090    this.commGroupNetworkHandler = commGroupNetworkHandler;
091    this.netService = netService;
092    this.sender = new Sender(this.netService);
093    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
094    this.commGroupNetworkHandler.register(this.operName, this);
095    this.commGroupClient = commGroupClient;
096  }
097
098  @Override
099  public int getVersion() {
100    return version;
101  }
102
103  @Override
104  public void initialize() throws ParentDeadException {
105    topology.initialize();
106  }
107
108  @Override
109  public Class<? extends Name<String>> getOperName() {
110    return operName;
111  }
112
113  @Override
114  public Class<? extends Name<String>> getGroupName() {
115    return groupName;
116  }
117
118  @Override
119  public String toString() {
120    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
121  }
122
123  @Override
124  public void onNext(final GroupCommunicationMessage msg) {
125    topology.handle(msg);
126  }
127
128  @Override
129  public void send(final T myData) throws NetworkException, InterruptedException {
130    LOG.entering("ReduceSender", "send", this);
131    LOG.fine("I am " + this);
132
133    if (init.compareAndSet(false, true)) {
134      commGroupClient.initialize();
135    }
136    // I am an intermediate node or leaf.
137    LOG.finest("Waiting for children");
138    // Wait for children to send
139    try {
140      final T reducedValueOfChildren = topology.recvFromChildren(reduceFunction, dataCodec);
141      final List<T> vals = new ArrayList<>(2);
142      vals.add(myData);
143      if (reducedValueOfChildren != null) {
144        vals.add(reducedValueOfChildren);
145      }
146      final T reducedValue = reduceFunction.apply(vals);
147      topology.sendToParent(dataCodec.encode(reducedValue), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce);
148    } catch (final ParentDeadException e) {
149      throw new RuntimeException("ParentDeadException", e);
150    }
151    LOG.exiting("ReduceSender", "send", this);
152  }
153
154  @Override
155  public ReduceFunction<T> getReduceFunction() {
156    return reduceFunction;
157  }
158}