This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.io.network.group.api.driver.TaskNode;
022import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
023import org.apache.reef.io.network.group.impl.utils.Utils;
024import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
025import org.apache.reef.tang.annotations.Name;
026import org.apache.reef.wake.EStage;
027import org.apache.reef.wake.EventHandler;
028
029import java.util.List;
030import java.util.logging.Logger;
031
032/**
033 *
034 */
035public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> {
036
037  private static final Logger LOG = Logger.getLogger(TopologyUpdateWaitHandler.class.getName());
038  private final EStage<GroupCommunicationMessage> senderStage;
039  private final Class<? extends Name<String>> groupName;
040  private final Class<? extends Name<String>> operName;
041  private final String driverId;
042  private final int driverVersion;
043  private final String dstId;
044  private final int dstVersion;
045  private final String qualifiedName;
046  private final byte[] data;
047
048
049  /**
050   * The handler will wait for all nodes to acquire topoLock
051   * and send TopologySetup msg. Then it will send TopologyUpdated
052   * msg. However, any local topology changes are not in effect
053   * till driver sends TopologySetup once statusMap is emptied
054   * The operations in the tasks that have topology changes will
055   * wait for this. However other tasks that do not have any changes
056   * will continue their regular operation
057   */
058  public TopologyUpdateWaitHandler(final EStage<GroupCommunicationMessage> senderStage,
059                                   final Class<? extends Name<String>> groupName,
060                                   final Class<? extends Name<String>> operName,
061                                   final String driverId, final int driverVersion,
062                                   final String dstId, final int dstVersion,
063                                   final String qualifiedName, final byte[] data) {
064    super();
065    this.senderStage = senderStage;
066    this.groupName = groupName;
067    this.operName = operName;
068    this.driverId = driverId;
069    this.driverVersion = driverVersion;
070    this.dstId = dstId;
071    this.dstVersion = dstVersion;
072    this.qualifiedName = qualifiedName;
073    this.data = data;
074  }
075
076
077  @Override
078  public void onNext(final List<TaskNode> nodes) {
079    LOG.entering("TopologyUpdateWaitHandler", "onNext", new Object[]{qualifiedName, nodes});
080
081    for (final TaskNode node : nodes) {
082      LOG.fine(qualifiedName + "Waiting for " + node + " to enter TopologyUdate phase");
083      node.waitForTopologySetupOrFailure();
084      if (node.isRunning()) {
085        LOG.fine(qualifiedName + node + " is in TopologyUpdate phase");
086      } else {
087        LOG.fine(qualifiedName + node + " has failed");
088      }
089    }
090    LOG.finest(qualifiedName + "NodeTopologyUpdateWaitStage All to be updated nodes " + "have received TopologySetup");
091    LOG.fine(qualifiedName + "All affected parts of the topology are in TopologyUpdate phase. Will send a note to ("
092        + dstId + "," + dstVersion + ")");
093    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
094        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId,
095        dstVersion, data));
096    LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName);
097  }
098
099}