001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.driver; 020 021import org.apache.reef.io.network.group.api.driver.TaskNode; 022import org.apache.reef.io.network.group.api.driver.TaskNodeStatus; 023import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 024import org.apache.reef.io.network.group.impl.utils.ConcurrentCountingMap; 025import org.apache.reef.io.network.group.impl.utils.CountingMap; 026import org.apache.reef.io.network.group.impl.utils.Utils; 027import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type; 028import org.apache.reef.tang.annotations.Name; 029 030import java.util.HashSet; 031import java.util.Set; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.logging.Logger; 034 035public class TaskNodeStatusImpl implements TaskNodeStatus { 036 037 private static final Logger LOG = Logger.getLogger(TaskNodeStatusImpl.class.getName()); 038 039 private final ConcurrentCountingMap<Type, String> statusMap = new ConcurrentCountingMap<>(); 040 private final Class<? extends Name<String>> groupName; 041 private final Class<? extends Name<String>> operName; 042 private final String taskId; 043 private final Set<String> activeNeighbors = new HashSet<>(); 044 private final CountingMap<String> neighborStatus = new CountingMap<>(); 045 private final AtomicBoolean updatingTopo = new AtomicBoolean(false); 046 private final Object topoUpdateStageLock = new Object(); 047 private final Object topoSetupSentLock = new Object(); 048 private final TaskNode node; 049 050 public TaskNodeStatusImpl(final Class<? extends Name<String>> groupName, 051 final Class<? extends Name<String>> operName, final String taskId, final TaskNode node) { 052 this.groupName = groupName; 053 this.operName = operName; 054 this.taskId = taskId; 055 this.node = node; 056 } 057 058 private boolean isDeadMsg(final Type msgAcked) { 059 return msgAcked == Type.ParentDead || msgAcked == Type.ChildDead; 060 } 061 062 private boolean isAddMsg(final Type msgAcked) { 063 return msgAcked == Type.ParentAdd || msgAcked == Type.ChildAdd; 064 } 065 066 private Type getAckedMsg(final Type msgType) { 067 switch (msgType) { 068 case ParentAdded: 069 return Type.ParentAdd; 070 case ChildAdded: 071 return Type.ChildAdd; 072 case ParentRemoved: 073 return Type.ParentDead; 074 case ChildRemoved: 075 return Type.ChildDead; 076 default: 077 return msgType; 078 } 079 } 080 081 private void chkIamActiveToSendTopoSetup(final Type msgDealt) { 082 LOG.entering("TaskNodeStatusImpl", "chkAndSendTopoSetup", new Object[]{getQualifiedName(), msgDealt}); 083 if (statusMap.isEmpty()) { 084 LOG.finest(getQualifiedName() + "Empty status map."); 085 node.checkAndSendTopologySetupMessage(); 086 } else { 087 LOG.finest(getQualifiedName() + "Status map non-empty" + statusMap); 088 } 089 LOG.exiting("TaskNodeStatusImpl", "chkAndSendTopoSetup", getQualifiedName() + msgDealt); 090 } 091 092 @Override 093 public void onTopologySetupMessageSent() { 094 LOG.entering("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName()); 095 neighborStatus.clear(); 096 synchronized (topoSetupSentLock) { 097 topoSetupSentLock.notifyAll(); 098 } 099 LOG.exiting("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName()); 100 } 101 102 @Override 103 public boolean isActive(final String neighborId) { 104 LOG.entering("TaskNodeStatusImpl", "isActive", new Object[]{getQualifiedName(), neighborId}); 105 final boolean contains = activeNeighbors.contains(neighborId); 106 LOG.exiting("TaskNodeStatusImpl", "isActive", getQualifiedName() + contains); 107 return contains; 108 } 109 110 /** 111 * This needs to happen in line rather than in a stage because we need to note. 112 * the messages we send to the tasks before we start processing msgs from the 113 * nodes.(Acks and Topology msgs) 114 */ 115 @Override 116 public void expectAckFor(final Type msgType, final String srcId) { 117 LOG.entering("TaskNodeStatusImpl", "expectAckFor", new Object[]{getQualifiedName(), msgType, srcId}); 118 LOG.finest(getQualifiedName() + "Adding " + srcId + " to sources"); 119 statusMap.add(msgType, srcId); 120 LOG.exiting("TaskNodeStatusImpl", "expectAckFor", 121 getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType)); 122 } 123 124 @Override 125 public void clearStateAndReleaseLocks() { 126 LOG.entering("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName()); 127 statusMap.clear(); 128 activeNeighbors.clear(); 129 neighborStatus.clear(); 130 updatingTopo.compareAndSet(true, false); 131 synchronized (topoSetupSentLock) { 132 topoSetupSentLock.notifyAll(); 133 } 134 synchronized (topoUpdateStageLock) { 135 topoUpdateStageLock.notifyAll(); 136 } 137 LOG.exiting("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName()); 138 } 139 140 @Override 141 public void updateFailureOf(final String failTaskId) { 142 LOG.entering("TaskNodeStatusImpl", "updateFailureOf", new Object[]{getQualifiedName(), failTaskId}); 143 activeNeighbors.remove(failTaskId); 144 neighborStatus.remove(failTaskId); 145 LOG.exiting("TaskNodeStatusImpl", "updateFailureOf", getQualifiedName()); 146 } 147 148 @Override 149 public void processAcknowledgement(final GroupCommunicationMessage gcm) { 150 LOG.entering("TaskNodeStatusImpl", "processMsg", new Object[]{getQualifiedName(), gcm}); 151 final Type msgType = gcm.getType(); 152 final Type msgAcked = getAckedMsg(msgType); 153 final String sourceId = gcm.getDestid(); 154 switch (msgType) { 155 case TopologySetup: 156 synchronized (topoUpdateStageLock) { 157 if (!updatingTopo.compareAndSet(true, false)) { 158 LOG.fine(getQualifiedName() + "Was expecting updateTopo to be true but it was false"); 159 } 160 topoUpdateStageLock.notifyAll(); 161 } 162 break; 163 case ParentAdded: 164 case ChildAdded: 165 case ParentRemoved: 166 case ChildRemoved: 167 processNeighborAcks(gcm, msgType, msgAcked, sourceId); 168 break; 169 170 default: 171 LOG.fine(getQualifiedName() + "Non ACK msg " + gcm.getType() + " for " + gcm.getDestid() + " unexpected"); 172 break; 173 } 174 LOG.exiting("TaskNodeStatusImpl", "processMsg", getQualifiedName()); 175 } 176 177 private void processNeighborAcks(final GroupCommunicationMessage gcm, final Type msgType, final Type msgAcked, 178 final String sourceId) { 179 LOG.entering("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm); 180 if (statusMap.containsKey(msgAcked)) { 181 if (statusMap.contains(msgAcked, sourceId)) { 182 statusMap.remove(msgAcked, sourceId); 183 updateNeighborStatus(msgAcked, sourceId); 184 checkNeighborActiveToSendTopoSetup(sourceId); 185 chkIamActiveToSendTopoSetup(msgAcked); 186 } else { 187 LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage Got " + msgType + " from a source(" + sourceId 188 + ") to whom ChildAdd was not sent. " 189 + "Perhaps reset during failure. If context not indicative use ***CAUTION***"); 190 } 191 } else { 192 LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage There were no " + msgAcked 193 + " msgs sent in the previous update cycle. " 194 + "Perhaps reset during failure. If context not indicative use ***CAUTION***"); 195 } 196 LOG.exiting("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm); 197 } 198 199 private void checkNeighborActiveToSendTopoSetup(final String sourceId) { 200 LOG.entering("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", new Object[]{getQualifiedName(), 201 sourceId}); 202 if (statusMap.notContains(sourceId)) { 203 //All msgs corresponding to sourceId have been ACKed 204 if (neighborStatus.get(sourceId) > 0) { 205 activeNeighbors.add(sourceId); 206 node.checkAndSendTopologySetupMessageFor(sourceId); 207 } else { 208 LOG.finest(getQualifiedName() + sourceId + " is not a neighbor anymore"); 209 } 210 } else { 211 LOG.finest(getQualifiedName() + "Not done processing " + sourceId + " acks yet. So it is still inactive"); 212 } 213 LOG.exiting("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", getQualifiedName() + sourceId); 214 } 215 216 private void updateNeighborStatus(final Type msgAcked, final String sourceId) { 217 LOG.entering("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId}); 218 if (isAddMsg(msgAcked)) { 219 neighborStatus.add(sourceId); 220 } else if (isDeadMsg(msgAcked)) { 221 neighborStatus.remove(sourceId); 222 } else { 223 throw new RuntimeException("Can only deal with Neigbor ACKs while I received " + msgAcked + " from " + sourceId); 224 } 225 LOG.exiting("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId}); 226 } 227 228 @Override 229 public void updatingTopology() { 230 LOG.entering("TaskNodeStatusImpl", "updatingTopology", getQualifiedName()); 231 final boolean topoBeingUpdated = !updatingTopo.compareAndSet(false, true); 232 if (topoBeingUpdated) { 233 throw new RuntimeException(getQualifiedName() + "Was expecting updateTopo to be false but it was true"); 234 } 235 LOG.exiting("TaskNodeStatusImpl", "updatingTopology", getQualifiedName()); 236 } 237 238 private String getQualifiedName() { 239 return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + 240 node.getVersion() + ") - "; 241 } 242 243 @Override 244 public boolean hasChanges() { 245 LOG.entering("TaskNodeStatusImpl", "hasChanges", getQualifiedName()); 246 final boolean notEmpty = !statusMap.isEmpty(); 247 LOG.exiting("TaskNodeStatusImpl", "hasChanges", getQualifiedName() + notEmpty); 248 return notEmpty; 249 } 250 251 @Override 252 public void waitForTopologySetup() { 253 LOG.entering("TaskNodeStatusImpl", "waitForTopologySetup", getQualifiedName()); 254 LOG.finest("Waiting to acquire topoUpdateStageLock"); 255 synchronized (topoUpdateStageLock) { 256 LOG.finest(getQualifiedName() + "Acquired topoUpdateStageLock. updatingTopo: " + updatingTopo.get()); 257 while (updatingTopo.get() && node.isRunning()) { 258 try { 259 LOG.finest(getQualifiedName() + "Waiting on topoUpdateStageLock"); 260 topoUpdateStageLock.wait(); 261 } catch (final InterruptedException e) { 262 throw new RuntimeException("InterruptedException in NodeTopologyUpdateWaitStage " 263 + "while waiting for receiving TopologySetup", e); 264 } 265 } 266 } 267 } 268}