/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.io.network.group.api.driver.TaskNode;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;

import java.util.List;
import java.util.logging.Logger;

/**
 * Event handler that, given a list of {@link TaskNode}s, blocks on each node's
 * {@code waitForTopologySetupOrFailure()} in turn and then pushes a single
 * {@code TopologyUpdated} message, addressed to the configured destination,
 * onto the sender stage.
 */
public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> {

  private static final Logger LOG = Logger.getLogger(TopologyUpdateWaitHandler.class.getName());

  private final EStage<GroupCommunicationMessage> senderStage;
  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final String driverId;
  private final int driverVersion;
  private final String dstId;
  private final int dstVersion;
  private final String qualifiedName;
  private final byte[] data;

  /**
   * The handler will wait for all nodes to acquire topoLock
   * and send TopologySetup msg. Then it will send TopologyUpdated
   * msg. However, any local topology changes are not in effect
   * till driver sends TopologySetup once statusMap is emptied.
   * The operations in the tasks that have topology changes will
   * wait for this. However, other tasks that do not have any changes
   * will continue their regular operation.
   *
   * @param senderStage   stage that receives the {@code TopologyUpdated} message built in {@link #onNext(List)}
   * @param groupName     group name class handed to {@code Utils.bldVersionedGCM}
   * @param operName      operator name class handed to {@code Utils.bldVersionedGCM}
   * @param driverId      id handed to {@code Utils.bldVersionedGCM} (presumably the message source; confirm there)
   * @param driverVersion version handed to {@code Utils.bldVersionedGCM} together with {@code driverId}
   * @param dstId         id of the destination that is notified
   * @param dstVersion    version of the destination that is notified
   * @param qualifiedName prefix prepended verbatim to every log message of this handler
   * @param data          payload handed to {@code Utils.bldVersionedGCM}; stored by reference, not copied
   */
  public TopologyUpdateWaitHandler(final EStage<GroupCommunicationMessage> senderStage,
                                   final Class<? extends Name<String>> groupName,
                                   final Class<? extends Name<String>> operName,
                                   final String driverId, final int driverVersion,
                                   final String dstId, final int dstVersion,
                                   final String qualifiedName, final byte[] data) {
    this.senderStage = senderStage;
    this.groupName = groupName;
    this.operName = operName;
    this.driverId = driverId;
    this.driverVersion = driverVersion;
    this.dstId = dstId;
    this.dstVersion = dstVersion;
    this.qualifiedName = qualifiedName;
    this.data = data;
  }

  /**
   * Waits on every node in {@code nodes}, one after another, and then sends the
   * {@code TopologyUpdated} message to the sender stage.
   *
   * @param nodes the nodes to wait on; a node that is not running after the wait
   *              is only logged as failed and does not prevent the message from being sent
   */
  @Override
  public void onNext(final List<TaskNode> nodes) {
    LOG.entering("TopologyUpdateWaitHandler", "onNext", new Object[]{qualifiedName, nodes});

    for (final TaskNode node : nodes) {
      LOG.fine(qualifiedName + "Waiting for " + node + " to enter TopologyUpdate phase");
      node.waitForTopologySetupOrFailure();
      if (node.isRunning()) {
        LOG.fine(qualifiedName + node + " is in TopologyUpdate phase");
      } else {
        LOG.fine(qualifiedName + node + " has failed");
      }
    }

    LOG.finest(qualifiedName + "NodeTopologyUpdateWaitStage All to be updated nodes have received TopologySetup");
    LOG.fine(qualifiedName + "All affected parts of the topology are in TopologyUpdate phase. Will send a note to ("
        + dstId + "," + dstVersion + ")");

    // The message is sent regardless of whether individual nodes failed during the wait.
    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId,
        dstVersion, data));

    LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName);
  }

}