001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.task; 020 021import org.apache.reef.exception.evaluator.NetworkException; 022import org.apache.reef.io.network.exception.ParentDeadException; 023import org.apache.reef.io.network.group.api.operators.Reduce; 024import org.apache.reef.io.network.group.api.task.OperatorTopology; 025import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct; 026import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 027import org.apache.reef.io.network.group.impl.operators.Sender; 028import org.apache.reef.io.network.group.impl.utils.ResettingCountDownLatch; 029import org.apache.reef.io.network.group.impl.utils.Utils; 030import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; 031import org.apache.reef.io.serialization.Codec; 032import org.apache.reef.tang.annotations.Name; 033import org.apache.reef.wake.EStage; 034import org.apache.reef.wake.EventHandler; 035import org.apache.reef.wake.impl.SingleThreadStage; 036 037import java.util.Arrays; 038import java.util.HashSet; 039import java.util.Map; 040import java.util.Set; 041import java.util.concurrent.BlockingQueue; 042import java.util.concurrent.LinkedBlockingQueue; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.logging.Logger; 045 046public class OperatorTopologyImpl implements OperatorTopology { 047 048 private static final Logger LOG = Logger.getLogger(OperatorTopologyImpl.class.getName()); 049 050 private final Class<? extends Name<String>> groupName; 051 private final Class<? extends Name<String>> operName; 052 private final String selfId; 053 private final String driverId; 054 private final Sender sender; 055 private final Object topologyLock = new Object(); 056 057 private final int version; 058 059 private final BlockingQueue<GroupCommunicationMessage> deltas = new LinkedBlockingQueue<>(); 060 private final BlockingQueue<GroupCommunicationMessage> deletionDeltas = new LinkedBlockingQueue<>(); 061 062 private OperatorTopologyStruct baseTopology; 063 private OperatorTopologyStruct effectiveTopology; 064 private final ResettingCountDownLatch topologyLockAcquired = new ResettingCountDownLatch(1); 065 private final AtomicBoolean updatingTopo = new AtomicBoolean(false); 066 067 private final EventHandler<GroupCommunicationMessage> baseTopologyUpdateHandler = new BaseTopologyUpdateHandler(); 068 069 private final EStage<GroupCommunicationMessage> baseTopologyUpdateStage = new SingleThreadStage<>( 070 "BaseTopologyUpdateStage", 071 baseTopologyUpdateHandler, 072 5); 073 074 private final EventHandler<GroupCommunicationMessage> dataHandlingStageHandler = new DataHandlingStageHandler(); 075 076 // The queue capacity might determine how many tasks can be handled 077 private final EStage<GroupCommunicationMessage> dataHandlingStage = new SingleThreadStage<>("DataHandlingStage", 078 dataHandlingStageHandler, 079 10000); 080 081 public OperatorTopologyImpl(final Class<? extends Name<String>> groupName, 082 final Class<? extends Name<String>> operName, final String selfId, 083 final String driverId, final Sender sender, final int version) { 084 super(); 085 this.groupName = groupName; 086 this.operName = operName; 087 this.selfId = selfId; 088 this.driverId = driverId; 089 this.sender = sender; 090 this.version = version; 091 } 092 093 /** 094 * Handle messages meant for this operator. Data msgs are passed on 095 * to the DataHandlingStage while Ctrl msgs are queued up for the 096 * base topology to update later. Ctrl msgs signalling death of a 097 * task are also routed to the effectiveTopology in order to notify 098 * a waiting operation. During initialization when effective topology 099 * is not yet set-up, these *Dead msgs are queued in deletionDeltas 100 * for the small time window when these arrive after baseTopology has 101 * received TopologySetup but not yet created the effectiveTopology. 102 * Most times the msgs in the deletionDeltas will be discarded as stale 103 * msgs 104 * <p> 105 * No synchronization is needed while handling *Dead messages. 106 * There 2 states: UpdatingTopo and NotUpdatingTopo 107 * If UpdatingTopo, deltas.put still takes care of adding this msg to effTop through baseTopo changes. 108 * If not, we add to effTopo. So we are good. 109 * <p> 110 * However, for data msgs synchronization is needed. Look at doc of 111 * DataHandlingStage 112 * <p> 113 * Adding to deletionDeltas should be outside 114 * effTopo!=null block. There is a rare possibility that during initialization 115 * just after baseTopo is created(so deltas will be ignored) and just before 116 * effTopo is created(so effTopo will be null) where we can miss a deletion 117 * msg if not added to deletionDelta because this method is synchronized 118 */ 119 @Override 120 public void handle(final GroupCommunicationMessage msg) { 121 LOG.entering("OperatorTopologyImpl", "handle", new Object[]{getQualifiedName(), msg}); 122 if (isMsgVersionOk(msg)) { 123 try { 124 switch (msg.getType()) { 125 case UpdateTopology: 126 updatingTopo.set(true); 127 baseTopologyUpdateStage.onNext(msg); 128 topologyLockAcquired.awaitAndReset(1); 129 LOG.finest(getQualifiedName() + "topoLockAcquired CDL released. Resetting it to new CDL"); 130 sendAckToDriver(msg); 131 break; 132 133 case TopologySetup: 134 LOG.finest(getQualifiedName() + "Adding to deltas queue"); 135 deltas.put(msg); 136 break; 137 138 case ParentAdd: 139 case ChildAdd: 140 LOG.finest(getQualifiedName() + "Adding to deltas queue"); 141 deltas.put(msg); 142 break; 143 144 case ParentDead: 145 case ChildDead: 146 LOG.finest(getQualifiedName() + "Adding to deltas queue"); 147 deltas.put(msg); 148 149 LOG.finest(getQualifiedName() + "Adding to deletionDeltas queue"); 150 deletionDeltas.put(msg); 151 152 if (effectiveTopology != null) { 153 LOG.finest(getQualifiedName() + "Adding as data msg to non-null effective topology struct"); 154 effectiveTopology.addAsData(msg); 155 } else { 156 LOG.fine(getQualifiedName() + "Received a death message before effective topology was setup. CAUTION"); 157 } 158 break; 159 160 default: 161 dataHandlingStage.onNext(msg); 162 } 163 } catch (final InterruptedException e) { 164 throw new RuntimeException("InterruptedException while trying to put ctrl msg into delta queue", e); 165 } 166 } 167 LOG.exiting("OperatorTopologyImpl", "handle", Arrays.toString(new Object[]{getQualifiedName(), msg})); 168 } 169 170 private boolean isMsgVersionOk(final GroupCommunicationMessage msg) { 171 LOG.entering("OperatorTopologyImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), msg}); 172 if (msg.hasVersion()) { 173 final int msgVersion = msg.getVersion(); 174 final boolean retVal; 175 if (msgVersion < version) { 176 LOG.warning(getQualifiedName() + "Received a ver-" + msgVersion + " msg while expecting ver-" + version 177 + ". Discarding msg"); 178 retVal = false; 179 } else { 180 retVal = true; 181 } 182 LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk", 183 Arrays.toString(new Object[]{retVal, getQualifiedName(), msg})); 184 return retVal; 185 } else { 186 throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs"); 187 } 188 } 189 190 @Override 191 public void initialize() throws ParentDeadException { 192 LOG.entering("OperatorTopologyImpl", "initialize", getQualifiedName()); 193 createBaseTopology(); 194 LOG.exiting("OperatorTopologyImpl", "initialize", getQualifiedName()); 195 } 196 197 @Override 198 public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) 199 throws ParentDeadException { 200 LOG.entering("OperatorTopologyImpl", "sendToParent", new Object[] {getQualifiedName(), msgType}); 201 refreshEffectiveTopology(); 202 assert effectiveTopology != null; 203 effectiveTopology.sendToParent(data, msgType); 204 LOG.exiting("OperatorTopologyImpl", "sendToParent", getQualifiedName()); 205 } 206 207 @Override 208 public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) 209 throws ParentDeadException { 210 LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), msgType}); 211 refreshEffectiveTopology(); 212 assert effectiveTopology != null; 213 effectiveTopology.sendToChildren(data, msgType); 214 LOG.exiting("OperatorTopologyImpl", "sendToChildren", getQualifiedName()); 215 } 216 217 @Override 218 public void sendToChildren(final Map<String, byte[]> dataMap, 219 final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) 220 throws ParentDeadException { 221 LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), msgType}); 222 refreshEffectiveTopology(); 223 assert effectiveTopology != null; 224 effectiveTopology.sendToChildren(dataMap, msgType); 225 LOG.exiting("OperatorTopologyImpl", "sendToChildren", getQualifiedName()); 226 } 227 228 @Override 229 public byte[] recvFromParent(final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) 230 throws ParentDeadException { 231 LOG.entering("OperatorTopologyImpl", "recvFromParent", new Object[] {getQualifiedName(), msgType}); 232 refreshEffectiveTopology(); 233 assert effectiveTopology != null; 234 final byte[] retVal = effectiveTopology.recvFromParent(msgType); 235 LOG.exiting("OperatorTopologyImpl", "recvFromParent", getQualifiedName()); 236 return retVal; 237 } 238 239 @Override 240 public <T> T recvFromChildren(final Reduce.ReduceFunction<T> redFunc, final Codec<T> dataCodec) 241 throws ParentDeadException { 242 LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName()); 243 refreshEffectiveTopology(); 244 assert effectiveTopology != null; 245 final T retVal = effectiveTopology.recvFromChildren(redFunc, dataCodec); 246 LOG.exiting("OperatorTopologyImpl", "recvFromChildren", getQualifiedName()); 247 return retVal; 248 } 249 250 @Override 251 public byte[] recvFromChildren() throws ParentDeadException { 252 LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName()); 253 refreshEffectiveTopology(); 254 assert effectiveTopology != null; 255 final byte[] retVal = effectiveTopology.recvFromChildren(); 256 LOG.exiting("OperatorTopologyImpl", "recvFromChildren", getQualifiedName()); 257 return retVal; 258 } 259 260 /** 261 * Only refreshes the effective topology with deletion msgs from. 262 * deletionDeltas queue 263 * 264 * @throws ParentDeadException 265 */ 266 private void refreshEffectiveTopology() throws ParentDeadException { 267 LOG.entering("OperatorTopologyImpl", "refreshEffectiveTopology", getQualifiedName()); 268 LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); 269 synchronized (topologyLock) { 270 LOG.finest(getQualifiedName() + "Acquired topoLock"); 271 272 assert effectiveTopology != null; 273 274 final Set<GroupCommunicationMessage> deletionDeltasSet = new HashSet<>(); 275 copyDeletionDeltas(deletionDeltasSet); 276 277 LOG.finest(getQualifiedName() + "Updating effective topology struct with deletion msgs"); 278 effectiveTopology.update(deletionDeltasSet); 279 LOG.finest(getQualifiedName() + "Released topoLock"); 280 } 281 LOG.exiting("OperatorTopologyImpl", "refreshEffectiveTopology", getQualifiedName()); 282 } 283 284 /** 285 * @throws ParentDeadException 286 */ 287 private void createBaseTopology() throws ParentDeadException { 288 LOG.entering("OperatorTopologyImpl", "createBaseTopology", getQualifiedName()); 289 baseTopology = new OperatorTopologyStructImpl(groupName, operName, selfId, driverId, sender, version); 290 updateBaseTopology(); 291 LOG.exiting("OperatorTopologyImpl", "createBaseTopology", getQualifiedName()); 292 } 293 294 /** 295 * Blocking method that waits till the base topology is updated Unblocks when. 296 * we receive a TopologySetup msg from driver 297 * <p> 298 * Will also update the effective topology when the base topology is updated 299 * so that creation of effective topology is limited to just this method and 300 * refresh will only refresh the effective topology with deletion msgs from 301 * deletionDeltas queue 302 * 303 * @throws ParentDeadException 304 */ 305 private void updateBaseTopology() throws ParentDeadException { 306 LOG.entering("OperatorTopologyImpl", "updateBaseTopology", getQualifiedName()); 307 LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); 308 synchronized (topologyLock) { 309 LOG.finest(getQualifiedName() + "Acquired topoLock"); 310 try { 311 assert baseTopology != null; 312 LOG.finest(getQualifiedName() + "Updating base topology. So setting dirty bit"); 313 baseTopology.setChanges(true); 314 315 LOG.finest(getQualifiedName() + "Waiting for ctrl msgs"); 316 for (GroupCommunicationMessage msg = deltas.take(); 317 msg.getType() != ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup; 318 msg = deltas.take()) { 319 LOG.finest(getQualifiedName() + "Got " + msg.getType() + " msg from " + msg.getSrcid()); 320 if (effectiveTopology == null && 321 msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) { 322 /** 323 * If effectiveTopology!=null, this method is being called from the BaseTopologyUpdateStage 324 * And exception thrown will be caught by uncaughtExceptionHandler leading to System.exit 325 */ 326 LOG.finer(getQualifiedName() + "Throwing ParentDeadException"); 327 throw new ParentDeadException(getQualifiedName() 328 + "Parent dead. Current behavior is for the child to die too."); 329 } else { 330 LOG.finest(getQualifiedName() + "Updating baseTopology struct"); 331 baseTopology.update(msg); 332 sendAckToDriver(msg); 333 } 334 LOG.finest(getQualifiedName() + "Waiting for ctrl msgs"); 335 } 336 337 updateEffTopologyFromBaseTopology(); 338 339 } catch (final InterruptedException e) { 340 throw new RuntimeException("InterruptedException while waiting for delta msg from driver", e); 341 } 342 LOG.finest(getQualifiedName() + "Released topoLock"); 343 } 344 LOG.exiting("OperatorTopologyImpl", "updateBaseTopology", getQualifiedName()); 345 } 346 347 private void sendAckToDriver(final GroupCommunicationMessage msg) { 348 LOG.entering("OperatorTopologyImpl", "sendAckToDriver", new Object[]{getQualifiedName(), msg}); 349 try { 350 final String srcId = msg.getSrcid(); 351 if (msg.hasVersion()) { 352 final int srcVersion = msg.getSrcVersion(); 353 switch (msg.getType()) { 354 case UpdateTopology: 355 sender.send(Utils.bldVersionedGCM(groupName, operName, 356 ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, selfId, this.version, driverId, 357 srcVersion, Utils.EMPTY_BYTE_ARR)); 358 break; 359 case ParentAdd: 360 sender.send(Utils.bldVersionedGCM(groupName, operName, 361 ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, selfId, this.version, srcId, 362 srcVersion, Utils.EMPTY_BYTE_ARR), driverId); 363 break; 364 case ParentDead: 365 sender.send(Utils.bldVersionedGCM(groupName, operName, 366 ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, selfId, this.version, srcId, 367 srcVersion, Utils.EMPTY_BYTE_ARR), driverId); 368 break; 369 case ChildAdd: 370 sender.send(Utils.bldVersionedGCM(groupName, operName, 371 ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, selfId, this.version, srcId, 372 srcVersion, Utils.EMPTY_BYTE_ARR), driverId); 373 break; 374 case ChildDead: 375 sender.send(Utils.bldVersionedGCM(groupName, operName, 376 ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, selfId, this.version, srcId, 377 srcVersion, Utils.EMPTY_BYTE_ARR), driverId); 378 break; 379 default: 380 throw new RuntimeException("Received a non control message for acknowledgement"); 381 } 382 } else { 383 throw new RuntimeException(getQualifiedName() + "Ack Sender can only deal with versioned msgs"); 384 } 385 } catch (final NetworkException e) { 386 throw new RuntimeException("NetworkException while sending ack to driver for delta msg " + msg.getType(), e); 387 } 388 LOG.exiting("OperatorTopologyImpl", "sendAckToDriver", Arrays.toString(new Object[]{getQualifiedName(), msg})); 389 } 390 391 private void updateEffTopologyFromBaseTopology() { 392 LOG.entering("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", getQualifiedName()); 393 assert baseTopology != null; 394 LOG.finest(getQualifiedName() + "Updating effective topology"); 395 if (baseTopology.hasChanges()) { 396 //Create effectiveTopology from baseTopology 397 effectiveTopology = new OperatorTopologyStructImpl(baseTopology); 398 baseTopology.setChanges(false); 399 } 400 LOG.exiting("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", getQualifiedName()); 401 } 402 403 /** 404 * @param deletionDeltasForUpdate 405 * @throws ParentDeadException 406 */ 407 private void copyDeletionDeltas(final Set<GroupCommunicationMessage> deletionDeltasForUpdate) 408 throws ParentDeadException { 409 LOG.entering("OperatorTopologyImpl", "copyDeletionDeltas", getQualifiedName()); 410 this.deletionDeltas.drainTo(deletionDeltasForUpdate); 411 for (final GroupCommunicationMessage msg : deletionDeltasForUpdate) { 412 final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType = msg.getType(); 413 if (msgType == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) { 414 throw new ParentDeadException(getQualifiedName() + 415 "Parent dead. Current behavior is for the child to die too."); 416 } 417 } 418 LOG.exiting("OperatorTopologyImpl", "copyDeletionDeltas", getQualifiedName()); 419 } 420 421 private String getQualifiedName() { 422 return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + selfId + ":ver(" + version + ") - "; 423 } 424 425 /** 426 * Unlike Dead msgs this needs to be synchronized because data msgs are not 427 * routed through the base topo changes So we need to make sure to wait for 428 * updateTopo to complete and for the new effective topo to take effect. Hence 429 * updatingTopo is set to false in refreshEffTopo. Also, since this is called 430 * from a netty IO thread, we need to create a stage to move the msgs from 431 * netty space to application space and release the netty threads. Otherwise 432 * weird deadlocks can happen Ex: Sent model to k nodes using broadcast. Send 433 * to K+1 th is waiting for ACK. The K nodes already compute their states and 434 * reduce send their results. If we haven't finished refreshEffTopo because of 435 * which updatingTopo is true, we can't add the new msgs if the #netty threads 436 * is k All k threads are waiting to add data. Single user thread that is 437 * waiting for ACK does not come around to refreshEffTopo and we are 438 * deadlocked because there aren't enough netty threads to dispatch msgs to 439 * the application. Hence the stage 440 */ 441 private final class DataHandlingStageHandler implements EventHandler<GroupCommunicationMessage> { 442 @Override 443 public void onNext(final GroupCommunicationMessage dataMsg) { 444 LOG.entering("OperatorTopologyImpl.DataHandlingStageHandler", "onNext", new Object[]{getQualifiedName(), 445 dataMsg}); 446 LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); 447 synchronized (topologyLock) { 448 LOG.finest(getQualifiedName() + "Acquired topoLock"); 449 while (updatingTopo.get()) { 450 try { 451 LOG.finest(getQualifiedName() + "Topology is being updated. Released topoLock, Waiting on topoLock"); 452 topologyLock.wait(); 453 LOG.finest(getQualifiedName() + "Acquired topoLock"); 454 } catch (final InterruptedException e) { 455 throw new RuntimeException("InterruptedException while data handling" 456 + "stage was waiting for updatingTopo to become false", e); 457 } 458 } 459 if (effectiveTopology != null) { 460 LOG.finest(getQualifiedName() + "Non-null effectiveTopo.addAsData(msg)"); 461 effectiveTopology.addAsData(dataMsg); 462 } else { 463 LOG.fine("Received a data message before effective topology was setup"); 464 } 465 LOG.finest(getQualifiedName() + "Released topoLock"); 466 } 467 LOG.exiting("OperatorTopologyImpl.DataHandlingStageHandler", "onNext", 468 Arrays.toString(new Object[]{getQualifiedName(), dataMsg})); 469 } 470 } 471 472 private final class BaseTopologyUpdateHandler implements EventHandler<GroupCommunicationMessage> { 473 @Override 474 public void onNext(final GroupCommunicationMessage msg) { 475 assert msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology; 476 assert effectiveTopology != null; 477 LOG.entering("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext", new Object[]{getQualifiedName(), msg}); 478 LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); 479 synchronized (topologyLock) { 480 LOG.finest(getQualifiedName() + "Acquired topoLock"); 481 LOG.finest(getQualifiedName() + "Releasing topoLockAcquired CDL"); 482 topologyLockAcquired.countDown(); 483 try { 484 updateBaseTopology(); 485 LOG.finest(getQualifiedName() + "Completed updating base & effective topologies"); 486 } catch (final ParentDeadException e) { 487 throw new RuntimeException(getQualifiedName() + "BaseTopologyUpdateStage: Unexpected ParentDeadException", e); 488 } 489 updatingTopo.set(false); 490 LOG.finest(getQualifiedName() + "Topology update complete. Notifying waiting threads"); 491 topologyLock.notifyAll(); 492 LOG.finest(getQualifiedName() + "Released topoLock"); 493 } 494 LOG.exiting("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext", 495 Arrays.toString(new Object[]{getQualifiedName(), msg})); 496 } 497 } 498}