This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.operators;
020
021import org.apache.reef.driver.task.TaskConfigurationOptions;
022import org.apache.reef.exception.evaluator.NetworkException;
023import org.apache.reef.io.network.exception.ParentDeadException;
024import org.apache.reef.io.network.group.api.operators.Gather;
025import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
026import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
027import org.apache.reef.io.network.group.api.task.OperatorTopology;
028import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
029import org.apache.reef.io.network.group.impl.config.parameters.*;
030import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
031import org.apache.reef.io.network.group.impl.utils.Utils;
032import org.apache.reef.io.network.impl.NetworkService;
033import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
034import org.apache.reef.io.serialization.Codec;
035import org.apache.reef.tang.annotations.Name;
036import org.apache.reef.tang.annotations.Parameter;
037import org.apache.reef.wake.EventHandler;
038
039import javax.inject.Inject;
040import java.io.ByteArrayOutputStream;
041import java.io.DataOutputStream;
042import java.io.IOException;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Logger;
045
046public class GatherSender<T> implements Gather.Sender<T>, EventHandler<GroupCommunicationMessage> {
047
048  private static final Logger LOG = Logger.getLogger(GatherSender.class.getName());
049
050  private final Class<? extends Name<String>> groupName;
051  private final Class<? extends Name<String>> operName;
052  private final Codec<T> dataCodec;
053  private final NetworkService<GroupCommunicationMessage> netService;
054  private final OperatorTopology topology;
055  private final CommunicationGroupServiceClient commGroupClient;
056  private final AtomicBoolean init = new AtomicBoolean(false);
057  private final int version;
058
059  @Inject
060  public GatherSender(@Parameter(CommunicationGroupName.class) final String groupName,
061                      @Parameter(OperatorName.class) final String operName,
062                      @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
063                      @Parameter(DataCodec.class) final Codec<T> dataCodec,
064                      @Parameter(DriverIdentifierGroupComm.class) final String driverId,
065                      @Parameter(TaskVersion.class) final int version,
066                      final CommGroupNetworkHandler commGroupNetworkHandler,
067                      final NetworkService<GroupCommunicationMessage> netService,
068                      final CommunicationGroupServiceClient commGroupClient) {
069    LOG.finest(operName + "has CommGroupHandler-" + commGroupNetworkHandler.toString());
070    this.version = version;
071    this.groupName = Utils.getClass(groupName);
072    this.operName = Utils.getClass(operName);
073    this.dataCodec = dataCodec;
074    this.netService = netService;
075    this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
076                                             selfId, driverId, new Sender(netService), version);
077    this.commGroupClient = commGroupClient;
078    commGroupNetworkHandler.register(this.operName, this);
079  }
080
081  @Override
082  public int getVersion() {
083    return version;
084  }
085
086  @Override
087  public void initialize() throws ParentDeadException {
088    topology.initialize();
089  }
090
091  @Override
092  public Class<? extends Name<String>> getOperName() {
093    return operName;
094  }
095
096  @Override
097  public Class<? extends Name<String>> getGroupName() {
098    return groupName;
099  }
100
101  @Override
102  public String toString() {
103    final StringBuilder sb = new StringBuilder("GatherSender:")
104        .append(Utils.simpleName(groupName))
105        .append(":")
106        .append(Utils.simpleName(operName))
107        .append(":")
108        .append(version);
109    return sb.toString();
110  }
111
112  @Override
113  public void onNext(final GroupCommunicationMessage msg) {
114    topology.handle(msg);
115  }
116
117  @Override
118  public void send(final T myData) throws NetworkException, InterruptedException {
119    LOG.entering("GatherSender", "send", myData);
120    // I am an intermediate node or a leaf.
121    LOG.fine("I am " + this);
122
123    if (init.compareAndSet(false, true)) {
124      LOG.fine(this + " Communication group initializing.");
125      commGroupClient.initialize();
126      LOG.fine(this + " Communication group initialized.");
127    }
128
129    try {
130      LOG.finest(this + " Waiting for children.");
131      final byte[] gatheredData = topology.recvFromChildren();
132      final byte[] encodedMyData = dataCodec.encode(myData);
133
134      try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream();
135           final DataOutputStream dstream = new DataOutputStream(bstream)) {
136        dstream.writeUTF(netService.getMyId().toString());
137        dstream.writeInt(encodedMyData.length);
138        dstream.write(encodedMyData);
139        dstream.write(gatheredData);
140        final byte[] mergedData = bstream.toByteArray();
141
142        LOG.fine(this + " Sending merged value to parent.");
143        topology.sendToParent(mergedData, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Gather);
144      }
145    } catch (final ParentDeadException e) {
146      throw new RuntimeException("ParentDeadException", e);
147    } catch (final IOException e) {
148      throw new RuntimeException("IOException", e);
149    }
150    LOG.exiting("GatherSender", "send");
151  }
152}