This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.driver.parameters.DriverIdentifier;
022import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
023import org.apache.reef.io.network.group.api.GroupChanges;
024import org.apache.reef.io.network.group.api.config.OperatorSpec;
025import org.apache.reef.io.network.group.api.driver.TaskNode;
026import org.apache.reef.io.network.group.api.driver.Topology;
027import org.apache.reef.io.network.group.impl.GroupChangesCodec;
028import org.apache.reef.io.network.group.impl.GroupChangesImpl;
029import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
030import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
031import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
032import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
033import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
034import org.apache.reef.io.network.group.impl.config.parameters.*;
035import org.apache.reef.io.network.group.impl.operators.*;
036import org.apache.reef.io.network.group.impl.utils.Utils;
037import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
038import org.apache.reef.io.serialization.Codec;
039import org.apache.reef.tang.Configuration;
040import org.apache.reef.tang.JavaConfigurationBuilder;
041import org.apache.reef.tang.Tang;
042import org.apache.reef.tang.annotations.Name;
043import org.apache.reef.tang.annotations.Parameter;
044import org.apache.reef.tang.formats.AvroConfigurationSerializer;
045import org.apache.reef.tang.formats.ConfigurationSerializer;
046import org.apache.reef.wake.EStage;
047import org.apache.reef.wake.EventHandler;
048import org.apache.reef.wake.impl.SingleThreadStage;
049
050import javax.inject.Inject;
051import java.util.ArrayList;
052import java.util.List;
053import java.util.Map;
054import java.util.concurrent.ConcurrentMap;
055import java.util.concurrent.ConcurrentSkipListMap;
056import java.util.logging.Logger;
057
058/**
059 * Implements a tree topology with the specified Fan Out.
060 */
061public class TreeTopology implements Topology {
062
  // Class-wide java.util.logging logger used for entering/exiting traces and diagnostics.
  private static final Logger LOG = Logger.getLogger(TreeTopology.class.getName());

  // Stage that outgoing group communication messages are handed to.
  private final EStage<GroupCommunicationMessage> senderStage;
  // Name classes of the communication group and of the operator; both feed getQualifiedName().
  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  // Driver identifier passed to every task node and to every message built by this topology.
  private final String driverId;
  // Task id designated as root through setRootTask(); null until that call.
  private String rootId;
  // Operator specification consulted by getTaskConfiguration().
  private OperatorSpec operatorSpec;

  // Root node; assigned in setRootNode() and cleared in unsetRootNode().
  private TaskNode root;
  // Node that currently receives new children; advanced to its successor once it has fanOut children.
  private TaskNode logicalRoot;
  // Most recently attached node; the next attached node is linked as its sibling.
  private TaskNode prev;
  // Number of children a node may have before new nodes are attached to its successor.
  private final int fanOut;

  // All known task nodes keyed by task id (a skip-list map, so iteration is in key order).
  private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap<>();
  // Renders a task configuration as text for the exit trace of getTaskConfiguration().
  private final ConfigurationSerializer confSer = new AvroConfigurationSerializer();
079
080  /**
081   * @deprecated in 0.14. Use Tang to obtain an instance of this instead.
082   */
083  @Deprecated
084  public TreeTopology(final EStage<GroupCommunicationMessage> senderStage,
085                      final Class<? extends Name<String>> groupName,
086                      final Class<? extends Name<String>> operatorName,
087                      final String driverId, final int numberOfTasks, final int fanOut) {
088    this.senderStage = senderStage;
089    this.groupName = groupName;
090    this.operName = operatorName;
091    this.driverId = driverId;
092    this.fanOut = fanOut;
093    LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + fanOut);
094  }
095
096  @Inject
097  private TreeTopology(@Parameter(GroupCommSenderStage.class) final EStage<GroupCommunicationMessage> senderStage,
098                       @Parameter(CommGroupNameClass.class) final Class<? extends Name<String>> groupName,
099                       @Parameter(OperatorNameClass.class) final Class<? extends Name<String>> operatorName,
100                       @Parameter(DriverIdentifier.class) final String driverId,
101                       @Parameter(TreeTopologyFanOut.class) final int fanOut) {
102    this.senderStage = senderStage;
103    this.groupName = groupName;
104    this.operName = operatorName;
105    this.driverId = driverId;
106    this.fanOut = fanOut;
107    LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + fanOut);
108  }
109
110  @Override
111  @SuppressWarnings("checkstyle:hiddenfield")
112  public void setRootTask(final String rootId) {
113    LOG.entering("TreeTopology", "setRootTask", new Object[]{getQualifiedName(), rootId});
114    this.rootId = rootId;
115    LOG.exiting("TreeTopology", "setRootTask", getQualifiedName() + rootId);
116  }
117
118  @Override
119  public String getRootId() {
120    LOG.entering("TreeTopology", "getRootId", getQualifiedName());
121    LOG.exiting("TreeTopology", "getRootId", getQualifiedName() + rootId);
122    return rootId;
123  }
124
125  @Override
126  public boolean isRootPresent() {
127    LOG.entering("TreeTopology", "isRootPresent", getQualifiedName());
128    final boolean retVal = root != null;
129    LOG.exiting("TreeTopology", "isRootPresent", String.format("%s%s", getQualifiedName(), retVal));
130    return retVal;
131  }
132
133  @Override
134  public void setOperatorSpecification(final OperatorSpec spec) {
135    LOG.entering("TreeTopology", "setOperSpec", new Object[]{getQualifiedName(), spec});
136    this.operatorSpec = spec;
137    LOG.exiting("TreeTopology", "setOperSpec", getQualifiedName() + spec);
138  }
139
140  @Override
141  public Configuration getTaskConfiguration(final String taskId) {
142    LOG.entering("TreeTopology", "getTaskConfig", new Object[]{getQualifiedName(), taskId});
143    final TaskNode taskNode = nodes.get(taskId);
144    if (taskNode == null) {
145      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
146    }
147
148    final int version = getNodeVersion(taskId);
149    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
150    jcb.bindNamedParameter(DataCodec.class, operatorSpec.getDataCodecClass());
151    jcb.bindNamedParameter(TaskVersion.class, Integer.toString(version));
152    if (operatorSpec instanceof BroadcastOperatorSpec) {
153      final BroadcastOperatorSpec broadcastOperatorSpec = (BroadcastOperatorSpec) operatorSpec;
154      if (taskId.equals(broadcastOperatorSpec.getSenderId())) {
155        jcb.bindImplementation(GroupCommOperator.class, BroadcastSender.class);
156      } else {
157        jcb.bindImplementation(GroupCommOperator.class, BroadcastReceiver.class);
158      }
159    } else if (operatorSpec instanceof ReduceOperatorSpec) {
160      final ReduceOperatorSpec reduceOperatorSpec = (ReduceOperatorSpec) operatorSpec;
161      jcb.bindNamedParameter(ReduceFunctionParam.class, reduceOperatorSpec.getRedFuncClass());
162      if (taskId.equals(reduceOperatorSpec.getReceiverId())) {
163        jcb.bindImplementation(GroupCommOperator.class, ReduceReceiver.class);
164      } else {
165        jcb.bindImplementation(GroupCommOperator.class, ReduceSender.class);
166      }
167    } else if (operatorSpec instanceof ScatterOperatorSpec) {
168      final ScatterOperatorSpec scatterOperatorSpec = (ScatterOperatorSpec) operatorSpec;
169      if (taskId.equals(scatterOperatorSpec.getSenderId())) {
170        jcb.bindImplementation(GroupCommOperator.class, ScatterSender.class);
171      } else {
172        jcb.bindImplementation(GroupCommOperator.class, ScatterReceiver.class);
173      }
174    } else if (operatorSpec instanceof GatherOperatorSpec) {
175      final GatherOperatorSpec gatherOperatorSpec = (GatherOperatorSpec) operatorSpec;
176      if (taskId.equals(gatherOperatorSpec.getReceiverId())) {
177        jcb.bindImplementation(GroupCommOperator.class, GatherReceiver.class);
178      } else {
179        jcb.bindImplementation(GroupCommOperator.class, GatherSender.class);
180      }
181    }
182    final Configuration retConf = jcb.build();
183    LOG.exiting("TreeTopology", "getTaskConfig", getQualifiedName() + confSer.toString(retConf));
184    return retConf;
185  }
186
187  @Override
188  public int getNodeVersion(final String taskId) {
189    LOG.entering("TreeTopology", "getNodeVersion", new Object[]{getQualifiedName(), taskId});
190    final TaskNode node = nodes.get(taskId);
191    if (node == null) {
192      throw new RuntimeException(getQualifiedName() + taskId + " is not available on the nodes map");
193    }
194    final int version = node.getVersion();
195    LOG.exiting("TreeTopology", "getNodeVersion", getQualifiedName() + " " + taskId + " " + version);
196    return version;
197  }
198
199  @Override
200  public void removeTask(final String taskId) {
201    LOG.entering("TreeTopology", "removeTask", new Object[]{getQualifiedName(), taskId});
202    if (!nodes.containsKey(taskId)) {
203      LOG.fine("Trying to remove a non-existent node in the task graph");
204      LOG.exiting("TreeTopology", "removeTask", getQualifiedName());
205      return;
206    }
207    if (taskId.equals(rootId)) {
208      unsetRootNode(taskId);
209    } else {
210      removeChild(taskId);
211    }
212    LOG.exiting("TreeTopology", "removeTask", getQualifiedName() + taskId);
213  }
214
215  @Override
216  public void addTask(final String taskId) {
217    LOG.entering("TreeTopology", "addTask", new Object[]{getQualifiedName(), taskId});
218    if (nodes.containsKey(taskId)) {
219      LOG.fine("Got a request to add a task that is already in the graph. " +
220          "We need to block this request till the delete finishes. ***CAUTION***");
221    }
222
223    if (taskId.equals(rootId)) {
224      setRootNode(taskId);
225    } else {
226      addChild(taskId);
227    }
228    LOG.exiting("TreeTopology", "addTask", getQualifiedName() + taskId);
229  }
230
231  private void addChild(final String taskId) {
232    LOG.entering("TreeTopology", "addChild", new Object[]{getQualifiedName(), taskId});
233    LOG.finest(getQualifiedName() + "Adding leaf " + taskId);
234    final TaskNode node = new TaskNodeImpl(senderStage, groupName, operName, taskId, driverId, false);
235    if (logicalRoot != null) {
236      addTaskNode(node);
237      prev = node;
238    }
239    nodes.put(taskId, node);
240    LOG.exiting("TreeTopology", "addChild", getQualifiedName() + taskId);
241  }
242
243  private void addTaskNode(final TaskNode node) {
244    LOG.entering("TreeTopology", "addTaskNode", new Object[]{getQualifiedName(), node});
245    if (logicalRoot.getNumberOfChildren() >= this.fanOut) {
246      logicalRoot = logicalRoot.successor();
247    }
248    node.setParent(logicalRoot);
249    logicalRoot.addChild(node);
250    prev.setSibling(node);
251    LOG.exiting("TreeTopology", "addTaskNode", getQualifiedName() + node);
252  }
253
254  private void removeChild(final String taskId) {
255    LOG.entering("TreeTopology", "removeChild", new Object[]{getQualifiedName(), taskId});
256    if (root != null) {
257      root.removeChild(nodes.get(taskId));
258    }
259    nodes.remove(taskId);
260    LOG.exiting("TreeTopology", "removeChild", getQualifiedName() + taskId);
261  }
262
263  private void setRootNode(final String newRootId) {
264    LOG.entering("TreeTopology", "setRootNode", new Object[]{getQualifiedName(), newRootId});
265    this.root = new TaskNodeImpl(senderStage, groupName, operName, newRootId, driverId, true);
266    this.logicalRoot = this.root;
267    this.prev = this.root;
268
269    for (final Map.Entry<String, TaskNode> nodeEntry : nodes.entrySet()) {
270      final TaskNode leaf = nodeEntry.getValue();
271      addTaskNode(leaf);
272      this.prev = leaf;
273    }
274    nodes.put(newRootId, root);
275    LOG.exiting("TreeTopology", "setRootNode", getQualifiedName() + newRootId);
276  }
277
278  private void unsetRootNode(final String taskId) {
279    LOG.entering("TreeTopology", "unsetRootNode", new Object[]{getQualifiedName(), taskId});
280    nodes.remove(rootId);
281    root = null;
282
283    for (final Map.Entry<String, TaskNode> nodeEntry : nodes.entrySet()) {
284      final TaskNode leaf = nodeEntry.getValue();
285      leaf.setParent(null);
286    }
287    LOG.exiting("TreeTopology", "unsetRootNode", getQualifiedName() + taskId);
288  }
289
290  @Override
291  public void onFailedTask(final String taskId) {
292    LOG.entering("TreeTopology", "onFailedTask", new Object[]{getQualifiedName(), taskId});
293    final TaskNode taskNode = nodes.get(taskId);
294    if (taskNode == null) {
295      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
296    }
297    taskNode.onFailedTask();
298    LOG.exiting("TreeTopology", "onFailedTask", getQualifiedName() + taskId);
299  }
300
301  @Override
302  public void onRunningTask(final String taskId) {
303    LOG.entering("TreeTopology", "onRunningTask", new Object[]{getQualifiedName(), taskId});
304    final TaskNode taskNode = nodes.get(taskId);
305    if (taskNode == null) {
306      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
307    }
308    taskNode.onRunningTask();
309    LOG.exiting("TreeTopology", "onRunningTask", getQualifiedName() + taskId);
310  }
311
312  @Override
313  public void onReceiptOfMessage(final GroupCommunicationMessage msg) {
314    LOG.entering("TreeTopology", "onReceiptOfMessage", new Object[]{getQualifiedName(), msg});
315    switch (msg.getType()) {
316    case TopologyChanges:
317      onTopologyChanges(msg);
318      break;
319    case UpdateTopology:
320      onUpdateTopology(msg);
321      break;
322
323    default:
324      nodes.get(msg.getSrcid()).onReceiptOfAcknowledgement(msg);
325      break;
326    }
327    LOG.exiting("TreeTopology", "onReceiptOfMessage", getQualifiedName() + msg);
328  }
329
330  private void onUpdateTopology(final GroupCommunicationMessage msg) {
331    LOG.entering("TreeTopology", "onUpdateTopology", new Object[]{getQualifiedName(), msg});
332    LOG.fine(getQualifiedName() + "Update affected parts of Topology");
333    final String dstId = msg.getSrcid();
334    final int version = getNodeVersion(dstId);
335
336    LOG.finest(getQualifiedName() + "Creating NodeTopologyUpdateWaitStage to wait on nodes to be updated");
337    final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new TopologyUpdateWaitHandler(senderStage, groupName,
338        operName, driverId, 0,
339        dstId, version,
340        getQualifiedName(), TopologySerializer.encode(root));
341    final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new SingleThreadStage<>("NodeTopologyUpdateWaitStage",
342        topoUpdateWaitHandler,
343        nodes.size());
344
345    final List<TaskNode> toBeUpdatedNodes = new ArrayList<>(nodes.size());
346    LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
347    for (final TaskNode node : nodes.values()) {
348      if (node.isRunning() && node.hasChanges() && node.resetTopologySetupSent()) {
349        toBeUpdatedNodes.add(node);
350      }
351    }
352    for (final TaskNode node : toBeUpdatedNodes) {
353      node.updatingTopology();
354      LOG.fine(getQualifiedName() + "Asking " + node + " to UpdateTopology");
355      senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
356          ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(),
357          node.getVersion(), Utils.EMPTY_BYTE_ARR));
358    }
359    nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes);
360    LOG.exiting("TreeTopology", "onUpdateTopology", getQualifiedName() + msg);
361  }
362
363  private void onTopologyChanges(final GroupCommunicationMessage msg) {
364    LOG.entering("TreeTopology", "onTopologyChanges", new Object[]{getQualifiedName(), msg});
365    LOG.fine(getQualifiedName() + "Check TopologyChanges");
366    final String dstId = msg.getSrcid();
367    boolean hasTopologyChanged = false;
368    LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
369    for (final TaskNode node : nodes.values()) {
370      if (!node.isRunning() || node.hasChanges()) {
371        hasTopologyChanged = true;
372        break;
373      }
374    }
375    final GroupChanges changes = new GroupChangesImpl(hasTopologyChanged);
376    final Codec<GroupChanges> changesCodec = new GroupChangesCodec();
377    LOG.fine(getQualifiedName() + "TopologyChanges: " + changes);
378    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
379        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId),
380        changesCodec.encode(changes)));
381    LOG.exiting("TreeTopology", "onTopologyChanges", getQualifiedName() + msg);
382  }
383
384  private String getQualifiedName() {
385    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + " - ";
386  }
387}