001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.driver; 020 021import org.apache.reef.driver.parameters.DriverIdentifier; 022import org.apache.reef.io.network.group.api.operators.GroupCommOperator; 023import org.apache.reef.io.network.group.api.GroupChanges; 024import org.apache.reef.io.network.group.api.config.OperatorSpec; 025import org.apache.reef.io.network.group.api.driver.TaskNode; 026import org.apache.reef.io.network.group.api.driver.Topology; 027import org.apache.reef.io.network.group.impl.GroupChangesCodec; 028import org.apache.reef.io.network.group.impl.GroupChangesImpl; 029import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 030import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec; 031import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec; 032import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec; 033import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec; 034import org.apache.reef.io.network.group.impl.config.parameters.*; 035import org.apache.reef.io.network.group.impl.operators.*; 036import org.apache.reef.io.network.group.impl.utils.Utils; 037import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; 038import org.apache.reef.io.serialization.Codec; 039import org.apache.reef.tang.Configuration; 040import org.apache.reef.tang.JavaConfigurationBuilder; 041import org.apache.reef.tang.Tang; 042import org.apache.reef.tang.annotations.Name; 043import org.apache.reef.tang.annotations.Parameter; 044import org.apache.reef.tang.formats.AvroConfigurationSerializer; 045import org.apache.reef.tang.formats.ConfigurationSerializer; 046import org.apache.reef.wake.EStage; 047import org.apache.reef.wake.EventHandler; 048import org.apache.reef.wake.impl.SingleThreadStage; 049 050import javax.inject.Inject; 051import java.util.ArrayList; 052import java.util.List; 053import java.util.Map; 054import java.util.concurrent.ConcurrentMap; 055import java.util.concurrent.ConcurrentSkipListMap; 056import java.util.logging.Logger; 057 058/** 059 * Implements a tree topology with the specified Fan Out. 060 */ 061public class TreeTopology implements Topology { 062 063 private static final Logger LOG = Logger.getLogger(TreeTopology.class.getName()); 064 065 private final EStage<GroupCommunicationMessage> senderStage; 066 private final Class<? extends Name<String>> groupName; 067 private final Class<? extends Name<String>> operName; 068 private final String driverId; 069 private String rootId; 070 private OperatorSpec operatorSpec; 071 072 private TaskNode root; 073 private TaskNode logicalRoot; 074 private TaskNode prev; 075 private final int fanOut; 076 077 private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap<>(); 078 private final ConfigurationSerializer confSer = new AvroConfigurationSerializer(); 079 080 /** 081 * @deprecated in 0.14. Use Tang to obtain an instance of this instead. 082 */ 083 @Deprecated 084 public TreeTopology(final EStage<GroupCommunicationMessage> senderStage, 085 final Class<? extends Name<String>> groupName, 086 final Class<? extends Name<String>> operatorName, 087 final String driverId, final int numberOfTasks, final int fanOut) { 088 this.senderStage = senderStage; 089 this.groupName = groupName; 090 this.operName = operatorName; 091 this.driverId = driverId; 092 this.fanOut = fanOut; 093 LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + fanOut); 094 } 095 096 @Inject 097 private TreeTopology(@Parameter(GroupCommSenderStage.class) final EStage<GroupCommunicationMessage> senderStage, 098 @Parameter(CommGroupNameClass.class) final Class<? extends Name<String>> groupName, 099 @Parameter(OperatorNameClass.class) final Class<? extends Name<String>> operatorName, 100 @Parameter(DriverIdentifier.class) final String driverId, 101 @Parameter(TreeTopologyFanOut.class) final int fanOut) { 102 this.senderStage = senderStage; 103 this.groupName = groupName; 104 this.operName = operatorName; 105 this.driverId = driverId; 106 this.fanOut = fanOut; 107 LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + fanOut); 108 } 109 110 @Override 111 @SuppressWarnings("checkstyle:hiddenfield") 112 public void setRootTask(final String rootId) { 113 LOG.entering("TreeTopology", "setRootTask", new Object[]{getQualifiedName(), rootId}); 114 this.rootId = rootId; 115 LOG.exiting("TreeTopology", "setRootTask", getQualifiedName() + rootId); 116 } 117 118 @Override 119 public String getRootId() { 120 LOG.entering("TreeTopology", "getRootId", getQualifiedName()); 121 LOG.exiting("TreeTopology", "getRootId", getQualifiedName() + rootId); 122 return rootId; 123 } 124 125 @Override 126 public boolean isRootPresent() { 127 LOG.entering("TreeTopology", "isRootPresent", getQualifiedName()); 128 final boolean retVal = root != null; 129 LOG.exiting("TreeTopology", "isRootPresent", String.format("%s%s", getQualifiedName(), retVal)); 130 return retVal; 131 } 132 133 @Override 134 public void setOperatorSpecification(final OperatorSpec spec) { 135 LOG.entering("TreeTopology", "setOperSpec", new Object[]{getQualifiedName(), spec}); 136 this.operatorSpec = spec; 137 LOG.exiting("TreeTopology", "setOperSpec", getQualifiedName() + spec); 138 } 139 140 @Override 141 public Configuration getTaskConfiguration(final String taskId) { 142 LOG.entering("TreeTopology", "getTaskConfig", new Object[]{getQualifiedName(), taskId}); 143 final TaskNode taskNode = nodes.get(taskId); 144 if (taskNode == null) { 145 throw new RuntimeException(getQualifiedName() + taskId + " does not exist"); 146 } 147 148 final int version = getNodeVersion(taskId); 149 final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); 150 jcb.bindNamedParameter(DataCodec.class, operatorSpec.getDataCodecClass()); 151 jcb.bindNamedParameter(TaskVersion.class, Integer.toString(version)); 152 if (operatorSpec instanceof BroadcastOperatorSpec) { 153 final BroadcastOperatorSpec broadcastOperatorSpec = (BroadcastOperatorSpec) operatorSpec; 154 if (taskId.equals(broadcastOperatorSpec.getSenderId())) { 155 jcb.bindImplementation(GroupCommOperator.class, BroadcastSender.class); 156 } else { 157 jcb.bindImplementation(GroupCommOperator.class, BroadcastReceiver.class); 158 } 159 } else if (operatorSpec instanceof ReduceOperatorSpec) { 160 final ReduceOperatorSpec reduceOperatorSpec = (ReduceOperatorSpec) operatorSpec; 161 jcb.bindNamedParameter(ReduceFunctionParam.class, reduceOperatorSpec.getRedFuncClass()); 162 if (taskId.equals(reduceOperatorSpec.getReceiverId())) { 163 jcb.bindImplementation(GroupCommOperator.class, ReduceReceiver.class); 164 } else { 165 jcb.bindImplementation(GroupCommOperator.class, ReduceSender.class); 166 } 167 } else if (operatorSpec instanceof ScatterOperatorSpec) { 168 final ScatterOperatorSpec scatterOperatorSpec = (ScatterOperatorSpec) operatorSpec; 169 if (taskId.equals(scatterOperatorSpec.getSenderId())) { 170 jcb.bindImplementation(GroupCommOperator.class, ScatterSender.class); 171 } else { 172 jcb.bindImplementation(GroupCommOperator.class, ScatterReceiver.class); 173 } 174 } else if (operatorSpec instanceof GatherOperatorSpec) { 175 final GatherOperatorSpec gatherOperatorSpec = (GatherOperatorSpec) operatorSpec; 176 if (taskId.equals(gatherOperatorSpec.getReceiverId())) { 177 jcb.bindImplementation(GroupCommOperator.class, GatherReceiver.class); 178 } else { 179 jcb.bindImplementation(GroupCommOperator.class, GatherSender.class); 180 } 181 } 182 final Configuration retConf = jcb.build(); 183 LOG.exiting("TreeTopology", "getTaskConfig", getQualifiedName() + confSer.toString(retConf)); 184 return retConf; 185 } 186 187 @Override 188 public int getNodeVersion(final String taskId) { 189 LOG.entering("TreeTopology", "getNodeVersion", new Object[]{getQualifiedName(), taskId}); 190 final TaskNode node = nodes.get(taskId); 191 if (node == null) { 192 throw new RuntimeException(getQualifiedName() + taskId + " is not available on the nodes map"); 193 } 194 final int version = node.getVersion(); 195 LOG.exiting("TreeTopology", "getNodeVersion", getQualifiedName() + " " + taskId + " " + version); 196 return version; 197 } 198 199 @Override 200 public void removeTask(final String taskId) { 201 LOG.entering("TreeTopology", "removeTask", new Object[]{getQualifiedName(), taskId}); 202 if (!nodes.containsKey(taskId)) { 203 LOG.fine("Trying to remove a non-existent node in the task graph"); 204 LOG.exiting("TreeTopology", "removeTask", getQualifiedName()); 205 return; 206 } 207 if (taskId.equals(rootId)) { 208 unsetRootNode(taskId); 209 } else { 210 removeChild(taskId); 211 } 212 LOG.exiting("TreeTopology", "removeTask", getQualifiedName() + taskId); 213 } 214 215 @Override 216 public void addTask(final String taskId) { 217 LOG.entering("TreeTopology", "addTask", new Object[]{getQualifiedName(), taskId}); 218 if (nodes.containsKey(taskId)) { 219 LOG.fine("Got a request to add a task that is already in the graph. " + 220 "We need to block this request till the delete finishes. ***CAUTION***"); 221 } 222 223 if (taskId.equals(rootId)) { 224 setRootNode(taskId); 225 } else { 226 addChild(taskId); 227 } 228 LOG.exiting("TreeTopology", "addTask", getQualifiedName() + taskId); 229 } 230 231 private void addChild(final String taskId) { 232 LOG.entering("TreeTopology", "addChild", new Object[]{getQualifiedName(), taskId}); 233 LOG.finest(getQualifiedName() + "Adding leaf " + taskId); 234 final TaskNode node = new TaskNodeImpl(senderStage, groupName, operName, taskId, driverId, false); 235 if (logicalRoot != null) { 236 addTaskNode(node); 237 prev = node; 238 } 239 nodes.put(taskId, node); 240 LOG.exiting("TreeTopology", "addChild", getQualifiedName() + taskId); 241 } 242 243 private void addTaskNode(final TaskNode node) { 244 LOG.entering("TreeTopology", "addTaskNode", new Object[]{getQualifiedName(), node}); 245 if (logicalRoot.getNumberOfChildren() >= this.fanOut) { 246 logicalRoot = logicalRoot.successor(); 247 } 248 node.setParent(logicalRoot); 249 logicalRoot.addChild(node); 250 prev.setSibling(node); 251 LOG.exiting("TreeTopology", "addTaskNode", getQualifiedName() + node); 252 } 253 254 private void removeChild(final String taskId) { 255 LOG.entering("TreeTopology", "removeChild", new Object[]{getQualifiedName(), taskId}); 256 if (root != null) { 257 root.removeChild(nodes.get(taskId)); 258 } 259 nodes.remove(taskId); 260 LOG.exiting("TreeTopology", "removeChild", getQualifiedName() + taskId); 261 } 262 263 private void setRootNode(final String newRootId) { 264 LOG.entering("TreeTopology", "setRootNode", new Object[]{getQualifiedName(), newRootId}); 265 this.root = new TaskNodeImpl(senderStage, groupName, operName, newRootId, driverId, true); 266 this.logicalRoot = this.root; 267 this.prev = this.root; 268 269 for (final Map.Entry<String, TaskNode> nodeEntry : nodes.entrySet()) { 270 final TaskNode leaf = nodeEntry.getValue(); 271 addTaskNode(leaf); 272 this.prev = leaf; 273 } 274 nodes.put(newRootId, root); 275 LOG.exiting("TreeTopology", "setRootNode", getQualifiedName() + newRootId); 276 } 277 278 private void unsetRootNode(final String taskId) { 279 LOG.entering("TreeTopology", "unsetRootNode", new Object[]{getQualifiedName(), taskId}); 280 nodes.remove(rootId); 281 root = null; 282 283 for (final Map.Entry<String, TaskNode> nodeEntry : nodes.entrySet()) { 284 final TaskNode leaf = nodeEntry.getValue(); 285 leaf.setParent(null); 286 } 287 LOG.exiting("TreeTopology", "unsetRootNode", getQualifiedName() + taskId); 288 } 289 290 @Override 291 public void onFailedTask(final String taskId) { 292 LOG.entering("TreeTopology", "onFailedTask", new Object[]{getQualifiedName(), taskId}); 293 final TaskNode taskNode = nodes.get(taskId); 294 if (taskNode == null) { 295 throw new RuntimeException(getQualifiedName() + taskId + " does not exist"); 296 } 297 taskNode.onFailedTask(); 298 LOG.exiting("TreeTopology", "onFailedTask", getQualifiedName() + taskId); 299 } 300 301 @Override 302 public void onRunningTask(final String taskId) { 303 LOG.entering("TreeTopology", "onRunningTask", new Object[]{getQualifiedName(), taskId}); 304 final TaskNode taskNode = nodes.get(taskId); 305 if (taskNode == null) { 306 throw new RuntimeException(getQualifiedName() + taskId + " does not exist"); 307 } 308 taskNode.onRunningTask(); 309 LOG.exiting("TreeTopology", "onRunningTask", getQualifiedName() + taskId); 310 } 311 312 @Override 313 public void onReceiptOfMessage(final GroupCommunicationMessage msg) { 314 LOG.entering("TreeTopology", "onReceiptOfMessage", new Object[]{getQualifiedName(), msg}); 315 switch (msg.getType()) { 316 case TopologyChanges: 317 onTopologyChanges(msg); 318 break; 319 case UpdateTopology: 320 onUpdateTopology(msg); 321 break; 322 323 default: 324 nodes.get(msg.getSrcid()).onReceiptOfAcknowledgement(msg); 325 break; 326 } 327 LOG.exiting("TreeTopology", "onReceiptOfMessage", getQualifiedName() + msg); 328 } 329 330 private void onUpdateTopology(final GroupCommunicationMessage msg) { 331 LOG.entering("TreeTopology", "onUpdateTopology", new Object[]{getQualifiedName(), msg}); 332 LOG.fine(getQualifiedName() + "Update affected parts of Topology"); 333 final String dstId = msg.getSrcid(); 334 final int version = getNodeVersion(dstId); 335 336 LOG.finest(getQualifiedName() + "Creating NodeTopologyUpdateWaitStage to wait on nodes to be updated"); 337 final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new TopologyUpdateWaitHandler(senderStage, groupName, 338 operName, driverId, 0, 339 dstId, version, 340 getQualifiedName(), TopologySerializer.encode(root)); 341 final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new SingleThreadStage<>("NodeTopologyUpdateWaitStage", 342 topoUpdateWaitHandler, 343 nodes.size()); 344 345 final List<TaskNode> toBeUpdatedNodes = new ArrayList<>(nodes.size()); 346 LOG.finest(getQualifiedName() + "Checking which nodes need to be updated"); 347 for (final TaskNode node : nodes.values()) { 348 if (node.isRunning() && node.hasChanges() && node.resetTopologySetupSent()) { 349 toBeUpdatedNodes.add(node); 350 } 351 } 352 for (final TaskNode node : toBeUpdatedNodes) { 353 node.updatingTopology(); 354 LOG.fine(getQualifiedName() + "Asking " + node + " to UpdateTopology"); 355 senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, 356 ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(), 357 node.getVersion(), Utils.EMPTY_BYTE_ARR)); 358 } 359 nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes); 360 LOG.exiting("TreeTopology", "onUpdateTopology", getQualifiedName() + msg); 361 } 362 363 private void onTopologyChanges(final GroupCommunicationMessage msg) { 364 LOG.entering("TreeTopology", "onTopologyChanges", new Object[]{getQualifiedName(), msg}); 365 LOG.fine(getQualifiedName() + "Check TopologyChanges"); 366 final String dstId = msg.getSrcid(); 367 boolean hasTopologyChanged = false; 368 LOG.finest(getQualifiedName() + "Checking which nodes need to be updated"); 369 for (final TaskNode node : nodes.values()) { 370 if (!node.isRunning() || node.hasChanges()) { 371 hasTopologyChanged = true; 372 break; 373 } 374 } 375 final GroupChanges changes = new GroupChangesImpl(hasTopologyChanged); 376 final Codec<GroupChanges> changesCodec = new GroupChangesCodec(); 377 LOG.fine(getQualifiedName() + "TopologyChanges: " + changes); 378 senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, 379 ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId), 380 changesCodec.encode(changes))); 381 LOG.exiting("TreeTopology", "onTopologyChanges", getQualifiedName() + msg); 382 } 383 384 private String getQualifiedName() { 385 return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + " - "; 386 } 387}