001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.driver; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.evaluator.FailedEvaluator; 024import org.apache.reef.driver.parameters.DriverIdentifier; 025import org.apache.reef.driver.task.FailedTask; 026import org.apache.reef.driver.task.RunningTask; 027import org.apache.reef.driver.task.TaskConfigurationOptions; 028import org.apache.reef.io.network.group.api.config.OperatorSpec; 029import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver; 030import org.apache.reef.io.network.group.api.driver.Topology; 031import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 032import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec; 033import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec; 034import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec; 035import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec; 036import org.apache.reef.io.network.group.impl.config.parameters.*; 037import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler; 038import org.apache.reef.io.network.group.impl.utils.CountingSemaphore; 039import org.apache.reef.io.network.group.impl.utils.SetMap; 040import org.apache.reef.io.network.group.impl.utils.Utils; 041import org.apache.reef.tang.Configuration; 042import org.apache.reef.tang.Injector; 043import org.apache.reef.tang.JavaConfigurationBuilder; 044import org.apache.reef.tang.Tang; 045import org.apache.reef.tang.annotations.Name; 046import org.apache.reef.tang.annotations.Parameter; 047import org.apache.reef.tang.exceptions.InjectionException; 048import org.apache.reef.tang.formats.ConfigurationSerializer; 049import org.apache.reef.wake.EStage; 050import org.apache.reef.wake.impl.SingleThreadStage; 051 052import javax.inject.Inject; 053import java.util.*; 054import java.util.concurrent.ConcurrentHashMap; 055import java.util.concurrent.ConcurrentMap; 056import java.util.concurrent.atomic.AtomicBoolean; 057import java.util.logging.Level; 058import java.util.logging.Logger; 059 060@DriverSide 061@Private 062public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { 063 064 private static final Logger LOG = Logger.getLogger(CommunicationGroupDriverImpl.class.getName()); 065 066 private final Class<? extends Name<String>> groupName; 067 private final ConcurrentMap<Class<? extends Name<String>>, OperatorSpec> operatorSpecs = new ConcurrentHashMap<>(); 068 private final ConcurrentMap<Class<? extends Name<String>>, Topology> topologies = new ConcurrentHashMap<>(); 069 private final Map<String, TaskState> perTaskState = new HashMap<>(); 070 private boolean finalised = false; 071 private final ConfigurationSerializer confSerializer; 072 private final String driverId; 073 074 private final CountingSemaphore allInitialTasksRunning; 075 076 private final Object topologiesLock = new Object(); 077 private final Object configLock = new Object(); 078 private final AtomicBoolean initializing = new AtomicBoolean(true); 079 080 private final Object yetToRunLock = new Object(); 081 private final Object toBeRemovedLock = new Object(); 082 083 private final SetMap<MsgKey, IndexedMsg> msgQue = new SetMap<>(); 084 085 private final TopologyFactory topologyFactory; 086 private final Class<? extends Topology> topologyClass; 087 088 /** 089 * @deprecated in 0.14. Use Tang to obtain an instance of this instead. 090 */ 091 @Deprecated 092 public CommunicationGroupDriverImpl(final Class<? extends Name<String>> groupName, 093 final ConfigurationSerializer confSerializer, 094 final EStage<GroupCommunicationMessage> senderStage, 095 final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler, 096 final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler, 097 final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler, 098 final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler, 099 final String driverId, final int numberOfTasks, final int fanOut) { 100 super(); 101 this.groupName = groupName; 102 this.driverId = driverId; 103 this.confSerializer = confSerializer; 104 this.allInitialTasksRunning = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock); 105 106 groupCommRunningTaskHandler.addHandler(new TopologyRunningTaskHandler(this)); 107 groupCommFailedTaskHandler.addHandler(new TopologyFailedTaskHandler(this)); 108 groupCommFailedEvaluatorHandler.addHandler(new TopologyFailedEvaluatorHandler(this)); 109 commGroupMessageHandler.addHandler(new TopologyMessageHandler(this)); 110 final Injector injector = Tang.Factory.getTang().newInjector(); 111 injector.bindVolatileParameter(CommGroupNameClass.class, groupName); 112 injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage); 113 injector.bindVolatileParameter(DriverIdentifier.class, driverId); 114 injector.bindVolatileParameter(CommGroupNumTask.class, numberOfTasks); 115 injector.bindVolatileParameter(TreeTopologyFanOut.class, fanOut); 116 try { 117 topologyFactory = injector.getInstance(TopologyFactory.class); 118 } catch (final InjectionException e) { 119 throw new RuntimeException(e); 120 } 121 this.topologyClass = TreeTopology.class; 122 } 123 124 @Inject 125 private CommunicationGroupDriverImpl( 126 @Parameter(CommGroupNameClass.class) final Class<? extends Name<String>> groupName, 127 final ConfigurationSerializer confSerializer, 128 @Parameter(GroupCommRunningTaskHandler.class) 129 final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler, 130 @Parameter(GroupCommFailedTaskHandler.class) 131 final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler, 132 @Parameter(GroupCommFailedEvalHandler.class) 133 final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler, 134 final GroupCommMessageHandler groupCommMessageHandler, 135 @Parameter(DriverIdentifier.class) final String driverId, 136 @Parameter(CommGroupNumTask.class) final int numberOfTasks, 137 final TopologyFactory topologyFactory, 138 @Parameter(TopologyClass.class) final Class<? extends Topology> topologyClass) { 139 super(); 140 this.groupName = groupName; 141 this.driverId = driverId; 142 this.confSerializer = confSerializer; 143 this.allInitialTasksRunning = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock); 144 145 registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler, 146 groupCommFailedEvaluatorHandler, groupCommMessageHandler); 147 this.topologyFactory = topologyFactory; 148 this.topologyClass = topologyClass; 149 } 150 151 private void registerHandlers( 152 final BroadcastingEventHandler<RunningTask> runningTaskHandler, 153 final BroadcastingEventHandler<FailedTask> failedTaskHandler, 154 final BroadcastingEventHandler<FailedEvaluator> failedEvaluatorHandler, 155 final GroupCommMessageHandler groupCommMessageHandler) { 156 runningTaskHandler.addHandler(new TopologyRunningTaskHandler(this)); 157 failedTaskHandler.addHandler(new TopologyFailedTaskHandler(this)); 158 failedEvaluatorHandler.addHandler(new TopologyFailedEvaluatorHandler(this)); 159 groupCommMessageHandler.addHandler(groupName, new SingleThreadStage<>(new TopologyMessageHandler(this), 100 * 100)); 160 } 161 162 @Override 163 public CommunicationGroupDriver addBroadcast(final Class<? extends Name<String>> operatorName, 164 final BroadcastOperatorSpec spec) { 165 LOG.entering("CommunicationGroupDriverImpl", "addBroadcast", 166 new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}); 167 if (finalised) { 168 throw new IllegalStateException("Can't add more operators to a finalised spec"); 169 } 170 operatorSpecs.put(operatorName, spec); 171 172 final Topology topology; 173 try { 174 topology = topologyFactory.getNewInstance(operatorName, topologyClass); 175 } catch (final InjectionException e) { 176 LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName); 177 throw new RuntimeException(e); 178 } 179 180 topology.setRootTask(spec.getSenderId()); 181 topology.setOperatorSpecification(spec); 182 topologies.put(operatorName, topology); 183 LOG.exiting("CommunicationGroupDriverImpl", "addBroadcast", 184 Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"})); 185 return this; 186 } 187 188 @Override 189 public CommunicationGroupDriver addReduce(final Class<? extends Name<String>> operatorName, 190 final ReduceOperatorSpec spec) { 191 LOG.entering("CommunicationGroupDriverImpl", "addReduce", 192 new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}); 193 if (finalised) { 194 throw new IllegalStateException("Can't add more operators to a finalised spec"); 195 } 196 LOG.finer(getQualifiedName() + "Adding reduce operator to tree topology: " + spec); 197 operatorSpecs.put(operatorName, spec); 198 199 final Topology topology; 200 try { 201 topology = topologyFactory.getNewInstance(operatorName, topologyClass); 202 } catch (final InjectionException e) { 203 LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName); 204 throw new RuntimeException(e); 205 } 206 207 topology.setRootTask(spec.getReceiverId()); 208 topology.setOperatorSpecification(spec); 209 topologies.put(operatorName, topology); 210 LOG.exiting("CommunicationGroupDriverImpl", "addReduce", 211 Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"})); 212 return this; 213 } 214 215 @Override 216 public CommunicationGroupDriver addScatter(final Class<? extends Name<String>> operatorName, 217 final ScatterOperatorSpec spec) { 218 LOG.entering("CommunicationGroupDriverImpl", "addScatter", 219 new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}); 220 if (finalised) { 221 throw new IllegalStateException("Can't add more operators to a finalised spec"); 222 } 223 operatorSpecs.put(operatorName, spec); 224 225 final Topology topology; 226 try { 227 topology = topologyFactory.getNewInstance(operatorName, topologyClass); 228 } catch (final InjectionException e) { 229 LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName); 230 throw new RuntimeException(e); 231 } 232 233 topology.setRootTask(spec.getSenderId()); 234 topology.setOperatorSpecification(spec); 235 topologies.put(operatorName, topology); 236 LOG.exiting("CommunicationGroupDriverImpl", "addScatter", 237 Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec})); 238 return this; 239 } 240 241 @Override 242 public CommunicationGroupDriver addGather(final Class<? extends Name<String>> operatorName, 243 final GatherOperatorSpec spec) { 244 LOG.entering("CommunicationGroupDriverImpl", "addGather", 245 new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}); 246 if (finalised) { 247 throw new IllegalStateException("Can't add more operators to a finalised spec"); 248 } 249 operatorSpecs.put(operatorName, spec); 250 251 final Topology topology; 252 try { 253 topology = topologyFactory.getNewInstance(operatorName, topologyClass); 254 } catch (final InjectionException e) { 255 LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName); 256 throw new RuntimeException(e); 257 } 258 259 topology.setRootTask(spec.getReceiverId()); 260 topology.setOperatorSpecification(spec); 261 topologies.put(operatorName, topology); 262 LOG.exiting("CommunicationGroupDriverImpl", "addGather", 263 Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec})); 264 return this; 265 } 266 267 @Override 268 public Configuration getTaskConfiguration(final Configuration taskConf) { 269 LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration", 270 new Object[]{getQualifiedName(), confSerializer.toString(taskConf)}); 271 final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); 272 final String taskId = taskId(taskConf); 273 if (perTaskState.containsKey(taskId)) { 274 jcb.bindNamedParameter(CommunicationGroupName.class, groupName.getName()); 275 jcb.bindNamedParameter(DriverIdentifierGroupComm.class, driverId); 276 LOG.finest(getQualifiedName() + "Task has been added. Waiting to acquire configLock"); 277 synchronized (configLock) { 278 LOG.finest(getQualifiedName() + "Acquired configLock"); 279 while (cantGetConfig(taskId)) { 280 LOG.finest(getQualifiedName() + "Need to wait for failure"); 281 try { 282 configLock.wait(); 283 } catch (final InterruptedException e) { 284 throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on configLock", e); 285 } 286 } 287 LOG.finest(getQualifiedName() + taskId + " - Will fetch configuration now."); 288 LOG.finest(getQualifiedName() + "Released configLock. Waiting to acquire topologiesLock"); 289 } 290 synchronized (topologiesLock) { 291 LOG.finest(getQualifiedName() + "Acquired topologiesLock"); 292 for (final Map.Entry<Class<? extends Name<String>>, OperatorSpec> operSpecEntry : operatorSpecs.entrySet()) { 293 final Class<? extends Name<String>> operName = operSpecEntry.getKey(); 294 final Topology topology = topologies.get(operName); 295 final JavaConfigurationBuilder jcbInner = Tang.Factory.getTang() 296 .newConfigurationBuilder(topology.getTaskConfiguration(taskId)); 297 jcbInner.bindNamedParameter(OperatorName.class, operName.getName()); 298 jcb.bindSetEntry(SerializedOperConfigs.class, confSerializer.toString(jcbInner.build())); 299 } 300 LOG.finest(getQualifiedName() + "Released topologiesLock"); 301 } 302 } else { 303 return null; 304 } 305 final Configuration configuration = jcb.build(); 306 LOG.exiting("CommunicationGroupDriverImpl", "getTaskConfiguration", 307 Arrays.toString(new Object[]{getQualifiedName(), confSerializer.toString(configuration)})); 308 return configuration; 309 } 310 311 private boolean cantGetConfig(final String taskId) { 312 LOG.entering("CommunicationGroupDriverImpl", "cantGetConfig", new Object[]{getQualifiedName(), taskId}); 313 final TaskState taskState = perTaskState.get(taskId); 314 if (!taskState.equals(TaskState.NOT_STARTED)) { 315 LOG.finest(getQualifiedName() + taskId + " has started."); 316 if (taskState.equals(TaskState.RUNNING)) { 317 LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", 318 Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is running. We can't get config"})); 319 return true; 320 } else { 321 LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", 322 Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has failed. We can get config"})); 323 return false; 324 } 325 } else { 326 LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", 327 Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has not started. We can get config"})); 328 return false; 329 } 330 } 331 332 @Override 333 public void finalise() { 334 finalised = true; 335 } 336 337 @Override 338 public void addTask(final Configuration partialTaskConf) { 339 LOG.entering("CommunicationGroupDriverImpl", "addTask", 340 new Object[]{getQualifiedName(), confSerializer.toString(partialTaskConf)}); 341 final String taskId = taskId(partialTaskConf); 342 LOG.finest(getQualifiedName() + "AddTask(" + taskId + "). Waiting to acquire toBeRemovedLock"); 343 synchronized (toBeRemovedLock) { 344 LOG.finest(getQualifiedName() + "Acquired toBeRemovedLock"); 345 while (perTaskState.containsKey(taskId)) { 346 LOG.finest(getQualifiedName() + "Trying to add an existing task. Will wait for removeTask"); 347 try { 348 toBeRemovedLock.wait(); 349 } catch (final InterruptedException e) { 350 throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on toBeRemovedLock", e); 351 } 352 } 353 LOG.finest(getQualifiedName() + "Released toBeRemovedLock. Waiting to acquire topologiesLock"); 354 } 355 synchronized (topologiesLock) { 356 LOG.finest(getQualifiedName() + "Acquired topologiesLock"); 357 358 boolean isRootOfSomeTopology = false; 359 for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) { 360 final Topology topology = topologies.get(operName); 361 topology.addTask(taskId); 362 isRootOfSomeTopology |= topology.getRootId().equals(taskId); 363 } 364 365 if (isRootOfSomeTopology) { 366 topologiesLock.notifyAll(); 367 } 368 369 perTaskState.put(taskId, TaskState.NOT_STARTED); 370 LOG.finest(getQualifiedName() + "Released topologiesLock"); 371 } 372 LOG.fine(getQualifiedName() + "Added " + taskId + " to topology"); 373 LOG.exiting("CommunicationGroupDriverImpl", "addTask", 374 Arrays.toString(new Object[]{getQualifiedName(), "Added task: ", taskId})); 375 } 376 377 public void removeTask(final String taskId) { 378 LOG.entering("CommunicationGroupDriverImpl", "removeTask", new Object[]{getQualifiedName(), taskId}); 379 LOG.info(getQualifiedName() + "Removing Task " + taskId + 380 " as the evaluator has failed."); 381 LOG.finest(getQualifiedName() + "Remove Task(" + taskId + 382 "): Waiting to acquire topologiesLock"); 383 synchronized (topologiesLock) { 384 LOG.finest(getQualifiedName() + "Acquired topologiesLock"); 385 for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) { 386 final Topology topology = topologies.get(operName); 387 topology.removeTask(taskId); 388 } 389 perTaskState.remove(taskId); 390 LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire toBeRemovedLock"); 391 } 392 synchronized (toBeRemovedLock) { 393 LOG.finest(getQualifiedName() + "Acquired toBeRemovedLock"); 394 LOG.finest(getQualifiedName() + "Removed Task " + taskId + " Notifying waiting threads"); 395 toBeRemovedLock.notifyAll(); 396 LOG.finest(getQualifiedName() + "Released toBeRemovedLock"); 397 } 398 LOG.fine(getQualifiedName() + "Removed " + taskId + " to topology"); 399 LOG.exiting("CommunicationGroupDriverImpl", "removeTask", 400 Arrays.toString(new Object[]{getQualifiedName(), "Removed task: ", taskId})); 401 } 402 403 public void runTask(final String id) { 404 LOG.entering("CommunicationGroupDriverImpl", "runTask", new Object[]{getQualifiedName(), id}); 405 LOG.finest(getQualifiedName() + "Task-" + id + " running. Waiting to acquire topologiesLock"); 406 LOG.fine(getQualifiedName() + "Got running Task: " + id); 407 408 boolean nonMember = false; 409 synchronized (topologiesLock) { 410 if (perTaskState.containsKey(id)) { 411 LOG.finest(getQualifiedName() + "Acquired topologiesLock"); 412 413 for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) { 414 final Topology topology = topologies.get(operName); 415 while (!topology.isRootPresent() && !topology.getRootId().equals(id)) { 416 try { 417 // wait until the root node has been added to the topology 418 topologiesLock.wait(); 419 } catch (final InterruptedException e) { 420 throw new RuntimeException(getQualifiedName() + 421 "InterruptedException while waiting on topologiesLock", e); 422 } 423 } 424 } 425 426 // This loop shouldn't be merged with the one above, because the one above contains a lock.wait(). 427 // All topologies must be modified at one go, without giving up the turn. 428 for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) { 429 final Topology topology = topologies.get(operName); 430 topology.onRunningTask(id); 431 } 432 if (initializing.get()) { 433 allInitialTasksRunning.decrement(); 434 } 435 perTaskState.put(id, TaskState.RUNNING); 436 LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire yetToRunLock"); 437 } else { 438 nonMember = true; 439 } 440 } 441 synchronized (yetToRunLock) { 442 LOG.finest(getQualifiedName() + "Acquired yetToRunLock"); 443 yetToRunLock.notifyAll(); 444 LOG.finest(getQualifiedName() + "Released yetToRunLock"); 445 } 446 if (nonMember) { 447 LOG.exiting("CommunicationGroupDriverImpl", "runTask", 448 getQualifiedName() + id + " does not belong to this communication group. Ignoring"); 449 } else { 450 LOG.fine(getQualifiedName() + "Status of task " + id + " changed to RUNNING"); 451 LOG.exiting("CommunicationGroupDriverImpl", "runTask", 452 Arrays.toString(new Object[]{getQualifiedName(), "Set running complete on task ", id})); 453 } 454 } 455 456 public void failTask(final String id) { 457 LOG.entering("CommunicationGroupDriverImpl", "failTask", new Object[]{getQualifiedName(), id}); 458 LOG.finest(getQualifiedName() + "Task-" + id + " failed. Waiting to acquire yetToRunLock"); 459 LOG.fine(getQualifiedName() + "Got failed Task: " + id); 460 synchronized (yetToRunLock) { 461 LOG.finest(getQualifiedName() + "Acquired yetToRunLock"); 462 // maybe the task does not belong to this communication group. 463 // if it doesn't, we return, it should belong to other group 464 // which will handle its failure 465 if (!perTaskState.containsKey(id)) { 466 LOG.fine(getQualifiedName() 467 + " does not have this task, another communicationGroup must have it"); 468 return; 469 } 470 while (cantFailTask(id)) { 471 LOG.finest(getQualifiedName() + "Need to wait for it run"); 472 try { 473 yetToRunLock.wait(); 474 } catch (final InterruptedException e) { 475 throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on yetToRunLock", e); 476 } 477 } 478 LOG.finest(getQualifiedName() + id + " - Can safely set failure."); 479 LOG.finest(getQualifiedName() + "Released yetToRunLock. Waiting to acquire topologiesLock"); 480 } 481 synchronized (topologiesLock) { 482 LOG.finest(getQualifiedName() + "Acquired topologiesLock"); 483 for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) { 484 final Topology topology = topologies.get(operName); 485 topology.onFailedTask(id); 486 } 487 if (initializing.get()) { 488 allInitialTasksRunning.increment(); 489 } 490 perTaskState.put(id, TaskState.FAILED); 491 LOG.finest(getQualifiedName() + "Removing msgs associated with dead task " + id + " from msgQue."); 492 final Set<MsgKey> keys = msgQue.keySet(); 493 final List<MsgKey> keysToBeRemoved = new ArrayList<>(); 494 for (final MsgKey msgKey : keys) { 495 if (msgKey.getSrc().equals(id)) { 496 keysToBeRemoved.add(msgKey); 497 } 498 } 499 LOG.finest(getQualifiedName() + keysToBeRemoved + " keys that will be removed"); 500 for (final MsgKey key : keysToBeRemoved) { 501 msgQue.remove(key); 502 } 503 LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire configLock"); 504 } 505 synchronized (configLock) { 506 LOG.finest(getQualifiedName() + "Acquired configLock"); 507 configLock.notifyAll(); 508 LOG.finest(getQualifiedName() + "Released configLock"); 509 } 510 LOG.fine(getQualifiedName() + "Status of task " + id + " changed to FAILED"); 511 LOG.exiting("CommunicationGroupDriverImpl", "failTask", 512 Arrays.toString(new Object[]{getQualifiedName(), "Set failed complete on task ", id})); 513 } 514 515 private boolean cantFailTask(final String taskId) { 516 LOG.entering("CommunicationGroupDriverImpl", "cantFailTask", new Object[]{getQualifiedName(), taskId}); 517 final TaskState taskState = perTaskState.get(taskId); 518 if (!taskState.equals(TaskState.NOT_STARTED)) { 519 LOG.finest(getQualifiedName() + taskId + " has started."); 520 if (!taskState.equals(TaskState.RUNNING)) { 521 LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", 522 Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is not running yet. Can't set failure"})); 523 return true; 524 } else { 525 LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", 526 Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " is running. Can set failure"})); 527 return false; 528 } 529 } else { 530 LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", 531 Arrays.toString(new Object[]{true, getQualifiedName(), taskId, 532 " has not started. We can't fail a task that hasn't started"})); 533 return true; 534 } 535 } 536 537 public void queNProcessMsg(final GroupCommunicationMessage msg) { 538 LOG.entering("CommunicationGroupDriverImpl", "queNProcessMsg", new Object[]{getQualifiedName(), msg}); 539 final IndexedMsg indMsg = new IndexedMsg(msg); 540 final Class<? extends Name<String>> operName = indMsg.getOperName(); 541 final MsgKey key = new MsgKey(msg); 542 if (msgQue.contains(key, indMsg)) { 543 throw new RuntimeException(getQualifiedName() + "MsgQue already contains " + msg.getType() + " msg for " + key + 544 " in " + Utils.simpleName(operName)); 545 } 546 LOG.finest(getQualifiedName() + "Adding msg to que"); 547 msgQue.add(key, indMsg); 548 if (msgQue.count(key) == topologies.size()) { 549 LOG.finest(getQualifiedName() + "MsgQue for " + key + " contains " + msg.getType() + " msgs from: " 550 + msgQue.get(key)); 551 for (final IndexedMsg innerIndMsg : msgQue.remove(key)) { 552 topologies.get(innerIndMsg.getOperName()).onReceiptOfMessage(innerIndMsg.getMsg()); 553 } 554 LOG.finest(getQualifiedName() + "All msgs processed and removed"); 555 } 556 LOG.exiting("CommunicationGroupDriverImpl", "queNProcessMsg", 557 Arrays.toString(new Object[]{getQualifiedName(), "Que & Process done for: ", msg})); 558 } 559 560 private boolean isMsgVersionOk(final GroupCommunicationMessage msg) { 561 LOG.entering("CommunicationGroupDriverImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), msg}); 562 if (msg.hasVersion()) { 563 final String srcId = msg.getSrcid(); 564 final int rcvSrcVersion = msg.getSrcVersion(); 565 final int expSrcVersion = topologies.get(Utils.getClass(msg.getOperatorname())).getNodeVersion(srcId); 566 567 final boolean srcVersionChk = chkVersion(rcvSrcVersion, expSrcVersion, "Src Version Check: "); 568 LOG.exiting("CommunicationGroupDriverImpl", "isMsgVersionOk", 569 Arrays.toString(new Object[]{srcVersionChk, getQualifiedName(), msg})); 570 return srcVersionChk; 571 } else { 572 throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs"); 573 } 574 } 575 576 private boolean chkVersion(final int rcvVersion, final int version, final String msg) { 577 if (rcvVersion < version) { 578 LOG.warning(getQualifiedName() + msg + "received a ver-" + rcvVersion + " msg while expecting ver-" + version); 579 return false; 580 } 581 if (rcvVersion > version) { 582 LOG.warning(getQualifiedName() + msg + "received a HIGHER ver-" + rcvVersion + " msg while expecting ver-" 583 + version + ". Something fishy!!!"); 584 return false; 585 } 586 return true; 587 } 588 589 public void processMsg(final GroupCommunicationMessage msg) { 590 LOG.entering("CommunicationGroupDriverImpl", "processMsg", new Object[]{getQualifiedName(), msg}); 591 LOG.finest(getQualifiedName() + "ProcessMsg: " + msg + ". Waiting to acquire topologiesLock"); 592 synchronized (topologiesLock) { 593 LOG.finest(getQualifiedName() + "Acquired topologiesLock"); 594 if (!isMsgVersionOk(msg)) { 595 LOG.finer(getQualifiedName() + "Discarding msg. Released topologiesLock"); 596 return; 597 } 598 if (initializing.get()) { 599 LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": Waiting for all required(" + 600 allInitialTasksRunning.getInitialCount() + ") nodes to run"); 601 allInitialTasksRunning.await(); 602 LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": All required(" + 603 allInitialTasksRunning.getInitialCount() + ") nodes are running"); 604 initializing.compareAndSet(true, false); 605 } 606 queNProcessMsg(msg); 607 LOG.finest(getQualifiedName() + "Released topologiesLock"); 608 } 609 LOG.exiting("CommunicationGroupDriverImpl", "processMsg", 610 Arrays.toString(new Object[]{getQualifiedName(), "ProcessMsg done for: ", msg})); 611 } 612 613 private String taskId(final Configuration partialTaskConf) { 614 try { 615 final Injector injector = Tang.Factory.getTang().newInjector(partialTaskConf); 616 return injector.getNamedInstance(TaskConfigurationOptions.Identifier.class); 617 } catch (final InjectionException e) { 618 throw new RuntimeException(getQualifiedName() + 619 "Injection exception while extracting taskId from partialTaskConf", e); 620 } 621 } 622 623 private String getQualifiedName() { 624 return Utils.simpleName(groupName) + " - "; 625 } 626}