001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.task; 020 021import org.apache.commons.lang.ArrayUtils; 022import org.apache.reef.exception.evaluator.NetworkException; 023import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction; 024import org.apache.reef.io.network.group.api.task.NodeStruct; 025import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct; 026import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 027import org.apache.reef.io.network.group.impl.operators.Sender; 028import org.apache.reef.io.network.group.impl.utils.Utils; 029import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; 030import org.apache.reef.io.serialization.Codec; 031import org.apache.reef.tang.annotations.Name; 032 033import java.util.*; 034import java.util.concurrent.BlockingQueue; 035import java.util.concurrent.ConcurrentHashMap; 036import java.util.concurrent.ConcurrentMap; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.logging.Logger; 039 040/** 041 * 042 */ 043public class OperatorTopologyStructImpl implements OperatorTopologyStruct { 044 045 private static final int SMALL_MSG_LENGTH = 1 << 20; 046 047 private static final Logger LOG = Logger.getLogger(OperatorTopologyStructImpl.class.getName()); 048 049 private final Class<? extends Name<String>> groupName; 050 private final Class<? extends Name<String>> operName; 051 private final String selfId; 052 private final String driverId; 053 private final Sender sender; 054 055 private boolean changes = true; 056 private NodeStruct parent; 057 private final List<NodeStruct> children = new ArrayList<>(); 058 059 private final BlockingQueue<NodeStruct> nodesWithData = new LinkedBlockingQueue<>(); 060 private final Set<String> childrenToRcvFrom = new HashSet<>(); 061 062 private final ConcurrentMap<String, Set<Integer>> deadMsgs = new ConcurrentHashMap<>(); 063 064 private final int version; 065 066 public OperatorTopologyStructImpl(final Class<? extends Name<String>> groupName, 067 final Class<? extends Name<String>> operName, final String selfId, 068 final String driverId, final Sender sender, final int version) { 069 super(); 070 this.groupName = groupName; 071 this.operName = operName; 072 this.selfId = selfId; 073 this.driverId = driverId; 074 this.sender = sender; 075 this.version = version; 076 } 077 078 public OperatorTopologyStructImpl(final OperatorTopologyStruct topology) { 079 super(); 080 this.groupName = topology.getGroupName(); 081 this.operName = topology.getOperName(); 082 this.selfId = topology.getSelfId(); 083 this.driverId = topology.getDriverId(); 084 this.sender = topology.getSender(); 085 this.changes = topology.hasChanges(); 086 this.parent = topology.getParent(); 087 this.children.addAll(topology.getChildren()); 088 this.version = topology.getVersion(); 089 } 090 091 @Override 092 public String toString() { 093 return "OperatorTopologyStruct - " + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + 094 "(" + selfId + "," + version + ")"; 095 } 096 097 @Override 098 public NodeStruct getParent() { 099 return parent; 100 } 101 102 @Override 103 public Collection<? extends NodeStruct> getChildren() { 104 return children; 105 } 106 107 @Override 108 public Class<? extends Name<String>> getGroupName() { 109 return groupName; 110 } 111 112 @Override 113 public Class<? extends Name<String>> getOperName() { 114 return operName; 115 } 116 117 @Override 118 public String getSelfId() { 119 return selfId; 120 } 121 122 @Override 123 public String getDriverId() { 124 return driverId; 125 } 126 127 @Override 128 public Sender getSender() { 129 return sender; 130 } 131 132 @Override 133 public boolean hasChanges() { 134 LOG.entering("OperatorTopologyStructImpl", "hasChanges", getQualifiedName()); 135 LOG.exiting("OperatorTopologyStructImpl", "hasChanges", 136 Arrays.toString(new Object[]{this.changes, getQualifiedName()})); 137 return this.changes; 138 } 139 140 @Override 141 public int getVersion() { 142 return version; 143 } 144 145 @Override 146 public void addAsData(final GroupCommunicationMessage msg) { 147 LOG.entering("OperatorTopologyStructImpl", "addAsData", new Object[]{getQualifiedName(), msg}); 148 final String srcId = msg.getSrcid(); 149 final NodeStruct node = findNode(srcId); 150 if (node != null) { 151 try { 152 nodesWithData.put(node); 153 LOG.finest(getQualifiedName() + "Added node " + srcId + " to nodesWithData queue"); 154 } catch (final InterruptedException e) { 155 throw new RuntimeException("InterruptedException while adding to childrenWithData queue", e); 156 } 157 node.addData(msg); 158 } else { 159 LOG.fine("Unable to find node " + srcId + " to send " + msg.getType() + " to"); 160 } 161 LOG.exiting("OperatorTopologyStructImpl", "addAsData", Arrays.toString(new Object[]{getQualifiedName(), msg})); 162 } 163 164 private NodeStruct findNode(final String srcId) { 165 LOG.entering("OperatorTopologyStructImpl", "findNode", new Object[]{getQualifiedName(), srcId}); 166 final NodeStruct retVal; 167 if (parent != null && parent.getId().equals(srcId)) { 168 retVal = parent; 169 } else { 170 retVal = findChild(srcId); 171 } 172 LOG.exiting("OperatorTopologyStructImpl", "findNode", 173 Arrays.toString(new Object[]{retVal, getQualifiedName(), srcId})); 174 return retVal; 175 } 176 177 private void sendToNode(final byte[] data, 178 final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, 179 final NodeStruct node) { 180 LOG.entering("OperatorTopologyStructImpl", "sendToNode", new Object[]{getQualifiedName(), msgType, node}); 181 final String nodeId = node.getId(); 182 try { 183 184 if (data.length > SMALL_MSG_LENGTH) { 185 LOG.finest(getQualifiedName() + "Msg too big. Sending readiness to send " + msgType + " msg to " + nodeId); 186 sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(), 187 Utils.EMPTY_BYTE_ARR)); 188 final byte[] tmpVal = receiveFromNode(node, true); 189 if (tmpVal != null) { 190 LOG.finest(getQualifiedName() + "Got readiness to accept " + msgType + " msg from " + nodeId 191 + ". Will send actual msg now"); 192 } else { 193 LOG.exiting("OperatorTopologyStructImpl", "sendToNode", getQualifiedName()); 194 return; 195 } 196 } 197 198 sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(), 199 data)); 200 201 if (data.length > SMALL_MSG_LENGTH) { 202 LOG.finest(getQualifiedName() + "Msg too big. Will wait for ACK before queing up one more msg"); 203 final byte[] tmpVal = receiveFromNode(node, true); 204 if (tmpVal != null) { 205 LOG.finest(getQualifiedName() + "Got " + msgType + " msg received ACK from " + nodeId 206 + ". Will move to next msg if it exists"); 207 } else { 208 LOG.exiting("OperatorTopologyStructImpl", "sendToNode", getQualifiedName()); 209 return; 210 } 211 } 212 } catch (final NetworkException e) { 213 throw new RuntimeException( 214 "NetworkException while sending " + msgType + " data from " + selfId + " to " + nodeId, 215 e); 216 } 217 LOG.exiting("OperatorTopologyStructImpl", "sendToNode", getQualifiedName()); 218 } 219 220 private byte[] receiveFromNode(final NodeStruct node, final boolean remove) { 221 LOG.entering("OperatorTopologyStructImpl", "receiveFromNode", new Object[]{getQualifiedName(), node, remove}); 222 final byte[] retVal = node.getData(); 223 if (remove) { 224 final boolean removed = nodesWithData.remove(node); 225 final String msg = getQualifiedName() + "Removed(" + removed + ") node " + node.getId() 226 + " from nodesWithData queue"; 227 if (removed) { 228 LOG.finest(msg); 229 } else { 230 LOG.fine(msg); 231 } 232 } 233 LOG.exiting("OperatorTopologyStructImpl", "receiveFromNode", getQualifiedName()); 234 return retVal; 235 } 236 237 /** 238 * Receive data from {@code node}, while checking if it is trying to send a big message. 239 * Nodes that send big messages will first send an empty data message and 240 * wait for an ACK before transmitting the actual big message. Thus the 241 * receiving side checks whether a message is empty or not, and after sending 242 * an ACK it must wait for another message if the first message was empty. 243 * 244 * @param node node to receive a message from 245 * @param msgType message type 246 * @return message sent from {@code node} 247 */ 248 private byte[] recvFromNodeCheckBigMsg(final NodeStruct node, 249 final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { 250 LOG.entering("OperatorTopologyStructImpl", "recvFromNodeCheckBigMsg", new Object[]{node, msgType}); 251 252 byte[] retVal = receiveFromNode(node, false); 253 if (retVal != null && retVal.length == 0) { 254 LOG.finest(getQualifiedName() + " Got msg that node " + node.getId() 255 + " has large data and is ready to send it. Sending ACK to receive data."); 256 sendToNode(Utils.EMPTY_BYTE_ARR, msgType, node); 257 retVal = receiveFromNode(node, true); 258 259 if (retVal != null) { 260 LOG.finest(getQualifiedName() + " Received large msg from node " + node.getId() 261 + ". Will process it after ACKing."); 262 sendToNode(Utils.EMPTY_BYTE_ARR, msgType, node); 263 } else { 264 LOG.warning(getQualifiedName() + "Expected large msg from node " + node.getId() 265 + " but received nothing."); 266 } 267 } 268 269 LOG.exiting("OperatorTopologyStructImpl", "recvFromNodeCheckBigMsg"); 270 return retVal; 271 } 272 273 /** 274 * Retrieves and removes the head of {@code nodesWithData}, waiting if necessary until an element becomes available. 275 * (Comment taken from {@link java.util.concurrent.BlockingQueue}) 276 * If interrupted while waiting, then throws a RuntimeException. 277 * 278 * @return the head of this queue 279 */ 280 private NodeStruct nodesWithDataTakeUnsafe() { 281 LOG.entering("OperatorTopologyStructImpl", "nodesWithDataTakeUnsafe"); 282 try { 283 final NodeStruct child = nodesWithData.take(); 284 LOG.exiting("OperatorTopologyStructImpl", "nodesWithDataTakeUnsafe", child); 285 return child; 286 287 } catch (final InterruptedException e) { 288 throw new RuntimeException("InterruptedException while waiting to take data from nodesWithData queue", e); 289 } 290 } 291 292 @Override 293 public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { 294 LOG.entering("OperatorTopologyStructImpl", "sendToParent", new Object[]{getQualifiedName(), msgType}); 295 if (parent != null) { 296 sendToNode(data, msgType, parent); 297 } else { 298 LOG.fine(getQualifiedName() + "Perhaps parent has died or has not been configured"); 299 } 300 LOG.exiting("OperatorTopologyStructImpl", "sendToParent", getQualifiedName()); 301 } 302 303 @Override 304 public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { 305 LOG.entering("OperatorTopologyStructImpl", "sendToChildren", new Object[]{getQualifiedName(), msgType}); 306 for (final NodeStruct child : children) { 307 sendToNode(data, msgType, child); 308 } 309 LOG.exiting("OperatorTopologyStructImpl", "sendToChildren", getQualifiedName()); 310 } 311 312 @Override 313 public void sendToChildren(final Map<String, byte[]> dataMap, 314 final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { 315 LOG.entering("OperatorTopologyStructImpl", "sendToChildren", new Object[]{getQualifiedName(), msgType}); 316 for (final NodeStruct child : children) { 317 if (dataMap.containsKey(child.getId())) { 318 sendToNode(dataMap.get(child.getId()), msgType, child); 319 } else { 320 throw new RuntimeException("No message specified for " + child.getId() + " in dataMap."); 321 } 322 } 323 LOG.exiting("OperatorTopologyStructImpl", "sendToChildren", getQualifiedName()); 324 } 325 326 @Override 327 public byte[] recvFromParent(final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { 328 LOG.entering("OperatorTopologyStructImpl", "recvFromParent", getQualifiedName()); 329 LOG.finest(getQualifiedName() + "Waiting for " + parent.getId() + " to send data"); 330 final byte[] retVal = recvFromNodeCheckBigMsg(parent, msgType); 331 LOG.exiting("OperatorTopologyStructImpl", "recvFromParent", getQualifiedName()); 332 return retVal; 333 } 334 335 @Override 336 public <T> T recvFromChildren(final ReduceFunction<T> redFunc, final Codec<T> dataCodec) { 337 LOG.entering("OperatorTopologyStructImpl", "recvFromChildren", new Object[]{getQualifiedName(), redFunc, 338 dataCodec}); 339 final List<T> retLst = new ArrayList<>(2); 340 for (final NodeStruct child : children) { 341 childrenToRcvFrom.add(child.getId()); 342 } 343 344 while (!childrenToRcvFrom.isEmpty()) { 345 LOG.finest(getQualifiedName() + "Waiting for some child to send data"); 346 final NodeStruct child = nodesWithDataTakeUnsafe(); 347 final byte[] retVal = recvFromNodeCheckBigMsg(child, 348 ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce); 349 350 if (retVal != null) { 351 retLst.add(dataCodec.decode(retVal)); 352 if (retLst.size() == 2) { 353 final T redVal = redFunc.apply(retLst); 354 retLst.clear(); 355 retLst.add(redVal); 356 } 357 } 358 childrenToRcvFrom.remove(child.getId()); 359 } 360 final T retVal = retLst.isEmpty() ? null : retLst.get(0); 361 LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", getQualifiedName()); 362 return retVal; 363 } 364 365 /** 366 * Receive data from all children as a single byte array. 367 * Messages from children are simply byte-concatenated. 368 * This method is currently used only by the Gather operator. 369 * 370 * @return gathered data as a byte array 371 */ 372 @Override 373 public byte[] recvFromChildren() { 374 LOG.entering("OperatorTopologyStructImpl", "recvFromChildren", getQualifiedName()); 375 for (final NodeStruct child : children) { 376 childrenToRcvFrom.add(child.getId()); 377 } 378 379 byte[] retVal = new byte[0]; 380 while (!childrenToRcvFrom.isEmpty()) { 381 LOG.finest(getQualifiedName() + "Waiting for some child to send data"); 382 final NodeStruct child = nodesWithDataTakeUnsafe(); 383 final byte[] receivedVal = recvFromNodeCheckBigMsg(child, 384 ReefNetworkGroupCommProtos.GroupCommMessage.Type.Gather); 385 386 if (receivedVal != null) { 387 retVal = ArrayUtils.addAll(retVal, receivedVal); 388 } 389 childrenToRcvFrom.remove(child.getId()); 390 } 391 392 LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", getQualifiedName()); 393 return retVal; 394 } 395 396 private boolean removedDeadMsg(final String msgSrcId, final int msgSrcVersion) { 397 LOG.entering("OperatorTopologyStructImpl", "removedDeadMsg", new Object[]{getQualifiedName(), msgSrcId, 398 msgSrcVersion}); 399 boolean retVal = false; 400 final Set<Integer> msgVersions = deadMsgs.get(msgSrcId); 401 if (msgVersions != null) { 402 LOG.fine(getQualifiedName() + "Found dead msgs " + msgVersions + " waiting for add"); 403 if (msgVersions.remove(msgSrcVersion)) { 404 LOG.fine(getQualifiedName() + "Found dead msg with same version as srcVer-" + msgSrcVersion); 405 retVal = true; 406 } else { 407 LOG.finest(getQualifiedName() + "No dead msg with same version as srcVer-" + msgSrcVersion); 408 } 409 } else { 410 LOG.finest(getQualifiedName() + "No dead msgs waiting for add."); 411 } 412 LOG.exiting("OperatorTopologyStructImpl", "removedDeadMsg", 413 new Object[]{retVal, getQualifiedName(), msgSrcId, msgSrcVersion}); 414 return retVal; 415 } 416 417 private void addToDeadMsgs(final String srcId, final int srcVersion) { 418 LOG.entering("OperatorTopologyStructImpl", "addToDeadMsgs", new Object[]{getQualifiedName(), srcId, srcVersion}); 419 deadMsgs.putIfAbsent(srcId, new HashSet<Integer>()); 420 deadMsgs.get(srcId).add(srcVersion); 421 LOG.exiting("OperatorTopologyStructImpl", "addToDeadMsgs", Arrays.toString(new Object[]{getQualifiedName(), 422 srcId, srcVersion})); 423 } 424 425 private boolean addedToDeadMsgs(final NodeStruct node, final String msgSrcId, final int msgSrcVersion) { 426 LOG.entering("OperatorTopologyStructImpl", "addedToDeadMsgs", new Object[]{getQualifiedName(), node, msgSrcId, 427 msgSrcVersion}); 428 if (node == null) { 429 LOG.warning(getQualifiedName() + "Got dead msg when no node existed. OOS Queuing up for add to handle"); 430 addToDeadMsgs(msgSrcId, msgSrcVersion); 431 LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", 432 Arrays.toString(new Object[]{true, getQualifiedName(), null, msgSrcId, msgSrcVersion})); 433 return true; 434 } 435 final int nodeVersion = node.getVersion(); 436 if (msgSrcVersion > nodeVersion) { 437 LOG.warning(getQualifiedName() + "Got an OOS dead msg. " + "Has HIGHER ver-" + msgSrcVersion + " than node ver-" 438 + nodeVersion + ". Queing up for add to handle"); 439 addToDeadMsgs(msgSrcId, msgSrcVersion); 440 LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", 441 Arrays.toString(new Object[]{true, getQualifiedName(), node, msgSrcId, msgSrcVersion})); 442 return true; 443 } 444 LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", 445 Arrays.toString(new Object[]{false, getQualifiedName(), node, msgSrcId, msgSrcVersion})); 446 return false; 447 } 448 449 /** 450 * Updates the topology structure with the received 451 * message. Does not make assumptions about msg order 452 * Tries to handle OOS msgs 453 * <p> 454 * Expects only control messages 455 */ 456 @Override 457 public void update(final GroupCommunicationMessage msg) { 458 if (msg.hasSrcVersion()) { 459 final String srcId = msg.getSrcid(); 460 final int srcVersion = msg.getSrcVersion(); 461 LOG.finest(getQualifiedName() + "Updating " + msg.getType() + " msg from " + srcId); 462 LOG.finest(getQualifiedName() + "Before update: parent=" + ((parent != null) ? parent.getId() : "NULL")); 463 LOG.finest(getQualifiedName() + "Before update: children=" + children); 464 switch (msg.getType()) { 465 case ParentAdd: 466 updateParentAdd(srcId, srcVersion); 467 break; 468 case ParentDead: 469 updateParentDead(srcId, srcVersion); 470 break; 471 case ChildAdd: 472 updateChildAdd(srcId, srcVersion); 473 break; 474 case ChildDead: 475 updateChildDead(srcId, srcVersion); 476 break; 477 default: 478 throw new RuntimeException("Received a non control message in update"); 479 } 480 LOG.finest(getQualifiedName() + "After update: parent=" + ((parent != null) ? parent.getId() : "NULL")); 481 LOG.finest(getQualifiedName() + "After update: children=" + children); 482 } else { 483 throw new RuntimeException(getQualifiedName() + "can only deal with msgs that have src version set"); 484 } 485 } 486 487 private void updateChildDead(final String srcId, final int srcVersion) { 488 LOG.entering("OperatorTopologyStructImpl", "updateChildDead", 489 new Object[]{getQualifiedName(), srcId, srcVersion}); 490 final NodeStruct toBeRemovedchild = findChild(srcId); 491 if (!addedToDeadMsgs(toBeRemovedchild, srcId, srcVersion)) { 492 final int childVersion = toBeRemovedchild.getVersion(); 493 if (srcVersion < childVersion) { 494 LOG.finest(getQualifiedName() + "Got an OOS child dead msg. " + "Has LOWER ver-" + srcVersion 495 + " than child ver-" + childVersion + ". Discarding"); 496 LOG.exiting("OperatorTopologyStructImpl", "updateChildDead", Arrays.toString(new Object[]{getQualifiedName(), 497 srcId, srcVersion})); 498 return; 499 } else { 500 LOG.finest(getQualifiedName() + "Got a child dead msg. " + "Has SAME ver-" + srcVersion + " as child ver-" 501 + childVersion + "Removing child node"); 502 } 503 } else { 504 LOG.fine(getQualifiedName() + "Added to dead msgs. Removing child node since ChildAdd might not turn up"); 505 } 506 children.remove(toBeRemovedchild); 507 LOG.exiting("OperatorTopologyStructImpl", "updateChildDead", Arrays.toString(new Object[]{getQualifiedName(), 508 srcId, srcVersion})); 509 } 510 511 private void updateChildAdd(final String srcId, final int srcVersion) { 512 LOG.entering("OperatorTopologyStructImpl", "updateChildAdd", new Object[]{getQualifiedName(), srcId, srcVersion}); 513 if (!removedDeadMsg(srcId, srcVersion)) { 514 final NodeStruct toBeAddedchild = findChild(srcId); 515 if (toBeAddedchild != null) { 516 LOG.warning(getQualifiedName() + "Child already exists"); 517 final int childVersion = toBeAddedchild.getVersion(); 518 if (srcVersion < childVersion) { 519 LOG.fine(getQualifiedName() + "Got an OOS child add msg. " + "Has LOWER ver-" + srcVersion 520 + " than child ver-" + childVersion + ". Discarding"); 521 LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd", 522 Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); 523 return; 524 } 525 if (srcVersion > childVersion) { 526 LOG.fine(getQualifiedName() + "Got an OOS child add msg. " + "Has HIGHER ver-" + srcVersion 527 + " than child ver-" + childVersion + ". Bumping up version number"); 528 toBeAddedchild.setVersion(srcVersion); 529 LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd", 530 Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); 531 return; 532 } else { 533 throw new RuntimeException(getQualifiedName() + "Got two child add msgs of same version-" + srcVersion); 534 } 535 } else { 536 LOG.finest(getQualifiedName() + "Creating new child node for " + srcId); 537 children.add(new ChildNodeStruct(srcId, srcVersion)); 538 } 539 } else { 540 LOG.warning(getQualifiedName() + "Removed dead msg. Not adding child"); 541 } 542 LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd", Arrays.toString(new Object[]{getQualifiedName(), 543 srcId, srcVersion})); 544 } 545 546 private void updateParentDead(final String srcId, final int srcVersion) { 547 LOG.entering("OperatorTopologyStructImpl", "updateParentDead", 548 new Object[]{getQualifiedName(), srcId, srcVersion}); 549 if (!addedToDeadMsgs(parent, srcId, srcVersion)) { 550 final int parentVersion = parent.getVersion(); 551 if (srcVersion < parentVersion) { 552 LOG.fine(getQualifiedName() + "Got an OOS parent dead msg. " + "Has LOWER ver-" + srcVersion 553 + " than parent ver-" + parentVersion + ". Discarding"); 554 LOG.exiting("OperatorTopologyStructImpl", "updateParentDead", 555 Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); 556 return; 557 } else { 558 LOG.finest(getQualifiedName() + "Got a parent dead msg. " + "Has SAME ver-" + srcVersion + " as parent ver-" 559 + parentVersion + "Setting parent node to null"); 560 } 561 } else { 562 LOG.warning(getQualifiedName() + "Added to dead msgs. Setting parent to null since ParentAdd might not turn up"); 563 } 564 parent = null; 565 LOG.exiting("OperatorTopologyStructImpl", "updateParentDead", Arrays.toString(new Object[]{getQualifiedName(), 566 srcId, srcVersion})); 567 } 568 569 private void updateParentAdd(final String srcId, final int srcVersion) { 570 LOG.entering("OperatorTopologyStructImpl", "updateParentAdd", 571 new Object[]{getQualifiedName(), srcId, srcVersion}); 572 if (!removedDeadMsg(srcId, srcVersion)) { 573 if (parent != null) { 574 LOG.fine(getQualifiedName() + "Parent already exists"); 575 final int parentVersion = parent.getVersion(); 576 if (srcVersion < parentVersion) { 577 LOG.fine(getQualifiedName() + "Got an OOS parent add msg. " + "Has LOWER ver-" + srcVersion 578 + " than parent ver-" + parentVersion + ". Discarding"); 579 LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd", 580 Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); 581 return; 582 } 583 if (srcVersion > parentVersion) { 584 LOG.fine(getQualifiedName() + "Got an OOS parent add msg. " + "Has HIGHER ver-" + srcVersion 585 + " than parent ver-" + parentVersion + ". Bumping up version number"); 586 parent.setVersion(srcVersion); 587 LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd", 588 Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); 589 return; 590 } else { 591 throw new RuntimeException(getQualifiedName() + "Got two parent add msgs of same version-" + srcVersion); 592 } 593 } else { 594 LOG.finest(getQualifiedName() + "Creating new parent node for " + srcId); 595 parent = new ParentNodeStruct(srcId, srcVersion); 596 } 597 } else { 598 LOG.fine(getQualifiedName() + "Removed dead msg. Not adding parent"); 599 } 600 LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd", Arrays.toString(new Object[]{getQualifiedName(), 601 srcId, srcVersion})); 602 } 603 604 /** 605 * @param srcId 606 * @return 607 */ 608 private NodeStruct findChild(final String srcId) { 609 LOG.entering("OperatorTopologyStructImpl", "findChild", new Object[]{getQualifiedName(), srcId}); 610 NodeStruct retVal = null; 611 for (final NodeStruct node : children) { 612 if (node.getId().equals(srcId)) { 613 retVal = node; 614 break; 615 } 616 } 617 LOG.exiting("OperatorTopologyStructImpl", "findChild", Arrays.toString(new Object[]{retVal, getQualifiedName(), 618 srcId})); 619 return retVal; 620 } 621 622 @Override 623 public void update(final Set<GroupCommunicationMessage> deletionDeltas) { 624 LOG.entering("OperatorTopologyStructImpl", "update", new Object[]{"Updating topology with deleting msgs", 625 getQualifiedName(), deletionDeltas}); 626 for (final GroupCommunicationMessage delDelta : deletionDeltas) { 627 update(delDelta); 628 } 629 LOG.exiting("OperatorTopologyStructImpl", "update", Arrays.toString(new Object[]{getQualifiedName(), 630 deletionDeltas})); 631 } 632 633 @Override 634 public void setChanges(final boolean changes) { 635 LOG.entering("OperatorTopologyStructImpl", "setChanges", new Object[]{getQualifiedName(), changes}); 636 this.changes = changes; 637 LOG.exiting("OperatorTopologyStructImpl", "setChanges", 638 Arrays.toString(new Object[]{getQualifiedName(), changes})); 639 } 640 641 private String getQualifiedName() { 642 return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + selfId + ":ver(" + version + ") - "; 643 } 644}