This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.operators;
020
021import org.apache.reef.driver.task.TaskConfigurationOptions;
022import org.apache.reef.exception.evaluator.NetworkException;
023import org.apache.reef.io.network.exception.ParentDeadException;
024import org.apache.reef.io.network.group.api.operators.Scatter;
025import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
026import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
027import org.apache.reef.io.network.group.api.task.OperatorTopology;
028import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
029import org.apache.reef.io.network.group.impl.config.parameters.*;
030import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
031import org.apache.reef.io.network.group.impl.utils.ScatterData;
032import org.apache.reef.io.network.group.impl.utils.ScatterDecoder;
033import org.apache.reef.io.network.group.impl.utils.Utils;
034import org.apache.reef.io.network.impl.NetworkService;
035import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
036import org.apache.reef.io.serialization.Codec;
037import org.apache.reef.tang.annotations.Name;
038import org.apache.reef.tang.annotations.Parameter;
039import org.apache.reef.wake.EventHandler;
040
041import javax.inject.Inject;
042import java.util.LinkedList;
043import java.util.List;
044import java.util.concurrent.atomic.AtomicBoolean;
045import java.util.logging.Logger;
046
047public final class ScatterReceiver<T> implements Scatter.Receiver<T>, EventHandler<GroupCommunicationMessage> {
048
049  private static final Logger LOG = Logger.getLogger(ScatterReceiver.class.getName());
050
051  private final Class<? extends Name<String>> groupName;
052  private final Class<? extends Name<String>> operName;
053  private final Codec<T> dataCodec;
054  private final OperatorTopology topology;
055  private final AtomicBoolean init = new AtomicBoolean(false);
056  private final CommunicationGroupServiceClient commGroupClient;
057  private final int version;
058  private final ScatterDecoder scatterDecoder;
059
060  @Inject
061  public ScatterReceiver(@Parameter(CommunicationGroupName.class) final String groupName,
062                         @Parameter(OperatorName.class) final String operName,
063                         @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
064                         @Parameter(DataCodec.class) final Codec<T> dataCodec,
065                         @Parameter(DriverIdentifierGroupComm.class) final String driverId,
066                         @Parameter(TaskVersion.class) final int version,
067                         final CommGroupNetworkHandler commGroupNetworkHandler,
068                         final NetworkService<GroupCommunicationMessage> netService,
069                         final CommunicationGroupServiceClient commGroupClient,
070                         final ScatterDecoder scatterDecoder) {
071    LOG.finest(operName + "has CommGroupHandler-" + commGroupNetworkHandler.toString());
072    this.version = version;
073    this.groupName = Utils.getClass(groupName);
074    this.operName = Utils.getClass(operName);
075    this.dataCodec = dataCodec;
076    this.scatterDecoder = scatterDecoder;
077    this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
078                                             selfId, driverId, new Sender(netService), version);
079    this.commGroupClient = commGroupClient;
080    commGroupNetworkHandler.register(this.operName, this);
081  }
082
083  @Override
084  public int getVersion() {
085    return version;
086  }
087
088  @Override
089  public void initialize() throws ParentDeadException {
090    topology.initialize();
091  }
092
093  @Override
094  public Class<? extends Name<String>> getOperName() {
095    return operName;
096  }
097
098  @Override
099  public Class<? extends Name<String>> getGroupName() {
100    return groupName;
101  }
102
103  @Override
104  public String toString() {
105    final StringBuilder sb = new StringBuilder("ScatterReceiver:")
106        .append(Utils.simpleName(groupName))
107        .append(":")
108        .append(Utils.simpleName(operName))
109        .append(":")
110        .append(version);
111    return sb.toString();
112  }
113
114  @Override
115  public void onNext(final GroupCommunicationMessage msg) {
116    topology.handle(msg);
117  }
118
119  @Override
120  public List<T> receive() throws NetworkException, InterruptedException {
121    LOG.entering("ScatterReceiver", "receive");
122    // I am intermediate node or leaf.
123    LOG.fine("I am " + this);
124
125    if (init.compareAndSet(false, true)) {
126      LOG.fine(this + " Communication group initializing.");
127      commGroupClient.initialize();
128      LOG.fine(this + " Communication group initialized.");
129    }
130
131    try {
132      LOG.fine(this + " Waiting to receive scatter from parent.");
133      final byte[] data = topology.recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
134
135      if (data == null) {
136        LOG.fine(this + " Received null. Perhaps one of my ancestors is dead.");
137        LOG.exiting("ScatterSender", "receive", null);
138        return null;
139      }
140
141      LOG.fine(this + " Successfully received scattered data.");
142      final ScatterData scatterData = scatterDecoder.decode(data);
143
144      LOG.fine(this + " Trying to propagate messages to children.");
145      topology.sendToChildren(scatterData.getChildrenData(), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
146
147      LOG.fine(this + " Decoding data elements sent to me.");
148      final List<T> retList = new LinkedList<>();
149      for (final byte[] singleData : scatterData.getMyData()) {
150        retList.add(dataCodec.decode(singleData));
151      }
152
153      LOG.exiting("ScatterSender", "receive", retList);
154      return retList;
155
156    } catch (final ParentDeadException e) {
157      throw new RuntimeException("ParentDeadException", e);
158    }
159  }
160}