This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.io.network.group.api.driver.TaskNode;
022import org.apache.reef.io.network.group.api.driver.TaskNodeStatus;
023import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
024import org.apache.reef.io.network.group.impl.utils.ConcurrentCountingMap;
025import org.apache.reef.io.network.group.impl.utils.CountingMap;
026import org.apache.reef.io.network.group.impl.utils.Utils;
027import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type;
028import org.apache.reef.tang.annotations.Name;
029
030import java.util.HashSet;
031import java.util.Set;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.logging.Logger;
034
035public class TaskNodeStatusImpl implements TaskNodeStatus {
036
037  private static final Logger LOG = Logger.getLogger(TaskNodeStatusImpl.class.getName());
038
039  private final ConcurrentCountingMap<Type, String> statusMap = new ConcurrentCountingMap<>();
040  private final Class<? extends Name<String>> groupName;
041  private final Class<? extends Name<String>> operName;
042  private final String taskId;
043  private final Set<String> activeNeighbors = new HashSet<>();
044  private final CountingMap<String> neighborStatus = new CountingMap<>();
045  private final AtomicBoolean updatingTopo = new AtomicBoolean(false);
046  private final Object topoUpdateStageLock = new Object();
047  private final Object topoSetupSentLock = new Object();
048  private final TaskNode node;
049
050  public TaskNodeStatusImpl(final Class<? extends Name<String>> groupName,
051                            final Class<? extends Name<String>> operName, final String taskId, final TaskNode node) {
052    this.groupName = groupName;
053    this.operName = operName;
054    this.taskId = taskId;
055    this.node = node;
056  }
057
058  private boolean isDeadMsg(final Type msgAcked) {
059    return msgAcked == Type.ParentDead || msgAcked == Type.ChildDead;
060  }
061
062  private boolean isAddMsg(final Type msgAcked) {
063    return msgAcked == Type.ParentAdd || msgAcked == Type.ChildAdd;
064  }
065
066  private Type getAckedMsg(final Type msgType) {
067    switch (msgType) {
068    case ParentAdded:
069      return Type.ParentAdd;
070    case ChildAdded:
071      return Type.ChildAdd;
072    case ParentRemoved:
073      return Type.ParentDead;
074    case ChildRemoved:
075      return Type.ChildDead;
076    default:
077      return msgType;
078    }
079  }
080
081  private void chkIamActiveToSendTopoSetup(final Type msgDealt) {
082    LOG.entering("TaskNodeStatusImpl", "chkAndSendTopoSetup", new Object[]{getQualifiedName(), msgDealt});
083    if (statusMap.isEmpty()) {
084      LOG.finest(getQualifiedName() + "Empty status map.");
085      node.checkAndSendTopologySetupMessage();
086    } else {
087      LOG.finest(getQualifiedName() + "Status map non-empty" + statusMap);
088    }
089    LOG.exiting("TaskNodeStatusImpl", "chkAndSendTopoSetup", getQualifiedName() + msgDealt);
090  }
091
092  @Override
093  public void onTopologySetupMessageSent() {
094    LOG.entering("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName());
095    neighborStatus.clear();
096    synchronized (topoSetupSentLock) {
097      topoSetupSentLock.notifyAll();
098    }
099    LOG.exiting("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName());
100  }
101
102  @Override
103  public boolean isActive(final String neighborId) {
104    LOG.entering("TaskNodeStatusImpl", "isActive", new Object[]{getQualifiedName(), neighborId});
105    final boolean contains = activeNeighbors.contains(neighborId);
106    LOG.exiting("TaskNodeStatusImpl", "isActive", getQualifiedName() + contains);
107    return contains;
108  }
109
110  /**
111   * This needs to happen in line rather than in a stage because we need to note.
112   * the messages we send to the tasks before we start processing msgs from the
113   * nodes.(Acks and Topology msgs)
114   */
115  @Override
116  public void expectAckFor(final Type msgType, final String srcId) {
117    LOG.entering("TaskNodeStatusImpl", "expectAckFor", new Object[]{getQualifiedName(), msgType, srcId});
118    LOG.finest(getQualifiedName() + "Adding " + srcId + " to sources");
119    statusMap.add(msgType, srcId);
120    LOG.exiting("TaskNodeStatusImpl", "expectAckFor",
121        getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType));
122  }
123
124  @Override
125  public void clearStateAndReleaseLocks() {
126    LOG.entering("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName());
127    statusMap.clear();
128    activeNeighbors.clear();
129    neighborStatus.clear();
130    updatingTopo.compareAndSet(true, false);
131    synchronized (topoSetupSentLock) {
132      topoSetupSentLock.notifyAll();
133    }
134    synchronized (topoUpdateStageLock) {
135      topoUpdateStageLock.notifyAll();
136    }
137    LOG.exiting("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName());
138  }
139
140  @Override
141  public void updateFailureOf(final String failTaskId) {
142    LOG.entering("TaskNodeStatusImpl", "updateFailureOf", new Object[]{getQualifiedName(), failTaskId});
143    activeNeighbors.remove(failTaskId);
144    neighborStatus.remove(failTaskId);
145    LOG.exiting("TaskNodeStatusImpl", "updateFailureOf", getQualifiedName());
146  }
147
148  @Override
149  public void processAcknowledgement(final GroupCommunicationMessage gcm) {
150    LOG.entering("TaskNodeStatusImpl", "processMsg", new Object[]{getQualifiedName(), gcm});
151    final Type msgType = gcm.getType();
152    final Type msgAcked = getAckedMsg(msgType);
153    final String sourceId = gcm.getDestid();
154    switch (msgType) {
155    case TopologySetup:
156      synchronized (topoUpdateStageLock) {
157        if (!updatingTopo.compareAndSet(true, false)) {
158          LOG.fine(getQualifiedName() + "Was expecting updateTopo to be true but it was false");
159        }
160        topoUpdateStageLock.notifyAll();
161      }
162      break;
163    case ParentAdded:
164    case ChildAdded:
165    case ParentRemoved:
166    case ChildRemoved:
167      processNeighborAcks(gcm, msgType, msgAcked, sourceId);
168      break;
169
170    default:
171      LOG.fine(getQualifiedName() + "Non ACK msg " + gcm.getType() + " for " + gcm.getDestid() + " unexpected");
172      break;
173    }
174    LOG.exiting("TaskNodeStatusImpl", "processMsg", getQualifiedName());
175  }
176
177  private void processNeighborAcks(final GroupCommunicationMessage gcm, final Type msgType, final Type msgAcked,
178                                   final String sourceId) {
179    LOG.entering("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm);
180    if (statusMap.containsKey(msgAcked)) {
181      if (statusMap.contains(msgAcked, sourceId)) {
182        statusMap.remove(msgAcked, sourceId);
183        updateNeighborStatus(msgAcked, sourceId);
184        checkNeighborActiveToSendTopoSetup(sourceId);
185        chkIamActiveToSendTopoSetup(msgAcked);
186      } else {
187        LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage Got " + msgType + " from a source(" + sourceId
188            + ") to whom ChildAdd was not sent. "
189            + "Perhaps reset during failure. If context not indicative use ***CAUTION***");
190      }
191    } else {
192      LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage There were no " + msgAcked
193          + " msgs sent in the previous update cycle. "
194          + "Perhaps reset during failure. If context not indicative use ***CAUTION***");
195    }
196    LOG.exiting("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm);
197  }
198
199  private void checkNeighborActiveToSendTopoSetup(final String sourceId) {
200    LOG.entering("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", new Object[]{getQualifiedName(),
201        sourceId});
202    if (statusMap.notContains(sourceId)) {
203      //All msgs corresponding to sourceId have been ACKed
204      if (neighborStatus.get(sourceId) > 0) {
205        activeNeighbors.add(sourceId);
206        node.checkAndSendTopologySetupMessageFor(sourceId);
207      } else {
208        LOG.finest(getQualifiedName() + sourceId + " is not a neighbor anymore");
209      }
210    } else {
211      LOG.finest(getQualifiedName() + "Not done processing " + sourceId + " acks yet. So it is still inactive");
212    }
213    LOG.exiting("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", getQualifiedName() + sourceId);
214  }
215
216  private void updateNeighborStatus(final Type msgAcked, final String sourceId) {
217    LOG.entering("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId});
218    if (isAddMsg(msgAcked)) {
219      neighborStatus.add(sourceId);
220    } else if (isDeadMsg(msgAcked)) {
221      neighborStatus.remove(sourceId);
222    } else {
223      throw new RuntimeException("Can only deal with Neigbor ACKs while I received " + msgAcked + " from " + sourceId);
224    }
225    LOG.exiting("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId});
226  }
227
228  @Override
229  public void updatingTopology() {
230    LOG.entering("TaskNodeStatusImpl", "updatingTopology", getQualifiedName());
231    final boolean topoBeingUpdated = !updatingTopo.compareAndSet(false, true);
232    if (topoBeingUpdated) {
233      throw new RuntimeException(getQualifiedName() + "Was expecting updateTopo to be false but it was true");
234    }
235    LOG.exiting("TaskNodeStatusImpl", "updatingTopology", getQualifiedName());
236  }
237
238  private String getQualifiedName() {
239    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," +
240        node.getVersion() + ") - ";
241  }
242
243  @Override
244  public boolean hasChanges() {
245    LOG.entering("TaskNodeStatusImpl", "hasChanges", getQualifiedName());
246    final boolean notEmpty = !statusMap.isEmpty();
247    LOG.exiting("TaskNodeStatusImpl", "hasChanges", getQualifiedName() + notEmpty);
248    return notEmpty;
249  }
250
251  @Override
252  public void waitForTopologySetup() {
253    LOG.entering("TaskNodeStatusImpl", "waitForTopologySetup", getQualifiedName());
254    LOG.finest("Waiting to acquire topoUpdateStageLock");
255    synchronized (topoUpdateStageLock) {
256      LOG.finest(getQualifiedName() + "Acquired topoUpdateStageLock. updatingTopo: " + updatingTopo.get());
257      while (updatingTopo.get() && node.isRunning()) {
258        try {
259          LOG.finest(getQualifiedName() + "Waiting on topoUpdateStageLock");
260          topoUpdateStageLock.wait();
261        } catch (final InterruptedException e) {
262          throw new RuntimeException("InterruptedException in NodeTopologyUpdateWaitStage "
263              + "while waiting for receiving TopologySetup", e);
264        }
265      }
266    }
267  }
268}