This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.task;
020
021import org.apache.commons.lang.ArrayUtils;
022import org.apache.reef.exception.evaluator.NetworkException;
023import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
024import org.apache.reef.io.network.group.api.task.NodeStruct;
025import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct;
026import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
027import org.apache.reef.io.network.group.impl.operators.Sender;
028import org.apache.reef.io.network.group.impl.utils.Utils;
029import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
030import org.apache.reef.io.serialization.Codec;
031import org.apache.reef.tang.annotations.Name;
032
033import java.util.*;
034import java.util.concurrent.BlockingQueue;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.ConcurrentMap;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.logging.Logger;
039
040/**
041 *
042 */
043public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
044
045  private static final int SMALL_MSG_LENGTH = 1 << 20;
046
047  private static final Logger LOG = Logger.getLogger(OperatorTopologyStructImpl.class.getName());
048
049  private final Class<? extends Name<String>> groupName;
050  private final Class<? extends Name<String>> operName;
051  private final String selfId;
052  private final String driverId;
053  private final Sender sender;
054
055  private boolean changes = true;
056  private NodeStruct parent;
057  private final List<NodeStruct> children = new ArrayList<>();
058
059  private final BlockingQueue<NodeStruct> nodesWithData = new LinkedBlockingQueue<>();
060  private final Set<String> childrenToRcvFrom = new HashSet<>();
061
062  private final ConcurrentMap<String, Set<Integer>> deadMsgs = new ConcurrentHashMap<>();
063
064  private final int version;
065
066  public OperatorTopologyStructImpl(final Class<? extends Name<String>> groupName,
067                                    final Class<? extends Name<String>> operName, final String selfId,
068                                    final String driverId, final Sender sender, final int version) {
069    super();
070    this.groupName = groupName;
071    this.operName = operName;
072    this.selfId = selfId;
073    this.driverId = driverId;
074    this.sender = sender;
075    this.version = version;
076  }
077
078  public OperatorTopologyStructImpl(final OperatorTopologyStruct topology) {
079    super();
080    this.groupName = topology.getGroupName();
081    this.operName = topology.getOperName();
082    this.selfId = topology.getSelfId();
083    this.driverId = topology.getDriverId();
084    this.sender = topology.getSender();
085    this.changes = topology.hasChanges();
086    this.parent = topology.getParent();
087    this.children.addAll(topology.getChildren());
088    this.version = topology.getVersion();
089  }
090
091  @Override
092  public String toString() {
093    return "OperatorTopologyStruct - " + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) +
094        "(" + selfId + "," + version + ")";
095  }
096
097  @Override
098  public NodeStruct getParent() {
099    return parent;
100  }
101
102  @Override
103  public Collection<? extends NodeStruct> getChildren() {
104    return children;
105  }
106
107  @Override
108  public Class<? extends Name<String>> getGroupName() {
109    return groupName;
110  }
111
112  @Override
113  public Class<? extends Name<String>> getOperName() {
114    return operName;
115  }
116
117  @Override
118  public String getSelfId() {
119    return selfId;
120  }
121
122  @Override
123  public String getDriverId() {
124    return driverId;
125  }
126
127  @Override
128  public Sender getSender() {
129    return sender;
130  }
131
132  @Override
133  public boolean hasChanges() {
134    LOG.entering("OperatorTopologyStructImpl", "hasChanges", getQualifiedName());
135    LOG.exiting("OperatorTopologyStructImpl", "hasChanges",
136        Arrays.toString(new Object[]{this.changes, getQualifiedName()}));
137    return this.changes;
138  }
139
140  @Override
141  public int getVersion() {
142    return version;
143  }
144
145  @Override
146  public void addAsData(final GroupCommunicationMessage msg) {
147    LOG.entering("OperatorTopologyStructImpl", "addAsData", new Object[]{getQualifiedName(), msg});
148    final String srcId = msg.getSrcid();
149    final NodeStruct node = findNode(srcId);
150    if (node != null) {
151      try {
152        nodesWithData.put(node);
153        LOG.finest(getQualifiedName() + "Added node " + srcId + " to nodesWithData queue");
154      } catch (final InterruptedException e) {
155        throw new RuntimeException("InterruptedException while adding to childrenWithData queue", e);
156      }
157      node.addData(msg);
158    } else {
159      LOG.fine("Unable to find node " + srcId + " to send " + msg.getType() + " to");
160    }
161    LOG.exiting("OperatorTopologyStructImpl", "addAsData", Arrays.toString(new Object[]{getQualifiedName(), msg}));
162  }
163
164  private NodeStruct findNode(final String srcId) {
165    LOG.entering("OperatorTopologyStructImpl", "findNode", new Object[]{getQualifiedName(), srcId});
166    final NodeStruct retVal;
167    if (parent != null && parent.getId().equals(srcId)) {
168      retVal = parent;
169    } else {
170      retVal = findChild(srcId);
171    }
172    LOG.exiting("OperatorTopologyStructImpl", "findNode",
173        Arrays.toString(new Object[]{retVal, getQualifiedName(), srcId}));
174    return retVal;
175  }
176
177  private void sendToNode(final byte[] data,
178                          final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType,
179                          final NodeStruct node) {
180    LOG.entering("OperatorTopologyStructImpl", "sendToNode", new Object[]{getQualifiedName(), msgType, node});
181    final String nodeId = node.getId();
182    try {
183
184      if (data.length > SMALL_MSG_LENGTH) {
185        LOG.finest(getQualifiedName() + "Msg too big. Sending readiness to send " + msgType + " msg to " + nodeId);
186        sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(),
187            Utils.EMPTY_BYTE_ARR));
188        final byte[] tmpVal = receiveFromNode(node, true);
189        if (tmpVal != null) {
190          LOG.finest(getQualifiedName() + "Got readiness to accept " + msgType + " msg from " + nodeId
191              + ". Will send actual msg now");
192        } else {
193          LOG.exiting("OperatorTopologyStructImpl", "sendToNode", getQualifiedName());
194          return;
195        }
196      }
197
198      sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(),
199          data));
200
201      if (data.length > SMALL_MSG_LENGTH) {
202        LOG.finest(getQualifiedName() + "Msg too big. Will wait for ACK before queing up one more msg");
203        final byte[] tmpVal = receiveFromNode(node, true);
204        if (tmpVal != null) {
205          LOG.finest(getQualifiedName() + "Got " + msgType + " msg received ACK from " + nodeId
206              + ". Will move to next msg if it exists");
207        } else {
208          LOG.exiting("OperatorTopologyStructImpl", "sendToNode", getQualifiedName());
209          return;
210        }
211      }
212    } catch (final NetworkException e) {
213      throw new RuntimeException(
214          "NetworkException while sending " + msgType + " data from " + selfId + " to " + nodeId,
215          e);
216    }
217    LOG.exiting("OperatorTopologyStructImpl", "sendToNode", getQualifiedName());
218  }
219
220  private byte[] receiveFromNode(final NodeStruct node, final boolean remove) {
221    LOG.entering("OperatorTopologyStructImpl", "receiveFromNode", new Object[]{getQualifiedName(), node, remove});
222    final byte[] retVal = node.getData();
223    if (remove) {
224      final boolean removed = nodesWithData.remove(node);
225      final String msg = getQualifiedName() + "Removed(" + removed + ") node " + node.getId()
226          + " from nodesWithData queue";
227      if (removed) {
228        LOG.finest(msg);
229      } else {
230        LOG.fine(msg);
231      }
232    }
233    LOG.exiting("OperatorTopologyStructImpl", "receiveFromNode", getQualifiedName());
234    return retVal;
235  }
236
237  /**
238   * Receive data from {@code node}, while checking if it is trying to send a big message.
239   * Nodes that send big messages will first send an empty data message and
240   * wait for an ACK before transmitting the actual big message. Thus the
241   * receiving side checks whether a message is empty or not, and after sending
242   * an ACK it must wait for another message if the first message was empty.
243   *
244   * @param node node to receive a message from
245   * @param msgType message type
246   * @return message sent from {@code node}
247   */
248  private byte[] recvFromNodeCheckBigMsg(final NodeStruct node,
249                                         final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
250    LOG.entering("OperatorTopologyStructImpl", "recvFromNodeCheckBigMsg", new Object[]{node, msgType});
251
252    byte[] retVal = receiveFromNode(node, false);
253    if (retVal != null && retVal.length == 0) {
254      LOG.finest(getQualifiedName() + " Got msg that node " + node.getId()
255          + " has large data and is ready to send it. Sending ACK to receive data.");
256      sendToNode(Utils.EMPTY_BYTE_ARR, msgType, node);
257      retVal = receiveFromNode(node, true);
258
259      if (retVal != null) {
260        LOG.finest(getQualifiedName() + " Received large msg from node " + node.getId()
261            + ". Will process it after ACKing.");
262        sendToNode(Utils.EMPTY_BYTE_ARR, msgType, node);
263      } else {
264        LOG.warning(getQualifiedName() + "Expected large msg from node " + node.getId()
265            + " but received nothing.");
266      }
267    }
268
269    LOG.exiting("OperatorTopologyStructImpl", "recvFromNodeCheckBigMsg");
270    return retVal;
271  }
272
273  /**
274   * Retrieves and removes the head of {@code nodesWithData}, waiting if necessary until an element becomes available.
275   * (Comment taken from {@link java.util.concurrent.BlockingQueue})
276   * If interrupted while waiting, then throws a RuntimeException.
277   *
278   * @return the head of this queue
279   */
280  private NodeStruct nodesWithDataTakeUnsafe() {
281    LOG.entering("OperatorTopologyStructImpl", "nodesWithDataTakeUnsafe");
282    try {
283      final NodeStruct child = nodesWithData.take();
284      LOG.exiting("OperatorTopologyStructImpl", "nodesWithDataTakeUnsafe", child);
285      return child;
286
287    } catch (final InterruptedException e) {
288      throw new RuntimeException("InterruptedException while waiting to take data from nodesWithData queue", e);
289    }
290  }
291
292  @Override
293  public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
294    LOG.entering("OperatorTopologyStructImpl", "sendToParent", new Object[]{getQualifiedName(), msgType});
295    if (parent != null) {
296      sendToNode(data, msgType, parent);
297    } else {
298      LOG.fine(getQualifiedName() + "Perhaps parent has died or has not been configured");
299    }
300    LOG.exiting("OperatorTopologyStructImpl", "sendToParent", getQualifiedName());
301  }
302
303  @Override
304  public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
305    LOG.entering("OperatorTopologyStructImpl", "sendToChildren", new Object[]{getQualifiedName(), msgType});
306    for (final NodeStruct child : children) {
307      sendToNode(data, msgType, child);
308    }
309    LOG.exiting("OperatorTopologyStructImpl", "sendToChildren", getQualifiedName());
310  }
311
312  @Override
313  public void sendToChildren(final Map<String, byte[]> dataMap,
314                             final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
315    LOG.entering("OperatorTopologyStructImpl", "sendToChildren", new Object[]{getQualifiedName(), msgType});
316    for (final NodeStruct child : children) {
317      if (dataMap.containsKey(child.getId())) {
318        sendToNode(dataMap.get(child.getId()), msgType, child);
319      } else {
320        throw new RuntimeException("No message specified for " + child.getId() + " in dataMap.");
321      }
322    }
323    LOG.exiting("OperatorTopologyStructImpl", "sendToChildren", getQualifiedName());
324  }
325
326  @Override
327  public byte[] recvFromParent(final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
328    LOG.entering("OperatorTopologyStructImpl", "recvFromParent", getQualifiedName());
329    LOG.finest(getQualifiedName() + "Waiting for " + parent.getId() + " to send data");
330    final byte[] retVal = recvFromNodeCheckBigMsg(parent, msgType);
331    LOG.exiting("OperatorTopologyStructImpl", "recvFromParent", getQualifiedName());
332    return retVal;
333  }
334
335  @Override
336  public <T> T recvFromChildren(final ReduceFunction<T> redFunc, final Codec<T> dataCodec) {
337    LOG.entering("OperatorTopologyStructImpl", "recvFromChildren", new Object[]{getQualifiedName(), redFunc,
338        dataCodec});
339    final List<T> retLst = new ArrayList<>(2);
340    for (final NodeStruct child : children) {
341      childrenToRcvFrom.add(child.getId());
342    }
343
344    while (!childrenToRcvFrom.isEmpty()) {
345      LOG.finest(getQualifiedName() + "Waiting for some child to send data");
346      final NodeStruct child = nodesWithDataTakeUnsafe();
347      final byte[] retVal = recvFromNodeCheckBigMsg(child,
348          ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce);
349
350      if (retVal != null) {
351        retLst.add(dataCodec.decode(retVal));
352        if (retLst.size() == 2) {
353          final T redVal = redFunc.apply(retLst);
354          retLst.clear();
355          retLst.add(redVal);
356        }
357      }
358      childrenToRcvFrom.remove(child.getId());
359    }
360    final T retVal = retLst.isEmpty() ? null : retLst.get(0);
361    LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", getQualifiedName());
362    return retVal;
363  }
364
365  /**
366   * Receive data from all children as a single byte array.
367   * Messages from children are simply byte-concatenated.
368   * This method is currently used only by the Gather operator.
369   *
370   * @return gathered data as a byte array
371   */
372  @Override
373  public byte[] recvFromChildren() {
374    LOG.entering("OperatorTopologyStructImpl", "recvFromChildren", getQualifiedName());
375    for (final NodeStruct child : children) {
376      childrenToRcvFrom.add(child.getId());
377    }
378
379    byte[] retVal = new byte[0];
380    while (!childrenToRcvFrom.isEmpty()) {
381      LOG.finest(getQualifiedName() + "Waiting for some child to send data");
382      final NodeStruct child = nodesWithDataTakeUnsafe();
383      final byte[] receivedVal = recvFromNodeCheckBigMsg(child,
384          ReefNetworkGroupCommProtos.GroupCommMessage.Type.Gather);
385
386      if (receivedVal != null) {
387        retVal = ArrayUtils.addAll(retVal, receivedVal);
388      }
389      childrenToRcvFrom.remove(child.getId());
390    }
391
392    LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", getQualifiedName());
393    return retVal;
394  }
395
396  private boolean removedDeadMsg(final String msgSrcId, final int msgSrcVersion) {
397    LOG.entering("OperatorTopologyStructImpl", "removedDeadMsg", new Object[]{getQualifiedName(), msgSrcId,
398        msgSrcVersion});
399    boolean retVal = false;
400    final Set<Integer> msgVersions = deadMsgs.get(msgSrcId);
401    if (msgVersions != null) {
402      LOG.fine(getQualifiedName() + "Found dead msgs " + msgVersions + " waiting for add");
403      if (msgVersions.remove(msgSrcVersion)) {
404        LOG.fine(getQualifiedName() + "Found dead msg with same version as srcVer-" + msgSrcVersion);
405        retVal = true;
406      } else {
407        LOG.finest(getQualifiedName() + "No dead msg with same version as srcVer-" + msgSrcVersion);
408      }
409    } else {
410      LOG.finest(getQualifiedName() + "No dead msgs waiting for add.");
411    }
412    LOG.exiting("OperatorTopologyStructImpl", "removedDeadMsg",
413        new Object[]{retVal, getQualifiedName(), msgSrcId, msgSrcVersion});
414    return retVal;
415  }
416
417  private void addToDeadMsgs(final String srcId, final int srcVersion) {
418    LOG.entering("OperatorTopologyStructImpl", "addToDeadMsgs", new Object[]{getQualifiedName(), srcId, srcVersion});
419    deadMsgs.putIfAbsent(srcId, new HashSet<Integer>());
420    deadMsgs.get(srcId).add(srcVersion);
421    LOG.exiting("OperatorTopologyStructImpl", "addToDeadMsgs", Arrays.toString(new Object[]{getQualifiedName(),
422        srcId, srcVersion}));
423  }
424
425  private boolean addedToDeadMsgs(final NodeStruct node, final String msgSrcId, final int msgSrcVersion) {
426    LOG.entering("OperatorTopologyStructImpl", "addedToDeadMsgs", new Object[]{getQualifiedName(), node, msgSrcId,
427        msgSrcVersion});
428    if (node == null) {
429      LOG.warning(getQualifiedName() + "Got dead msg when no node existed. OOS Queuing up for add to handle");
430      addToDeadMsgs(msgSrcId, msgSrcVersion);
431      LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs",
432          Arrays.toString(new Object[]{true, getQualifiedName(), null, msgSrcId, msgSrcVersion}));
433      return true;
434    }
435    final int nodeVersion = node.getVersion();
436    if (msgSrcVersion > nodeVersion) {
437      LOG.warning(getQualifiedName() + "Got an OOS dead msg. " + "Has HIGHER ver-" + msgSrcVersion + " than node ver-"
438          + nodeVersion + ". Queing up for add to handle");
439      addToDeadMsgs(msgSrcId, msgSrcVersion);
440      LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs",
441          Arrays.toString(new Object[]{true, getQualifiedName(), node, msgSrcId, msgSrcVersion}));
442      return true;
443    }
444    LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs",
445        Arrays.toString(new Object[]{false, getQualifiedName(), node, msgSrcId, msgSrcVersion}));
446    return false;
447  }
448
449  /**
450   * Updates the topology structure with the received
451   * message. Does not make assumptions about msg order
452   * Tries to handle OOS msgs
453   * <p>
454   * Expects only control messages
455   */
456  @Override
457  public void update(final GroupCommunicationMessage msg) {
458    if (msg.hasSrcVersion()) {
459      final String srcId = msg.getSrcid();
460      final int srcVersion = msg.getSrcVersion();
461      LOG.finest(getQualifiedName() + "Updating " + msg.getType() + " msg from " + srcId);
462      LOG.finest(getQualifiedName() + "Before update: parent=" + ((parent != null) ? parent.getId() : "NULL"));
463      LOG.finest(getQualifiedName() + "Before update: children=" + children);
464      switch (msg.getType()) {
465      case ParentAdd:
466        updateParentAdd(srcId, srcVersion);
467        break;
468      case ParentDead:
469        updateParentDead(srcId, srcVersion);
470        break;
471      case ChildAdd:
472        updateChildAdd(srcId, srcVersion);
473        break;
474      case ChildDead:
475        updateChildDead(srcId, srcVersion);
476        break;
477      default:
478        throw new RuntimeException("Received a non control message in update");
479      }
480      LOG.finest(getQualifiedName() + "After update: parent=" + ((parent != null) ? parent.getId() : "NULL"));
481      LOG.finest(getQualifiedName() + "After update: children=" + children);
482    } else {
483      throw new RuntimeException(getQualifiedName() + "can only deal with msgs that have src version set");
484    }
485  }
486
487  private void updateChildDead(final String srcId, final int srcVersion) {
488    LOG.entering("OperatorTopologyStructImpl", "updateChildDead",
489        new Object[]{getQualifiedName(), srcId, srcVersion});
490    final NodeStruct toBeRemovedchild = findChild(srcId);
491    if (!addedToDeadMsgs(toBeRemovedchild, srcId, srcVersion)) {
492      final int childVersion = toBeRemovedchild.getVersion();
493      if (srcVersion < childVersion) {
494        LOG.finest(getQualifiedName() + "Got an OOS child dead msg. " + "Has LOWER ver-" + srcVersion
495            + " than child ver-" + childVersion + ". Discarding");
496        LOG.exiting("OperatorTopologyStructImpl", "updateChildDead", Arrays.toString(new Object[]{getQualifiedName(),
497            srcId, srcVersion}));
498        return;
499      } else {
500        LOG.finest(getQualifiedName() + "Got a child dead msg. " + "Has SAME ver-" + srcVersion + " as child ver-"
501            + childVersion + "Removing child node");
502      }
503    } else {
504      LOG.fine(getQualifiedName() + "Added to dead msgs. Removing child node since ChildAdd might not turn up");
505    }
506    children.remove(toBeRemovedchild);
507    LOG.exiting("OperatorTopologyStructImpl", "updateChildDead", Arrays.toString(new Object[]{getQualifiedName(),
508        srcId, srcVersion}));
509  }
510
511  private void updateChildAdd(final String srcId, final int srcVersion) {
512    LOG.entering("OperatorTopologyStructImpl", "updateChildAdd", new Object[]{getQualifiedName(), srcId, srcVersion});
513    if (!removedDeadMsg(srcId, srcVersion)) {
514      final NodeStruct toBeAddedchild = findChild(srcId);
515      if (toBeAddedchild != null) {
516        LOG.warning(getQualifiedName() + "Child already exists");
517        final int childVersion = toBeAddedchild.getVersion();
518        if (srcVersion < childVersion) {
519          LOG.fine(getQualifiedName() + "Got an OOS child add msg. " + "Has LOWER ver-" + srcVersion
520              + " than child ver-" + childVersion + ". Discarding");
521          LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd",
522              Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
523          return;
524        }
525        if (srcVersion > childVersion) {
526          LOG.fine(getQualifiedName() + "Got an OOS child add msg. " + "Has HIGHER ver-" + srcVersion
527              + " than child ver-" + childVersion + ". Bumping up version number");
528          toBeAddedchild.setVersion(srcVersion);
529          LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd",
530              Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
531          return;
532        } else {
533          throw new RuntimeException(getQualifiedName() + "Got two child add msgs of same version-" + srcVersion);
534        }
535      } else {
536        LOG.finest(getQualifiedName() + "Creating new child node for " + srcId);
537        children.add(new ChildNodeStruct(srcId, srcVersion));
538      }
539    } else {
540      LOG.warning(getQualifiedName() + "Removed dead msg. Not adding child");
541    }
542    LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd", Arrays.toString(new Object[]{getQualifiedName(),
543        srcId, srcVersion}));
544  }
545
546  private void updateParentDead(final String srcId, final int srcVersion) {
547    LOG.entering("OperatorTopologyStructImpl", "updateParentDead",
548        new Object[]{getQualifiedName(), srcId, srcVersion});
549    if (!addedToDeadMsgs(parent, srcId, srcVersion)) {
550      final int parentVersion = parent.getVersion();
551      if (srcVersion < parentVersion) {
552        LOG.fine(getQualifiedName() + "Got an OOS parent dead msg. " + "Has LOWER ver-" + srcVersion
553            + " than parent ver-" + parentVersion + ". Discarding");
554        LOG.exiting("OperatorTopologyStructImpl", "updateParentDead",
555            Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
556        return;
557      } else {
558        LOG.finest(getQualifiedName() + "Got a parent dead msg. " + "Has SAME ver-" + srcVersion + " as parent ver-"
559            + parentVersion + "Setting parent node to null");
560      }
561    } else {
562      LOG.warning(getQualifiedName() + "Added to dead msgs. Setting parent to null since ParentAdd might not turn up");
563    }
564    parent = null;
565    LOG.exiting("OperatorTopologyStructImpl", "updateParentDead", Arrays.toString(new Object[]{getQualifiedName(),
566        srcId, srcVersion}));
567  }
568
569  private void updateParentAdd(final String srcId, final int srcVersion) {
570    LOG.entering("OperatorTopologyStructImpl", "updateParentAdd",
571        new Object[]{getQualifiedName(), srcId, srcVersion});
572    if (!removedDeadMsg(srcId, srcVersion)) {
573      if (parent != null) {
574        LOG.fine(getQualifiedName() + "Parent already exists");
575        final int parentVersion = parent.getVersion();
576        if (srcVersion < parentVersion) {
577          LOG.fine(getQualifiedName() + "Got an OOS parent add msg. " + "Has LOWER ver-" + srcVersion
578              + " than parent ver-" + parentVersion + ". Discarding");
579          LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd",
580              Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
581          return;
582        }
583        if (srcVersion > parentVersion) {
584          LOG.fine(getQualifiedName() + "Got an OOS parent add msg. " + "Has HIGHER ver-" + srcVersion
585              + " than parent ver-" + parentVersion + ". Bumping up version number");
586          parent.setVersion(srcVersion);
587          LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd",
588              Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
589          return;
590        } else {
591          throw new RuntimeException(getQualifiedName() + "Got two parent add msgs of same version-" + srcVersion);
592        }
593      } else {
594        LOG.finest(getQualifiedName() + "Creating new parent node for " + srcId);
595        parent = new ParentNodeStruct(srcId, srcVersion);
596      }
597    } else {
598      LOG.fine(getQualifiedName() + "Removed dead msg. Not adding parent");
599    }
600    LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd", Arrays.toString(new Object[]{getQualifiedName(),
601        srcId, srcVersion}));
602  }
603
604  /**
605   * @param srcId
606   * @return
607   */
608  private NodeStruct findChild(final String srcId) {
609    LOG.entering("OperatorTopologyStructImpl", "findChild", new Object[]{getQualifiedName(), srcId});
610    NodeStruct retVal = null;
611    for (final NodeStruct node : children) {
612      if (node.getId().equals(srcId)) {
613        retVal = node;
614        break;
615      }
616    }
617    LOG.exiting("OperatorTopologyStructImpl", "findChild", Arrays.toString(new Object[]{retVal, getQualifiedName(),
618        srcId}));
619    return retVal;
620  }
621
622  @Override
623  public void update(final Set<GroupCommunicationMessage> deletionDeltas) {
624    LOG.entering("OperatorTopologyStructImpl", "update", new Object[]{"Updating topology with deleting msgs",
625        getQualifiedName(), deletionDeltas});
626    for (final GroupCommunicationMessage delDelta : deletionDeltas) {
627      update(delDelta);
628    }
629    LOG.exiting("OperatorTopologyStructImpl", "update", Arrays.toString(new Object[]{getQualifiedName(),
630        deletionDeltas}));
631  }
632
633  @Override
634  public void setChanges(final boolean changes) {
635    LOG.entering("OperatorTopologyStructImpl", "setChanges", new Object[]{getQualifiedName(), changes});
636    this.changes = changes;
637    LOG.exiting("OperatorTopologyStructImpl", "setChanges",
638        Arrays.toString(new Object[]{getQualifiedName(), changes}));
639  }
640
641  private String getQualifiedName() {
642    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + selfId + ":ver(" + version + ") - ";
643  }
644}