This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.io.network.group.api.driver.TaskNode;
022import org.apache.reef.io.network.group.api.driver.TaskNodeStatus;
023import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
024import org.apache.reef.io.network.group.impl.utils.Utils;
025import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
026import org.apache.reef.tang.annotations.Name;
027import org.apache.reef.wake.EStage;
028
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.logging.Logger;
035
036public class TaskNodeImpl implements TaskNode {
037
038  private static final Logger LOG = Logger.getLogger(TaskNodeImpl.class.getName());
039
040  private final EStage<GroupCommunicationMessage> senderStage;
041  private final Class<? extends Name<String>> groupName;
042  private final Class<? extends Name<String>> operName;
043  private final String taskId;
044  private final String driverId;
045
046  private final boolean isRoot;
047  private TaskNode parent;
048  private TaskNode sibling;
049  private final List<TaskNode> children = new ArrayList<>();
050
051  private final AtomicBoolean running = new AtomicBoolean(false);
052  private final AtomicBoolean topoSetupSent = new AtomicBoolean(false);
053
054  private final TaskNodeStatus taskNodeStatus;
055
056  private final AtomicInteger version = new AtomicInteger(0);
057
058  public TaskNodeImpl(final EStage<GroupCommunicationMessage> senderStage,
059                      final Class<? extends Name<String>> groupName,
060                      final Class<? extends Name<String>> operatorName,
061                      final String taskId, final String driverId, final boolean isRoot) {
062    this.senderStage = senderStage;
063    this.groupName = groupName;
064    this.operName = operatorName;
065    this.taskId = taskId;
066    this.driverId = driverId;
067    this.isRoot = isRoot;
068    taskNodeStatus = new TaskNodeStatusImpl(groupName, operatorName, taskId, this);
069  }
070
071  @Override
072  public void setSibling(final TaskNode leaf) {
073    LOG.entering("TaskNodeImpl", "setSibling", new Object[]{getQualifiedName(), leaf});
074    sibling = leaf;
075    LOG.exiting("TaskNodeImpl", "setSibling", getQualifiedName());
076  }
077
078  @Override
079  public int getNumberOfChildren() {
080    LOG.entering("TaskNodeImpl", "getNumberOfChildren", getQualifiedName());
081    final int size = children.size();
082    LOG.exiting("TaskNodeImpl", "getNumberOfChildren", getQualifiedName() + size);
083    return size;
084  }
085
086  @Override
087  public TaskNode successor() {
088    LOG.entering("TaskNodeImpl", "successor", getQualifiedName());
089    LOG.exiting("TaskNodeImpl", "successor", getQualifiedName() + sibling);
090    return sibling;
091  }
092
093  @Override
094  public String toString() {
095    return "(" + taskId + "," + version.get() + ")";
096  }
097
098  /**
099   * * Methods pertaining to my status change ***.
100   */
101  @Override
102  public void onFailedTask() {
103    LOG.entering("TaskNodeImpl", "onFailedTask", getQualifiedName());
104    if (!running.compareAndSet(true, false)) {
105      LOG.fine(getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!");
106      LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() +
107          "Trying to set failed on an already failed task. Something fishy!!!");
108      return;
109    }
110    taskNodeStatus.clearStateAndReleaseLocks();
111    LOG.finest(getQualifiedName() + "Changed status to failed.");
112    LOG.finest(getQualifiedName() + "Resetting topoSetupSent to false");
113    topoSetupSent.set(false);
114    if (parent != null && parent.isRunning()) {
115      parent.onChildDead(taskId);
116    } else {
117      LOG.finest(getQualifiedName() + "Skipping asking parent to process child death");
118    }
119    for (final TaskNode child : children) {
120      if (child.isRunning()) {
121        child.onParentDead();
122      }
123    }
124    final int newVersion = this.version.incrementAndGet();
125    LOG.finest(getQualifiedName() + "Bumping up to version-" + newVersion);
126    LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName());
127  }
128
129  @Override
130  public void onRunningTask() {
131    LOG.entering("TaskNodeImpl", "onRunningTask", getQualifiedName());
132    if (!running.compareAndSet(false, true)) {
133      LOG.fine(getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!");
134      LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() +
135          "Trying to set running on an already running task. Something fishy!!!");
136      return;
137    }
138    final int newVersion = this.version.get();
139    LOG.finest(getQualifiedName() + "Changed status to running version-" + newVersion);
140    if (parent != null && parent.isRunning()) {
141      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
142          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parent.getTaskId(),
143          parent.getVersion(), taskId,
144          newVersion, Utils.EMPTY_BYTE_ARR);
145      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
146      senderStage.onNext(gcm);
147      parent.onChildRunning(taskId);
148    } else {
149      LOG.finest(getQualifiedName() + "Skipping src add to & for parent");
150    }
151    for (final TaskNode child : children) {
152      if (child.isRunning()) {
153        final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
154            ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, child.getTaskId(),
155            child.getVersion(), taskId, newVersion,
156            Utils.EMPTY_BYTE_ARR);
157        taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
158        senderStage.onNext(gcm);
159        child.onParentRunning();
160      }
161    }
162    LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName());
163  }
164
165  /**
166   * * Methods pertaining to my status change ends ***.
167   */
168
169  @Override
170  public void onParentRunning() {
171    LOG.entering("TaskNodeImpl", "onParentRunning", getQualifiedName());
172    if (parent != null && parent.isRunning()) {
173      final int parentVersion = parent.getVersion();
174      final String parentTaskId = parent.getTaskId();
175      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
176          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parentTaskId,
177          parentVersion, taskId, version.get(),
178          Utils.EMPTY_BYTE_ARR);
179      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
180      senderStage.onNext(gcm);
181    } else {
182      LOG.finer(getQualifiedName() + "Parent was running when I was asked to add him."
183          + " However, he is not active anymore. Returning without sending ParentAdd" + " msg. ***CHECK***");
184    }
185    LOG.exiting("TaskNodeImpl", "onParentRunning", getQualifiedName());
186  }
187
188  @Override
189  public void onParentDead() {
190    LOG.entering("TaskNodeImpl", "onParentDead", getQualifiedName());
191    if (parent != null) {
192      final int parentVersion = parent.getVersion();
193      final String parentTaskId = parent.getTaskId();
194      taskNodeStatus.updateFailureOf(parent.getTaskId());
195      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
196          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead, parentTaskId,
197          parentVersion, taskId, version.get(),
198          Utils.EMPTY_BYTE_ARR);
199      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
200      senderStage.onNext(gcm);
201    } else {
202      throw new RuntimeException(getQualifiedName() + "Don't expect parent to be null. Something wrong");
203    }
204    LOG.exiting("TaskNodeImpl", "onParentDead", getQualifiedName());
205  }
206
207  @Override
208  public void onChildRunning(final String childId) {
209    LOG.entering("TaskNodeImpl", "onChildRunning", new Object[]{getQualifiedName(), childId});
210    final TaskNode childTask = findTask(childId);
211    if (childTask != null && childTask.isRunning()) {
212      final int childVersion = childTask.getVersion();
213      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
214          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, childId,
215          childVersion, taskId, version.get(),
216          Utils.EMPTY_BYTE_ARR);
217      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
218      senderStage.onNext(gcm);
219    } else {
220      LOG.fine(getQualifiedName() + childId + " was running when I was asked to add him."
221          + " However, I can't find a task corresponding to him now."
222          + " Returning without sending ChildAdd msg. ***CHECK***");
223    }
224    LOG.exiting("TaskNodeImpl", "onChildRunning", getQualifiedName() + childId);
225  }
226
227  @Override
228  public void onChildDead(final String childId) {
229    LOG.entering("TaskNodeImpl", "onChildDead", new Object[]{getQualifiedName(), childId});
230    final TaskNode childTask = findChildTask(childId);
231    if (childTask != null) {
232      final int childVersion = childTask.getVersion();
233      taskNodeStatus.updateFailureOf(childId);
234      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
235          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, childId,
236          childVersion, taskId, version.get(),
237          Utils.EMPTY_BYTE_ARR);
238      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
239      senderStage.onNext(gcm);
240    } else {
241      throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId +
242          " to be null. Something wrong");
243    }
244    LOG.exiting("TaskNodeImpl", "onChildDead", getQualifiedName() + childId);
245  }
246
247  /**
248   * * Methods pertaining to my neighbors status change ends ***.
249   */
250
251  @Override
252  public void onReceiptOfAcknowledgement(final GroupCommunicationMessage msg) {
253    LOG.entering("TaskNodeImpl", "onReceiptOfAcknowledgement", new Object[]{getQualifiedName(), msg});
254    taskNodeStatus.processAcknowledgement(msg);
255    LOG.exiting("TaskNodeImpl", "onReceiptOfAcknowledgement", getQualifiedName() + msg);
256  }
257
258  @Override
259  public void updatingTopology() {
260    LOG.entering("TaskNodeImpl", "updatingTopology", getQualifiedName());
261    taskNodeStatus.updatingTopology();
262    LOG.exiting("TaskNodeImpl", "updatingTopology", getQualifiedName());
263  }
264
265  @Override
266  public String getTaskId() {
267    return taskId;
268  }
269
270  @Override
271  public void addChild(final TaskNode child) {
272    LOG.entering("TaskNodeImpl", "addChild", new Object[]{getQualifiedName(), child.getTaskId()});
273    children.add(child);
274    LOG.exiting("TaskNodeImpl", "addChild", getQualifiedName() + child);
275  }
276
277  @Override
278  public void removeChild(final TaskNode child) {
279    LOG.entering("TaskNodeImpl", "removeChild", new Object[]{getQualifiedName(), child.getTaskId()});
280    children.remove(child);
281    LOG.exiting("TaskNodeImpl", "removeChild", getQualifiedName() + child);
282  }
283
284  @Override
285  public void setParent(final TaskNode parent) {
286    LOG.entering("TaskNodeImpl", "setParent", new Object[]{getQualifiedName(), parent});
287    this.parent = parent;
288    LOG.exiting("TaskNodeImpl", "setParent", getQualifiedName() + parent);
289  }
290
291  @Override
292  public boolean isRunning() {
293    LOG.entering("TaskNodeImpl", "isRunning", getQualifiedName());
294    final boolean b = running.get();
295    LOG.exiting("TaskNodeImpl", "isRunning", getQualifiedName() + b);
296    return b;
297  }
298
299  @Override
300  public TaskNode getParent() {
301    LOG.entering("TaskNodeImpl", "getParent", getQualifiedName());
302    LOG.exiting("TaskNodeImpl", "getParent", getQualifiedName() + parent);
303    return parent;
304  }
305
306  @Override
307  public Iterable<TaskNode> getChildren() {
308    LOG.entering("TaskNodeImpl", "getChildren", getQualifiedName());
309    LOG.exiting("TaskNodeImpl", "getChildren", getQualifiedName() + children);
310    return children;
311  }
312
313  private String getQualifiedName() {
314    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + getVersion() + ") - ";
315  }
316
317  @Override
318  public boolean isNeighborActive(final String neighborId) {
319    LOG.entering("TaskNodeImpl", "isNeighborActive", new Object[]{getQualifiedName(), neighborId});
320    final boolean active = taskNodeStatus.isActive(neighborId);
321    LOG.exiting("TaskNodeImpl", "isNeighborActive", getQualifiedName() + active);
322    return active;
323  }
324
325  @Override
326  public boolean resetTopologySetupSent() {
327    LOG.entering("TaskNodeImpl", "resetTopologySetupSent", new Object[]{getQualifiedName(), });
328    final boolean retVal = topoSetupSent.compareAndSet(true, false);
329    LOG.exiting("TaskNodeImpl", "resetTopologySetupSent", getQualifiedName() + retVal);
330    return retVal;
331  }
332
333  @Override
334  public void checkAndSendTopologySetupMessage() {
335    LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName());
336    if (!topoSetupSent.get()
337        && parentActive() && activeNeighborOfParent()
338        && allChildrenActive() && activeNeighborOfAllChildren()) {
339      sendTopoSetupMsg();
340    }
341    LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName());
342  }
343
344  private void sendTopoSetupMsg() {
345    LOG.entering("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName() + taskId);
346    LOG.fine(getQualifiedName() + "is an active participant in the topology");
347    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
348        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, driverId, 0, taskId,
349        version.get(), Utils.EMPTY_BYTE_ARR));
350    taskNodeStatus.onTopologySetupMessageSent();
351    final boolean sentAlready = !topoSetupSent.compareAndSet(false, true);
352    if (sentAlready) {
353      LOG.fine(getQualifiedName() + "TopologySetup msg was sent more than once. Something fishy!!!");
354    }
355    LOG.exiting("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName());
356  }
357
358  @Override
359  public void checkAndSendTopologySetupMessageFor(final String source) {
360    LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", new Object[]{getQualifiedName(), source});
361    final TaskNode srcNode = findTask(source);
362    if (srcNode != null) {
363      srcNode.checkAndSendTopologySetupMessage();
364    }
365    LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", getQualifiedName() + source);
366  }
367
368  /**
369   * @param sourceId
370   * @return
371   */
372  private TaskNode findTask(final String sourceId) {
373    LOG.entering("TaskNodeImpl", "findTask", new Object[]{getQualifiedName(), sourceId});
374    final TaskNode retNode;
375    if (parent != null && parent.getTaskId().equals(sourceId)) {
376      retNode = parent;
377    } else {
378      retNode = findChildTask(sourceId);
379    }
380    LOG.exiting("TaskNodeImpl", "findTask", getQualifiedName() + retNode);
381    return retNode;
382  }
383
384  private TaskNode findChildTask(final String sourceId) {
385    LOG.entering("TaskNodeImpl", "findChildTask", new Object[]{getQualifiedName(), sourceId});
386    TaskNode retNode = null;
387    for (final TaskNode child : children) {
388      if (child.getTaskId().equals(sourceId)) {
389        retNode = child;
390        break;
391      }
392    }
393    LOG.exiting("TaskNodeImpl", "findChildTask", getQualifiedName() + retNode);
394    return retNode;
395  }
396
397  private boolean parentActive() {
398    LOG.entering("TaskNodeImpl", "parentActive", getQualifiedName());
399    if (isRoot) {
400      LOG.exiting("TaskNodeImpl", "parentActive",
401          Arrays.toString(new Object[]{true, getQualifiedName(),
402              "I am root. Will never have parent. So signalling active"}));
403      return true;
404    }
405    if (isNeighborActive(parent.getTaskId())) {
406      LOG.exiting("TaskNodeImpl", "parentActive",
407          Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neighbor"}));
408      return true;
409    }
410    LOG.exiting("TaskNodeImpl", "parentActive",
411        getQualifiedName() + "Neither root Nor is " + parent + " an active neighbor");
412    return false;
413  }
414
415  private boolean activeNeighborOfParent() {
416    LOG.entering("TaskNodeImpl", "activeNeighborOfParent", getQualifiedName());
417    if (isRoot) {
418      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(),
419          "I am root. Will never have parent. So signalling active"}));
420      return true;
421    }
422    if (parent.isNeighborActive(taskId)) {
423      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(),
424          "I am an active neighbor of parent ", parent}));
425      return true;
426    }
427    LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(),
428        "Neither is parent null Nor am I an active neighbor of parent ", parent}));
429    return false;
430  }
431
432  private boolean allChildrenActive() {
433    LOG.entering("TaskNodeImpl", "allChildrenActive", getQualifiedName());
434    for (final TaskNode child : children) {
435      final String childId = child.getTaskId();
436      if (child.isRunning() && !isNeighborActive(childId)) {
437        LOG.exiting("TaskNodeImpl", "allChildrenActive",
438            Arrays.toString(new Object[]{false, getQualifiedName(), childId, " not active yet"}));
439        return false;
440      }
441    }
442    LOG.exiting("TaskNodeImpl", "allChildrenActive",
443        Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"}));
444    return true;
445  }
446
447  private boolean activeNeighborOfAllChildren() {
448    LOG.entering("TaskNodeImpl", "activeNeighborOfAllChildren", getQualifiedName());
449    for (final TaskNode child : children) {
450      if (child.isRunning() && !child.isNeighborActive(taskId)) {
451        LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren",
452            Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child}));
453        return false;
454      }
455    }
456    LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren",
457        Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"}));
458    return true;
459  }
460
461  @Override
462  public void waitForTopologySetupOrFailure() {
463    LOG.entering("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName());
464    taskNodeStatus.waitForTopologySetup();
465    LOG.exiting("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName());
466  }
467
468  @Override
469  public boolean hasChanges() {
470    LOG.entering("TaskNodeImpl", "hasChanges", getQualifiedName());
471    final boolean changes = taskNodeStatus.hasChanges();
472    LOG.exiting("TaskNodeImpl", "hasChanges", getQualifiedName() + changes);
473    return changes;
474  }
475
476  @Override
477  public int getVersion() {
478    return version.get();
479  }
480
481  @Override
482  public int hashCode() {
483    int r = taskId.hashCode();
484    r = 31 * r + version.get();
485    return r;
486  }
487
488  @Override
489  public boolean equals(final Object obj) {
490    if (obj != this) {
491      if (obj instanceof TaskNodeImpl) {
492        final TaskNodeImpl that = (TaskNodeImpl) obj;
493        return this.taskId.equals(that.taskId) && this.version.get() == that.version.get();
494      } else {
495        return false;
496      }
497    } else {
498      return true;
499    }
500  }
501}