/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.io.network.group.api.driver.TaskNode;
import org.apache.reef.io.network.group.api.driver.TaskNodeStatus;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.wake.EStage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * Driver-side node of a group-communication topology for a single task.
 *
 * <p>A node knows its parent, its sibling and its children, tracks whether its task is
 * running, and carries a version number that is bumped every time the task fails.
 * Status changes of this node or of its neighbors are turned into
 * {@link GroupCommunicationMessage}s that are handed to the sender stage; the
 * acknowledgements expected for those messages are tracked by a {@link TaskNodeStatus}.
 *
 * <p>NOTE(review): {@code running}, {@code topoSetupSent} and {@code version} are atomics,
 * but {@code parent}, {@code sibling} and {@code children} are plain fields. Presumably
 * callers serialize structural changes externally - confirm before relying on it.
 */
public class TaskNodeImpl implements TaskNode {

  private static final Logger LOG = Logger.getLogger(TaskNodeImpl.class.getName());

  private final EStage<GroupCommunicationMessage> senderStage;
  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final String taskId;
  private final String driverId;

  private final boolean isRoot;
  private TaskNode parent;
  private TaskNode sibling;
  private final List<TaskNode> children = new ArrayList<>();

  private final AtomicBoolean running = new AtomicBoolean(false);
  private final AtomicBoolean topoSetupSent = new AtomicBoolean(false);

  private final TaskNodeStatus taskNodeStatus;

  private final AtomicInteger version = new AtomicInteger(0);

  /**
   * Creates a node that is initially not running, at version 0, with no parent,
   * sibling or children.
   *
   * @param senderStage  stage that every outgoing message is handed to
   * @param groupName    name class of the communication group
   * @param operatorName name class of the operator
   * @param taskId       identifier of the task this node represents
   * @param driverId     identifier used as the counterpart in TopologySetup messages
   * @param isRoot       whether this node is the root of the topology
   */
  public TaskNodeImpl(final EStage<GroupCommunicationMessage> senderStage,
                      final Class<? extends Name<String>> groupName,
                      final Class<? extends Name<String>> operatorName,
                      final String taskId, final String driverId, final boolean isRoot) {
    this.senderStage = senderStage;
    this.groupName = groupName;
    this.operName = operatorName;
    this.taskId = taskId;
    this.driverId = driverId;
    this.isRoot = isRoot;
    taskNodeStatus = new TaskNodeStatusImpl(groupName, operatorName, taskId, this);
  }

  /**
   * Sets the sibling that {@link #successor()} returns.
   *
   * @param leaf the new sibling
   */
  @Override
  public void setSibling(final TaskNode leaf) {
    LOG.entering("TaskNodeImpl", "setSibling", new Object[]{getQualifiedName(), leaf});
    sibling = leaf;
    LOG.exiting("TaskNodeImpl", "setSibling", getQualifiedName());
  }

  /**
   * @return the number of children currently attached to this node
   */
  @Override
  public int getNumberOfChildren() {
    LOG.entering("TaskNodeImpl", "getNumberOfChildren", getQualifiedName());
    final int size = children.size();
    LOG.exiting("TaskNodeImpl", "getNumberOfChildren", getQualifiedName() + size);
    return size;
  }

  /**
   * @return the sibling set through {@link #setSibling(TaskNode)}, or {@code null} if none was set
   */
  @Override
  public TaskNode successor() {
    LOG.entering("TaskNodeImpl", "successor", getQualifiedName());
    LOG.exiting("TaskNodeImpl", "successor", getQualifiedName() + sibling);
    return sibling;
  }

  /**
   * @return {@code (taskId,version)}
   */
  @Override
  public String toString() {
    return "(" + taskId + "," + version.get() + ")";
  }

  // ---------------------------------------------------------------------------
  // Methods pertaining to my own status change.
  // ---------------------------------------------------------------------------

  /**
   * Marks this task as failed.
   *
   * <p>If the task was not running, only a log entry is written. Otherwise the status
   * tracker is cleared, {@code topoSetupSent} is reset, a running parent is told that
   * this child died, every running child is told that its parent died, and finally the
   * version is incremented. The notifications are sent before the increment, so they
   * carry the version of the failed incarnation.
   */
  @Override
  public void onFailedTask() {
    LOG.entering("TaskNodeImpl", "onFailedTask", getQualifiedName());
    if (!running.compareAndSet(true, false)) {
      LOG.fine(getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!");
      LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() +
          "Trying to set failed on an already failed task. Something fishy!!!");
      return;
    }
    taskNodeStatus.clearStateAndReleaseLocks();
    LOG.finest(getQualifiedName() + "Changed status to failed.");
    LOG.finest(getQualifiedName() + "Resetting topoSetupSent to false");
    topoSetupSent.set(false);
    if (parent != null && parent.isRunning()) {
      parent.onChildDead(taskId);
    } else {
      LOG.finest(getQualifiedName() + "Skipping asking parent to process child death");
    }
    for (final TaskNode child : children) {
      if (child.isRunning()) {
        child.onParentDead();
      }
    }
    final int newVersion = this.version.incrementAndGet();
    LOG.finest(getQualifiedName() + "Bumping up to version-" + newVersion);
    LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName());
  }

  /**
   * Marks this task as running.
   *
   * <p>If the task was already running, only a log entry is written. Otherwise a
   * ParentAdd message is sent for a running parent and the parent is told that this
   * child is running; then, for every running child, a ChildAdd message is sent and the
   * child is told that its parent is running. An acknowledgement is registered as
   * expected for every message before it is handed to the sender stage.
   */
  @Override
  public void onRunningTask() {
    LOG.entering("TaskNodeImpl", "onRunningTask", getQualifiedName());
    if (!running.compareAndSet(false, true)) {
      LOG.fine(getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!");
      LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() +
          "Trying to set running on an already running task. Something fishy!!!");
      return;
    }
    final int newVersion = this.version.get();
    LOG.finest(getQualifiedName() + "Changed status to running version-" + newVersion);
    if (parent != null && parent.isRunning()) {
      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parent.getTaskId(),
          parent.getVersion(), taskId,
          newVersion, Utils.EMPTY_BYTE_ARR);
      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
      senderStage.onNext(gcm);
      parent.onChildRunning(taskId);
    } else {
      LOG.finest(getQualifiedName() + "Skipping src add to & for parent");
    }
    for (final TaskNode child : children) {
      if (child.isRunning()) {
        final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
            ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, child.getTaskId(),
            child.getVersion(), taskId, newVersion,
            Utils.EMPTY_BYTE_ARR);
        taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
        senderStage.onNext(gcm);
        child.onParentRunning();
      }
    }
    LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName());
  }

  // ---------------------------------------------------------------------------
  // Methods pertaining to my neighbors' status change.
  // ---------------------------------------------------------------------------

  /**
   * Reacts to the parent starting to run: sends a ParentAdd message if the parent is set
   * and still running, otherwise only logs.
   */
  @Override
  public void onParentRunning() {
    LOG.entering("TaskNodeImpl", "onParentRunning", getQualifiedName());
    if (parent != null && parent.isRunning()) {
      final int parentVersion = parent.getVersion();
      final String parentTaskId = parent.getTaskId();
      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parentTaskId,
          parentVersion, taskId, version.get(),
          Utils.EMPTY_BYTE_ARR);
      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
      senderStage.onNext(gcm);
    } else {
      LOG.finer(getQualifiedName() + "Parent was running when I was asked to add him."
          + " However, he is not active anymore. Returning without sending ParentAdd" + " msg. ***CHECK***");
    }
    LOG.exiting("TaskNodeImpl", "onParentRunning", getQualifiedName());
  }

  /**
   * Reacts to the parent's failure: records the failure in the status tracker and sends
   * a ParentDead message.
   *
   * @throws RuntimeException if no parent is set
   */
  @Override
  public void onParentDead() {
    LOG.entering("TaskNodeImpl", "onParentDead", getQualifiedName());
    if (parent != null) {
      final int parentVersion = parent.getVersion();
      final String parentTaskId = parent.getTaskId();
      taskNodeStatus.updateFailureOf(parentTaskId);
      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead, parentTaskId,
          parentVersion, taskId, version.get(),
          Utils.EMPTY_BYTE_ARR);
      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
      senderStage.onNext(gcm);
    } else {
      throw new RuntimeException(getQualifiedName() + "Don't expect parent to be null. Something wrong");
    }
    LOG.exiting("TaskNodeImpl", "onParentDead", getQualifiedName());
  }

  /**
   * Reacts to a neighbor starting to run: sends a ChildAdd message if the neighbor can
   * be found and is still running, otherwise only logs.
   *
   * <p>The lookup goes through {@link #findTask(String)}, which checks the parent before
   * the children.
   *
   * @param childId task id of the child that started running
   */
  @Override
  public void onChildRunning(final String childId) {
    LOG.entering("TaskNodeImpl", "onChildRunning", new Object[]{getQualifiedName(), childId});
    final TaskNode childTask = findTask(childId);
    if (childTask != null && childTask.isRunning()) {
      final int childVersion = childTask.getVersion();
      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, childId,
          childVersion, taskId, version.get(),
          Utils.EMPTY_BYTE_ARR);
      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
      senderStage.onNext(gcm);
    } else {
      LOG.fine(getQualifiedName() + childId + " was running when I was asked to add him."
          + " However, I can't find a task corresponding to him now."
          + " Returning without sending ChildAdd msg. ***CHECK***");
    }
    LOG.exiting("TaskNodeImpl", "onChildRunning", getQualifiedName() + childId);
  }

  /**
   * Reacts to a child's failure: records the failure in the status tracker and sends a
   * ChildDead message.
   *
   * @param childId task id of the child that failed
   * @throws RuntimeException if no child with that id is attached to this node
   */
  @Override
  public void onChildDead(final String childId) {
    LOG.entering("TaskNodeImpl", "onChildDead", new Object[]{getQualifiedName(), childId});
    final TaskNode childTask = findChildTask(childId);
    if (childTask != null) {
      final int childVersion = childTask.getVersion();
      taskNodeStatus.updateFailureOf(childId);
      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, childId,
          childVersion, taskId, version.get(),
          Utils.EMPTY_BYTE_ARR);
      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
      senderStage.onNext(gcm);
    } else {
      throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId +
          " to be null. Something wrong");
    }
    LOG.exiting("TaskNodeImpl", "onChildDead", getQualifiedName() + childId);
  }

  // ---------------------------------------------------------------------------
  // Acknowledgements, structure accessors and topology setup.
  // ---------------------------------------------------------------------------

  /**
   * Forwards an acknowledgement to the status tracker.
   *
   * @param msg the acknowledgement message
   */
  @Override
  public void onReceiptOfAcknowledgement(final GroupCommunicationMessage msg) {
    LOG.entering("TaskNodeImpl", "onReceiptOfAcknowledgement", new Object[]{getQualifiedName(), msg});
    taskNodeStatus.processAcknowledgement(msg);
    LOG.exiting("TaskNodeImpl", "onReceiptOfAcknowledgement", getQualifiedName() + msg);
  }

  /**
   * Tells the status tracker that the topology is being updated.
   */
  @Override
  public void updatingTopology() {
    LOG.entering("TaskNodeImpl", "updatingTopology", getQualifiedName());
    taskNodeStatus.updatingTopology();
    LOG.exiting("TaskNodeImpl", "updatingTopology", getQualifiedName());
  }

  /**
   * @return the identifier of the task this node represents
   */
  @Override
  public String getTaskId() {
    return taskId;
  }

  /**
   * Appends a child to this node. The child's parent reference is not touched.
   *
   * @param child the child to attach; must not be {@code null}
   */
  @Override
  public void addChild(final TaskNode child) {
    LOG.entering("TaskNodeImpl", "addChild", new Object[]{getQualifiedName(), child.getTaskId()});
    children.add(child);
    LOG.exiting("TaskNodeImpl", "addChild", getQualifiedName() + child);
  }

  /**
   * Removes the first child that is {@code equals} to the given one, if any.
   *
   * @param child the child to detach; must not be {@code null}
   */
  @Override
  public void removeChild(final TaskNode child) {
    LOG.entering("TaskNodeImpl", "removeChild", new Object[]{getQualifiedName(), child.getTaskId()});
    children.remove(child);
    LOG.exiting("TaskNodeImpl", "removeChild", getQualifiedName() + child);
  }

  /**
   * Sets the parent of this node. The parent's child list is not touched.
   *
   * @param parent the new parent, possibly {@code null}
   */
  @Override
  public void setParent(final TaskNode parent) {
    LOG.entering("TaskNodeImpl", "setParent", new Object[]{getQualifiedName(), parent});
    this.parent = parent;
    LOG.exiting("TaskNodeImpl", "setParent", getQualifiedName() + parent);
  }

  /**
   * @return {@code true} between a successful {@link #onRunningTask()} and the next
   *     successful {@link #onFailedTask()}
   */
  @Override
  public boolean isRunning() {
    LOG.entering("TaskNodeImpl", "isRunning", getQualifiedName());
    final boolean b = running.get();
    LOG.exiting("TaskNodeImpl", "isRunning", getQualifiedName() + b);
    return b;
  }

  /**
   * @return the parent of this node, or {@code null} if none is set
   */
  @Override
  public TaskNode getParent() {
    LOG.entering("TaskNodeImpl", "getParent", getQualifiedName());
    LOG.exiting("TaskNodeImpl", "getParent", getQualifiedName() + parent);
    return parent;
  }

  /**
   * @return the live internal list of children (not a copy)
   */
  @Override
  public Iterable<TaskNode> getChildren() {
    LOG.entering("TaskNodeImpl", "getChildren", getQualifiedName());
    LOG.exiting("TaskNodeImpl", "getChildren", getQualifiedName() + children);
    return children;
  }

  /**
   * @return the log prefix {@code group:operator:(taskId,version) - }
   */
  private String getQualifiedName() {
    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName)
        + ":(" + taskId + "," + getVersion() + ") - ";
  }

  /**
   * Asks the status tracker whether the given neighbor is active.
   *
   * @param neighborId task id of the neighbor
   * @return whatever the status tracker reports for that neighbor
   */
  @Override
  public boolean isNeighborActive(final String neighborId) {
    LOG.entering("TaskNodeImpl", "isNeighborActive", new Object[]{getQualifiedName(), neighborId});
    final boolean active = taskNodeStatus.isActive(neighborId);
    LOG.exiting("TaskNodeImpl", "isNeighborActive", getQualifiedName() + active);
    return active;
  }

  /**
   * Atomically clears the "TopologySetup sent" flag.
   *
   * @return {@code true} if the flag was set and has been cleared, {@code false} if it was already clear
   */
  @Override
  public boolean resetTopologySetupSent() {
    LOG.entering("TaskNodeImpl", "resetTopologySetupSent", new Object[]{getQualifiedName()});
    final boolean retVal = topoSetupSent.compareAndSet(true, false);
    LOG.exiting("TaskNodeImpl", "resetTopologySetupSent", getQualifiedName() + retVal);
    return retVal;
  }

  /**
   * Sends a TopologySetup message if none has been sent yet and this node and all of its
   * neighbors consider each other active.
   *
   * <p>The conditions are evaluated left to right and short-circuit; a non-root node
   * without a parent fails the parent checks, so nothing is sent for it.
   */
  @Override
  public void checkAndSendTopologySetupMessage() {
    LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName());
    if (!topoSetupSent.get()
        && parentActive() && activeNeighborOfParent()
        && allChildrenActive() && activeNeighborOfAllChildren()) {
      sendTopoSetupMsg();
    }
    LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName());
  }

  /**
   * Hands a TopologySetup message to the sender stage, notifies the status tracker and
   * then sets the "sent" flag. Finding the flag already set is only logged.
   */
  private void sendTopoSetupMsg() {
    LOG.entering("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName() + taskId);
    LOG.fine(getQualifiedName() + "is an active participant in the topology");
    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, driverId, 0, taskId,
        version.get(), Utils.EMPTY_BYTE_ARR));
    taskNodeStatus.onTopologySetupMessageSent();
    final boolean sentAlready = !topoSetupSent.compareAndSet(false, true);
    if (sentAlready) {
      LOG.fine(getQualifiedName() + "TopologySetup msg was sent more than once. Something fishy!!!");
    }
    LOG.exiting("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName());
  }

  /**
   * Runs {@link TaskNode#checkAndSendTopologySetupMessage()} on the neighbor with the
   * given id; does nothing if this node has no such neighbor.
   *
   * @param source task id of the neighbor
   */
  @Override
  public void checkAndSendTopologySetupMessageFor(final String source) {
    LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", new Object[]{getQualifiedName(), source});
    final TaskNode srcNode = findTask(source);
    if (srcNode != null) {
      srcNode.checkAndSendTopologySetupMessage();
    }
    LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", getQualifiedName() + source);
  }

  /**
   * Looks up a neighbor by task id, checking the parent first and the children second.
   *
   * @param sourceId task id to look for
   * @return the parent or child with that id, or {@code null} if there is none
   */
  private TaskNode findTask(final String sourceId) {
    LOG.entering("TaskNodeImpl", "findTask", new Object[]{getQualifiedName(), sourceId});
    final TaskNode retNode;
    if (parent != null && parent.getTaskId().equals(sourceId)) {
      retNode = parent;
    } else {
      retNode = findChildTask(sourceId);
    }
    LOG.exiting("TaskNodeImpl", "findTask", getQualifiedName() + retNode);
    return retNode;
  }

  /**
   * Looks up a child by task id.
   *
   * @param sourceId task id to look for
   * @return the first child with that id, or {@code null} if there is none
   */
  private TaskNode findChildTask(final String sourceId) {
    LOG.entering("TaskNodeImpl", "findChildTask", new Object[]{getQualifiedName(), sourceId});
    TaskNode retNode = null;
    for (final TaskNode child : children) {
      if (child.getTaskId().equals(sourceId)) {
        retNode = child;
        break;
      }
    }
    LOG.exiting("TaskNodeImpl", "findChildTask", getQualifiedName() + retNode);
    return retNode;
  }

  /**
   * @return {@code true} if this node is the root or its parent is an active neighbor;
   *     {@code false} otherwise, including when a non-root node has no parent set
   */
  private boolean parentActive() {
    LOG.entering("TaskNodeImpl", "parentActive", getQualifiedName());
    if (isRoot) {
      LOG.exiting("TaskNodeImpl", "parentActive",
          Arrays.toString(new Object[]{true, getQualifiedName(),
              "I am root. Will never have parent. So signalling active"}));
      return true;
    }
    // A non-root node may not have been given a parent yet (or it may have been reset to
    // null); dereferencing it below would throw a NullPointerException.
    if (parent == null) {
      LOG.exiting("TaskNodeImpl", "parentActive",
          Arrays.toString(new Object[]{false, getQualifiedName(), "Not root and parent is not set"}));
      return false;
    }
    if (isNeighborActive(parent.getTaskId())) {
      LOG.exiting("TaskNodeImpl", "parentActive",
          Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neighbor"}));
      return true;
    }
    LOG.exiting("TaskNodeImpl", "parentActive",
        getQualifiedName() + "Neither root Nor is " + parent + " an active neighbor");
    return false;
  }

  /**
   * @return {@code true} if this node is the root or its parent considers it an active
   *     neighbor; {@code false} otherwise, including when a non-root node has no parent set
   */
  private boolean activeNeighborOfParent() {
    LOG.entering("TaskNodeImpl", "activeNeighborOfParent", getQualifiedName());
    if (isRoot) {
      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(),
          "I am root. Will never have parent. So signalling active"}));
      return true;
    }
    // Same guard as in parentActive(): avoid a NullPointerException for a parentless non-root node.
    if (parent == null) {
      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent",
          Arrays.toString(new Object[]{false, getQualifiedName(), "Not root and parent is not set"}));
      return false;
    }
    if (parent.isNeighborActive(taskId)) {
      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(),
          "I am an active neighbor of parent ", parent}));
      return true;
    }
    LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(),
        "Neither is parent null Nor am I an active neighbor of parent ", parent}));
    return false;
  }

  /**
   * @return {@code false} if some running child is not yet an active neighbor of this
   *     node, {@code true} otherwise; children that are not running are ignored
   */
  private boolean allChildrenActive() {
    LOG.entering("TaskNodeImpl", "allChildrenActive", getQualifiedName());
    for (final TaskNode child : children) {
      final String childId = child.getTaskId();
      if (child.isRunning() && !isNeighborActive(childId)) {
        LOG.exiting("TaskNodeImpl", "allChildrenActive",
            Arrays.toString(new Object[]{false, getQualifiedName(), childId, " not active yet"}));
        return false;
      }
    }
    LOG.exiting("TaskNodeImpl", "allChildrenActive",
        Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"}));
    return true;
  }

  /**
   * @return {@code false} if some running child does not yet consider this node an
   *     active neighbor, {@code true} otherwise; children that are not running are ignored
   */
  private boolean activeNeighborOfAllChildren() {
    LOG.entering("TaskNodeImpl", "activeNeighborOfAllChildren", getQualifiedName());
    for (final TaskNode child : children) {
      if (child.isRunning() && !child.isNeighborActive(taskId)) {
        LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren",
            Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child}));
        return false;
      }
    }
    LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren",
        Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"}));
    return true;
  }

  /**
   * Delegates to the status tracker's {@code waitForTopologySetup()}.
   */
  @Override
  public void waitForTopologySetupOrFailure() {
    LOG.entering("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName());
    taskNodeStatus.waitForTopologySetup();
    LOG.exiting("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName());
  }

  /**
   * @return whatever the status tracker reports from {@code hasChanges()}
   */
  @Override
  public boolean hasChanges() {
    LOG.entering("TaskNodeImpl", "hasChanges", getQualifiedName());
    final boolean changes = taskNodeStatus.hasChanges();
    LOG.exiting("TaskNodeImpl", "hasChanges", getQualifiedName() + changes);
    return changes;
  }

  /**
   * @return the current version; starts at 0 and is incremented by every successful {@link #onFailedTask()}
   */
  @Override
  public int getVersion() {
    return version.get();
  }

  /**
   * Hash of the task id and the current version.
   *
   * <p>The value changes whenever the version is bumped, so an instance must not be kept
   * as a key in a hash-based collection across a task failure.
   */
  @Override
  public int hashCode() {
    int r = taskId.hashCode();
    r = 31 * r + version.get();
    return r;
  }

  /**
   * Two nodes are equal when both are {@code TaskNodeImpl}s with the same task id and the
   * same current version.
   */
  @Override
  public boolean equals(final Object obj) {
    if (obj == this) {
      return true;
    }
    if (!(obj instanceof TaskNodeImpl)) {
      return false;
    }
    final TaskNodeImpl that = (TaskNodeImpl) obj;
    return this.taskId.equals(that.taskId) && this.version.get() == that.version.get();
  }
}