This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.operators;
020
021import org.apache.reef.driver.task.TaskConfigurationOptions;
022import org.apache.reef.exception.evaluator.NetworkException;
023import org.apache.reef.io.network.exception.ParentDeadException;
024import org.apache.reef.io.network.group.api.operators.Scatter;
025import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
026import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
027import org.apache.reef.io.network.group.api.task.OperatorTopology;
028import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
029import org.apache.reef.io.network.group.impl.config.parameters.*;
030import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
031import org.apache.reef.io.network.group.impl.utils.ScatterEncoder;
032import org.apache.reef.io.network.group.impl.utils.ScatterHelper;
033import org.apache.reef.io.network.group.impl.utils.Utils;
034import org.apache.reef.io.network.impl.NetworkService;
035import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
036import org.apache.reef.io.serialization.Codec;
037import org.apache.reef.tang.annotations.Name;
038import org.apache.reef.tang.annotations.Parameter;
039import org.apache.reef.wake.EventHandler;
040import org.apache.reef.wake.Identifier;
041
042import javax.inject.Inject;
043import java.util.Arrays;
044import java.util.List;
045import java.util.Map;
046import java.util.concurrent.atomic.AtomicBoolean;
047import java.util.logging.Logger;
048
049public final class ScatterSender<T> implements Scatter.Sender<T>, EventHandler<GroupCommunicationMessage> {
050
051  private static final Logger LOG = Logger.getLogger(ScatterSender.class.getName());
052
053  private final Class<? extends Name<String>> groupName;
054  private final Class<? extends Name<String>> operName;
055  private final Codec<T> dataCodec;
056  private final OperatorTopology topology;
057  private final AtomicBoolean init = new AtomicBoolean(false);
058  private final CommunicationGroupServiceClient commGroupClient;
059  private final int version;
060  private final ScatterEncoder scatterEncoder;
061
062  @Inject
063  public ScatterSender(@Parameter(CommunicationGroupName.class) final String groupName,
064                       @Parameter(OperatorName.class) final String operName,
065                       @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
066                       @Parameter(DataCodec.class) final Codec<T> dataCodec,
067                       @Parameter(DriverIdentifierGroupComm.class) final String driverId,
068                       @Parameter(TaskVersion.class) final int version,
069                       final CommGroupNetworkHandler commGroupNetworkHandler,
070                       final NetworkService<GroupCommunicationMessage> netService,
071                       final CommunicationGroupServiceClient commGroupClient,
072                       final ScatterEncoder scatterEncoder) {
073    LOG.finest(operName + "has CommGroupHandler-" + commGroupNetworkHandler.toString());
074    this.version = version;
075    this.groupName = Utils.getClass(groupName);
076    this.operName = Utils.getClass(operName);
077    this.dataCodec = dataCodec;
078    this.scatterEncoder = scatterEncoder;
079    this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
080                                             selfId, driverId, new Sender(netService), version);
081    this.commGroupClient = commGroupClient;
082    commGroupNetworkHandler.register(this.operName, this);
083  }
084
085  @Override
086  public int getVersion() {
087    return version;
088  }
089
090  @Override
091  public void initialize() throws ParentDeadException {
092    topology.initialize();
093  }
094
095  @Override
096  public Class<? extends Name<String>> getOperName() {
097    return operName;
098  }
099
100  @Override
101  public Class<? extends Name<String>> getGroupName() {
102    return groupName;
103  }
104
105  @Override
106  public String toString() {
107    final StringBuilder sb = new StringBuilder("ScatterSender:")
108        .append(Utils.simpleName(groupName))
109        .append(":")
110        .append(Utils.simpleName(operName))
111        .append(":")
112        .append(version);
113    return sb.toString();
114  }
115
116  @Override
117  public void onNext(final GroupCommunicationMessage msg) {
118    topology.handle(msg);
119  }
120
121  private void initializeGroup() {
122    if (init.compareAndSet(false, true)) {
123      LOG.fine(this + " Communication group initializing.");
124      commGroupClient.initialize();
125      LOG.fine(this + " Communication group initialized.");
126    }
127  }
128
129  @Override
130  public void send(final List<T> elements) throws NetworkException, InterruptedException {
131    LOG.entering("ScatterSender", "send");
132
133    initializeGroup();
134    send(elements,
135        ScatterHelper.getUniformCounts(elements.size(), commGroupClient.getActiveSlaveTasks().size()),
136        commGroupClient.getActiveSlaveTasks());
137
138    LOG.exiting("ScatterSender", "send");
139  }
140
141  @Override
142  public void send(final List<T> elements, final Integer... counts)
143      throws NetworkException, InterruptedException {
144    LOG.entering("ScatterSender", "send");
145
146    initializeGroup();
147    if (counts.length != commGroupClient.getActiveSlaveTasks().size()) {
148      throw new RuntimeException("Parameter 'counts' has length " + counts.length
149          + ", but number of slaves is " + commGroupClient.getActiveSlaveTasks().size());
150    }
151
152    send(elements,
153        Arrays.asList(counts),
154        commGroupClient.getActiveSlaveTasks());
155
156    LOG.exiting("ScatterSender", "send");
157  }
158
159  @Override
160  public void send(final List<T> elements, final List<? extends Identifier> order)
161      throws NetworkException, InterruptedException {
162    LOG.entering("ScatterSender", "send");
163
164    initializeGroup();
165    send(elements,
166        ScatterHelper.getUniformCounts(elements.size(), order.size()),
167        order);
168
169    LOG.exiting("ScatterSender", "send");
170  }
171
172  @Override
173  public void send(final List<T> elements, final List<Integer> counts, final List<? extends Identifier> order)
174      throws NetworkException, InterruptedException {
175    LOG.entering("ScatterSender", "send");
176
177    if (counts.size() != order.size()) {
178      throw new RuntimeException("Parameter 'counts' has size " + counts.size()
179          + ", but parameter 'order' has size " + order.size() + ".");
180    }
181    initializeGroup();
182
183    // I am root.
184    LOG.fine("I am " + this);
185
186    LOG.fine(this + " Encoding data and determining which Tasks receive which elements.");
187    final Map<String, byte[]> mapOfChildIdToBytes = scatterEncoder.encode(elements, counts, order, dataCodec);
188
189    try {
190      LOG.fine(this + " Sending " + elements.size() + " elements.");
191      topology.sendToChildren(mapOfChildIdToBytes, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
192
193    } catch (final ParentDeadException e) {
194      throw new RuntimeException("ParentDeadException during OperatorTopology.sendToChildren()", e);
195    }
196
197    LOG.exiting("ScatterSender", "send");
198  }
199}