001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.evaluator.FailedEvaluator;
024import org.apache.reef.driver.parameters.DriverIdentifier;
025import org.apache.reef.driver.task.FailedTask;
026import org.apache.reef.driver.task.RunningTask;
027import org.apache.reef.driver.task.TaskConfigurationOptions;
028import org.apache.reef.io.network.group.api.config.OperatorSpec;
029import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
030import org.apache.reef.io.network.group.api.driver.Topology;
031import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
032import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
033import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
034import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
035import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
036import org.apache.reef.io.network.group.impl.config.parameters.*;
037import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
038import org.apache.reef.io.network.group.impl.utils.CountingSemaphore;
039import org.apache.reef.io.network.group.impl.utils.SetMap;
040import org.apache.reef.io.network.group.impl.utils.Utils;
041import org.apache.reef.tang.Configuration;
042import org.apache.reef.tang.Injector;
043import org.apache.reef.tang.JavaConfigurationBuilder;
044import org.apache.reef.tang.Tang;
045import org.apache.reef.tang.annotations.Name;
046import org.apache.reef.tang.annotations.Parameter;
047import org.apache.reef.tang.exceptions.InjectionException;
048import org.apache.reef.tang.formats.ConfigurationSerializer;
049import org.apache.reef.wake.EStage;
050import org.apache.reef.wake.impl.SingleThreadStage;
051
052import javax.inject.Inject;
053import java.util.*;
054import java.util.concurrent.ConcurrentHashMap;
055import java.util.concurrent.ConcurrentMap;
056import java.util.concurrent.atomic.AtomicBoolean;
057import java.util.logging.Level;
058import java.util.logging.Logger;
059
060@DriverSide
061@Private
062public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
063
064  private static final Logger LOG = Logger.getLogger(CommunicationGroupDriverImpl.class.getName());
065
066  private final Class<? extends Name<String>> groupName;
067  private final ConcurrentMap<Class<? extends Name<String>>, OperatorSpec> operatorSpecs = new ConcurrentHashMap<>();
068  private final ConcurrentMap<Class<? extends Name<String>>, Topology> topologies = new ConcurrentHashMap<>();
069  private final Map<String, TaskState> perTaskState = new HashMap<>();
070  private boolean finalised = false;
071  private final ConfigurationSerializer confSerializer;
072  private final String driverId;
073
074  private final CountingSemaphore allInitialTasksRunning;
075
076  private final Object topologiesLock = new Object();
077  private final Object configLock = new Object();
078  private final AtomicBoolean initializing = new AtomicBoolean(true);
079
080  private final Object yetToRunLock = new Object();
081  private final Object toBeRemovedLock = new Object();
082
083  private final SetMap<MsgKey, IndexedMsg> msgQue = new SetMap<>();
084
085  private final TopologyFactory topologyFactory;
086  private final Class<? extends Topology> topologyClass;
087
088  /**
089   * @deprecated in 0.14. Use Tang to obtain an instance of this instead.
090   */
091  @Deprecated
092  public CommunicationGroupDriverImpl(final Class<? extends Name<String>> groupName,
093                                      final ConfigurationSerializer confSerializer,
094                                      final EStage<GroupCommunicationMessage> senderStage,
095                                      final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler,
096                                      final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler,
097                                      final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler,
098                                      final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler,
099                                      final String driverId, final int numberOfTasks, final int fanOut) {
100    super();
101    this.groupName = groupName;
102    this.driverId = driverId;
103    this.confSerializer = confSerializer;
104    this.allInitialTasksRunning = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
105
106    groupCommRunningTaskHandler.addHandler(new TopologyRunningTaskHandler(this));
107    groupCommFailedTaskHandler.addHandler(new TopologyFailedTaskHandler(this));
108    groupCommFailedEvaluatorHandler.addHandler(new TopologyFailedEvaluatorHandler(this));
109    commGroupMessageHandler.addHandler(new TopologyMessageHandler(this));
110    final Injector injector = Tang.Factory.getTang().newInjector();
111    injector.bindVolatileParameter(CommGroupNameClass.class, groupName);
112    injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage);
113    injector.bindVolatileParameter(DriverIdentifier.class, driverId);
114    injector.bindVolatileParameter(CommGroupNumTask.class, numberOfTasks);
115    injector.bindVolatileParameter(TreeTopologyFanOut.class, fanOut);
116    try {
117      topologyFactory = injector.getInstance(TopologyFactory.class);
118    } catch (final InjectionException e) {
119      throw new RuntimeException(e);
120    }
121    this.topologyClass = TreeTopology.class;
122  }
123
124  @Inject
125  private CommunicationGroupDriverImpl(
126      @Parameter(CommGroupNameClass.class) final Class<? extends Name<String>> groupName,
127      final ConfigurationSerializer confSerializer,
128      @Parameter(GroupCommRunningTaskHandler.class)
129          final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler,
130      @Parameter(GroupCommFailedTaskHandler.class)
131          final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler,
132      @Parameter(GroupCommFailedEvalHandler.class)
133          final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler,
134          final GroupCommMessageHandler groupCommMessageHandler,
135      @Parameter(DriverIdentifier.class) final String driverId,
136      @Parameter(CommGroupNumTask.class) final int numberOfTasks,
137      final TopologyFactory topologyFactory,
138      @Parameter(TopologyClass.class) final Class<? extends Topology> topologyClass) {
139    super();
140    this.groupName = groupName;
141    this.driverId = driverId;
142    this.confSerializer = confSerializer;
143    this.allInitialTasksRunning = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
144
145    registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
146        groupCommFailedEvaluatorHandler, groupCommMessageHandler);
147    this.topologyFactory = topologyFactory;
148    this.topologyClass = topologyClass;
149  }
150
151  private void registerHandlers(
152      final BroadcastingEventHandler<RunningTask> runningTaskHandler,
153      final BroadcastingEventHandler<FailedTask> failedTaskHandler,
154      final BroadcastingEventHandler<FailedEvaluator> failedEvaluatorHandler,
155      final GroupCommMessageHandler groupCommMessageHandler) {
156    runningTaskHandler.addHandler(new TopologyRunningTaskHandler(this));
157    failedTaskHandler.addHandler(new TopologyFailedTaskHandler(this));
158    failedEvaluatorHandler.addHandler(new TopologyFailedEvaluatorHandler(this));
159    groupCommMessageHandler.addHandler(groupName, new SingleThreadStage<>(new TopologyMessageHandler(this), 100 * 100));
160  }
161
162  @Override
163  public CommunicationGroupDriver addBroadcast(final Class<? extends Name<String>> operatorName,
164                                               final BroadcastOperatorSpec spec) {
165    LOG.entering("CommunicationGroupDriverImpl", "addBroadcast",
166        new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec});
167    if (finalised) {
168      throw new IllegalStateException("Can't add more operators to a finalised spec");
169    }
170    operatorSpecs.put(operatorName, spec);
171
172    final Topology topology;
173    try {
174      topology = topologyFactory.getNewInstance(operatorName, topologyClass);
175    } catch (final InjectionException e) {
176      LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
177      throw new RuntimeException(e);
178    }
179
180    topology.setRootTask(spec.getSenderId());
181    topology.setOperatorSpecification(spec);
182    topologies.put(operatorName, topology);
183    LOG.exiting("CommunicationGroupDriverImpl", "addBroadcast",
184        Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"}));
185    return this;
186  }
187
188  @Override
189  public CommunicationGroupDriver addReduce(final Class<? extends Name<String>> operatorName,
190                                            final ReduceOperatorSpec spec) {
191    LOG.entering("CommunicationGroupDriverImpl", "addReduce",
192        new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec});
193    if (finalised) {
194      throw new IllegalStateException("Can't add more operators to a finalised spec");
195    }
196    LOG.finer(getQualifiedName() + "Adding reduce operator to tree topology: " + spec);
197    operatorSpecs.put(operatorName, spec);
198
199    final Topology topology;
200    try {
201      topology = topologyFactory.getNewInstance(operatorName, topologyClass);
202    } catch (final InjectionException e) {
203      LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
204      throw new RuntimeException(e);
205    }
206
207    topology.setRootTask(spec.getReceiverId());
208    topology.setOperatorSpecification(spec);
209    topologies.put(operatorName, topology);
210    LOG.exiting("CommunicationGroupDriverImpl", "addReduce",
211        Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"}));
212    return this;
213  }
214
215  @Override
216  public CommunicationGroupDriver addScatter(final Class<? extends Name<String>> operatorName,
217                                             final ScatterOperatorSpec spec) {
218    LOG.entering("CommunicationGroupDriverImpl", "addScatter",
219        new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec});
220    if (finalised) {
221      throw new IllegalStateException("Can't add more operators to a finalised spec");
222    }
223    operatorSpecs.put(operatorName, spec);
224
225    final Topology topology;
226    try {
227      topology = topologyFactory.getNewInstance(operatorName, topologyClass);
228    } catch (final InjectionException e) {
229      LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
230      throw new RuntimeException(e);
231    }
232
233    topology.setRootTask(spec.getSenderId());
234    topology.setOperatorSpecification(spec);
235    topologies.put(operatorName, topology);
236    LOG.exiting("CommunicationGroupDriverImpl", "addScatter",
237        Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}));
238    return this;
239  }
240
241  @Override
242  public CommunicationGroupDriver addGather(final Class<? extends Name<String>> operatorName,
243                                            final GatherOperatorSpec spec) {
244    LOG.entering("CommunicationGroupDriverImpl", "addGather",
245        new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec});
246    if (finalised) {
247      throw new IllegalStateException("Can't add more operators to a finalised spec");
248    }
249    operatorSpecs.put(operatorName, spec);
250
251    final Topology topology;
252    try {
253      topology = topologyFactory.getNewInstance(operatorName, topologyClass);
254    } catch (final InjectionException e) {
255      LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
256      throw new RuntimeException(e);
257    }
258
259    topology.setRootTask(spec.getReceiverId());
260    topology.setOperatorSpecification(spec);
261    topologies.put(operatorName, topology);
262    LOG.exiting("CommunicationGroupDriverImpl", "addGather",
263        Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}));
264    return this;
265  }
266
267  @Override
268  public Configuration getTaskConfiguration(final Configuration taskConf) {
269    LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration",
270        new Object[]{getQualifiedName(), confSerializer.toString(taskConf)});
271    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
272    final String taskId = taskId(taskConf);
273    if (perTaskState.containsKey(taskId)) {
274      jcb.bindNamedParameter(CommunicationGroupName.class, groupName.getName());
275      jcb.bindNamedParameter(DriverIdentifierGroupComm.class, driverId);
276      LOG.finest(getQualifiedName() + "Task has been added. Waiting to acquire configLock");
277      synchronized (configLock) {
278        LOG.finest(getQualifiedName() + "Acquired configLock");
279        while (cantGetConfig(taskId)) {
280          LOG.finest(getQualifiedName() + "Need to wait for failure");
281          try {
282            configLock.wait();
283          } catch (final InterruptedException e) {
284            throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on configLock", e);
285          }
286        }
287        LOG.finest(getQualifiedName() + taskId + " - Will fetch configuration now.");
288        LOG.finest(getQualifiedName() + "Released configLock. Waiting to acquire topologiesLock");
289      }
290      synchronized (topologiesLock) {
291        LOG.finest(getQualifiedName() + "Acquired topologiesLock");
292        for (final Map.Entry<Class<? extends Name<String>>, OperatorSpec> operSpecEntry : operatorSpecs.entrySet()) {
293          final Class<? extends Name<String>> operName = operSpecEntry.getKey();
294          final Topology topology = topologies.get(operName);
295          final JavaConfigurationBuilder jcbInner = Tang.Factory.getTang()
296              .newConfigurationBuilder(topology.getTaskConfiguration(taskId));
297          jcbInner.bindNamedParameter(OperatorName.class, operName.getName());
298          jcb.bindSetEntry(SerializedOperConfigs.class, confSerializer.toString(jcbInner.build()));
299        }
300        LOG.finest(getQualifiedName() + "Released topologiesLock");
301      }
302    } else {
303      return null;
304    }
305    final Configuration configuration = jcb.build();
306    LOG.exiting("CommunicationGroupDriverImpl", "getTaskConfiguration",
307        Arrays.toString(new Object[]{getQualifiedName(), confSerializer.toString(configuration)}));
308    return configuration;
309  }
310
311  private boolean cantGetConfig(final String taskId) {
312    LOG.entering("CommunicationGroupDriverImpl", "cantGetConfig", new Object[]{getQualifiedName(), taskId});
313    final TaskState taskState = perTaskState.get(taskId);
314    if (!taskState.equals(TaskState.NOT_STARTED)) {
315      LOG.finest(getQualifiedName() + taskId + " has started.");
316      if (taskState.equals(TaskState.RUNNING)) {
317        LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig",
318            Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is running. We can't get config"}));
319        return true;
320      } else {
321        LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig",
322            Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has failed. We can get config"}));
323        return false;
324      }
325    } else {
326      LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig",
327          Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has not started. We can get config"}));
328      return false;
329    }
330  }
331
332  @Override
333  public void finalise() {
334    finalised = true;
335  }
336
337  @Override
338  public void addTask(final Configuration partialTaskConf) {
339    LOG.entering("CommunicationGroupDriverImpl", "addTask",
340        new Object[]{getQualifiedName(), confSerializer.toString(partialTaskConf)});
341    final String taskId = taskId(partialTaskConf);
342    LOG.finest(getQualifiedName() + "AddTask(" + taskId + "). Waiting to acquire toBeRemovedLock");
343    synchronized (toBeRemovedLock) {
344      LOG.finest(getQualifiedName() + "Acquired toBeRemovedLock");
345      while (perTaskState.containsKey(taskId)) {
346        LOG.finest(getQualifiedName() + "Trying to add an existing task. Will wait for removeTask");
347        try {
348          toBeRemovedLock.wait();
349        } catch (final InterruptedException e) {
350          throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on toBeRemovedLock", e);
351        }
352      }
353      LOG.finest(getQualifiedName() + "Released toBeRemovedLock. Waiting to acquire topologiesLock");
354    }
355    synchronized (topologiesLock) {
356      LOG.finest(getQualifiedName() + "Acquired topologiesLock");
357
358      boolean isRootOfSomeTopology = false;
359      for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) {
360        final Topology topology = topologies.get(operName);
361        topology.addTask(taskId);
362        isRootOfSomeTopology |= topology.getRootId().equals(taskId);
363      }
364
365      if (isRootOfSomeTopology) {
366        topologiesLock.notifyAll();
367      }
368
369      perTaskState.put(taskId, TaskState.NOT_STARTED);
370      LOG.finest(getQualifiedName() + "Released topologiesLock");
371    }
372    LOG.fine(getQualifiedName() + "Added " + taskId + " to topology");
373    LOG.exiting("CommunicationGroupDriverImpl", "addTask",
374        Arrays.toString(new Object[]{getQualifiedName(), "Added task: ", taskId}));
375  }
376
377  public void removeTask(final String taskId) {
378    LOG.entering("CommunicationGroupDriverImpl", "removeTask", new Object[]{getQualifiedName(), taskId});
379    LOG.info(getQualifiedName() + "Removing Task " + taskId +
380        " as the evaluator has failed.");
381    LOG.finest(getQualifiedName() + "Remove Task(" + taskId +
382        "): Waiting to acquire topologiesLock");
383    synchronized (topologiesLock) {
384      LOG.finest(getQualifiedName() + "Acquired topologiesLock");
385      for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) {
386        final Topology topology = topologies.get(operName);
387        topology.removeTask(taskId);
388      }
389      perTaskState.remove(taskId);
390      LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire toBeRemovedLock");
391    }
392    synchronized (toBeRemovedLock) {
393      LOG.finest(getQualifiedName() + "Acquired toBeRemovedLock");
394      LOG.finest(getQualifiedName() + "Removed Task " + taskId + " Notifying waiting threads");
395      toBeRemovedLock.notifyAll();
396      LOG.finest(getQualifiedName() + "Released toBeRemovedLock");
397    }
398    LOG.fine(getQualifiedName() + "Removed " + taskId + " to topology");
399    LOG.exiting("CommunicationGroupDriverImpl", "removeTask",
400        Arrays.toString(new Object[]{getQualifiedName(), "Removed task: ", taskId}));
401  }
402
403  public void runTask(final String id) {
404    LOG.entering("CommunicationGroupDriverImpl", "runTask", new Object[]{getQualifiedName(), id});
405    LOG.finest(getQualifiedName() + "Task-" + id + " running. Waiting to acquire topologiesLock");
406    LOG.fine(getQualifiedName() + "Got running Task: " + id);
407
408    boolean nonMember = false;
409    synchronized (topologiesLock) {
410      if (perTaskState.containsKey(id)) {
411        LOG.finest(getQualifiedName() + "Acquired topologiesLock");
412
413        for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) {
414          final Topology topology = topologies.get(operName);
415          while (!topology.isRootPresent() && !topology.getRootId().equals(id)) {
416            try {
417              // wait until the root node has been added to the topology
418              topologiesLock.wait();
419            } catch (final InterruptedException e) {
420              throw new RuntimeException(getQualifiedName() +
421                  "InterruptedException while waiting on topologiesLock", e);
422            }
423          }
424        }
425
426        // This loop shouldn't be merged with the one above, because the one above contains a lock.wait().
427        // All topologies must be modified at one go, without giving up the turn.
428        for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) {
429          final Topology topology = topologies.get(operName);
430          topology.onRunningTask(id);
431        }
432        if (initializing.get()) {
433          allInitialTasksRunning.decrement();
434        }
435        perTaskState.put(id, TaskState.RUNNING);
436        LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire yetToRunLock");
437      } else {
438        nonMember = true;
439      }
440    }
441    synchronized (yetToRunLock) {
442      LOG.finest(getQualifiedName() + "Acquired yetToRunLock");
443      yetToRunLock.notifyAll();
444      LOG.finest(getQualifiedName() + "Released yetToRunLock");
445    }
446    if (nonMember) {
447      LOG.exiting("CommunicationGroupDriverImpl", "runTask",
448          getQualifiedName() + id + " does not belong to this communication group. Ignoring");
449    } else {
450      LOG.fine(getQualifiedName() + "Status of task " + id + " changed to RUNNING");
451      LOG.exiting("CommunicationGroupDriverImpl", "runTask",
452          Arrays.toString(new Object[]{getQualifiedName(), "Set running complete on task ", id}));
453    }
454  }
455
456  public void failTask(final String id) {
457    LOG.entering("CommunicationGroupDriverImpl", "failTask", new Object[]{getQualifiedName(), id});
458    LOG.finest(getQualifiedName() + "Task-" + id + " failed. Waiting to acquire yetToRunLock");
459    LOG.fine(getQualifiedName() + "Got failed Task: " + id);
460    synchronized (yetToRunLock) {
461      LOG.finest(getQualifiedName() + "Acquired yetToRunLock");
462      // maybe the task does not belong to this communication group.
463      // if it doesn't, we return, it should belong to other group
464      // which will handle its failure
465      if (!perTaskState.containsKey(id)) {
466        LOG.fine(getQualifiedName()
467            + " does not have this task, another communicationGroup must have it");
468        return;
469      }
470      while (cantFailTask(id)) {
471        LOG.finest(getQualifiedName() + "Need to wait for it run");
472        try {
473          yetToRunLock.wait();
474        } catch (final InterruptedException e) {
475          throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on yetToRunLock", e);
476        }
477      }
478      LOG.finest(getQualifiedName() + id + " - Can safely set failure.");
479      LOG.finest(getQualifiedName() + "Released yetToRunLock. Waiting to acquire topologiesLock");
480    }
481    synchronized (topologiesLock) {
482      LOG.finest(getQualifiedName() + "Acquired topologiesLock");
483      for (final Class<? extends Name<String>> operName : operatorSpecs.keySet()) {
484        final Topology topology = topologies.get(operName);
485        topology.onFailedTask(id);
486      }
487      if (initializing.get()) {
488        allInitialTasksRunning.increment();
489      }
490      perTaskState.put(id, TaskState.FAILED);
491      LOG.finest(getQualifiedName() + "Removing msgs associated with dead task " + id + " from msgQue.");
492      final Set<MsgKey> keys = msgQue.keySet();
493      final List<MsgKey> keysToBeRemoved = new ArrayList<>();
494      for (final MsgKey msgKey : keys) {
495        if (msgKey.getSrc().equals(id)) {
496          keysToBeRemoved.add(msgKey);
497        }
498      }
499      LOG.finest(getQualifiedName() + keysToBeRemoved + " keys that will be removed");
500      for (final MsgKey key : keysToBeRemoved) {
501        msgQue.remove(key);
502      }
503      LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire configLock");
504    }
505    synchronized (configLock) {
506      LOG.finest(getQualifiedName() + "Acquired configLock");
507      configLock.notifyAll();
508      LOG.finest(getQualifiedName() + "Released configLock");
509    }
510    LOG.fine(getQualifiedName() + "Status of task " + id + " changed to FAILED");
511    LOG.exiting("CommunicationGroupDriverImpl", "failTask",
512        Arrays.toString(new Object[]{getQualifiedName(), "Set failed complete on task ", id}));
513  }
514
515  private boolean cantFailTask(final String taskId) {
516    LOG.entering("CommunicationGroupDriverImpl", "cantFailTask", new Object[]{getQualifiedName(), taskId});
517    final TaskState taskState = perTaskState.get(taskId);
518    if (!taskState.equals(TaskState.NOT_STARTED)) {
519      LOG.finest(getQualifiedName() + taskId + " has started.");
520      if (!taskState.equals(TaskState.RUNNING)) {
521        LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask",
522            Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is not running yet. Can't set failure"}));
523        return true;
524      } else {
525        LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask",
526            Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " is running. Can set failure"}));
527        return false;
528      }
529    } else {
530      LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask",
531          Arrays.toString(new Object[]{true, getQualifiedName(), taskId,
532              " has not started. We can't fail a task that hasn't started"}));
533      return true;
534    }
535  }
536
537  public void queNProcessMsg(final GroupCommunicationMessage msg) {
538    LOG.entering("CommunicationGroupDriverImpl", "queNProcessMsg", new Object[]{getQualifiedName(), msg});
539    final IndexedMsg indMsg = new IndexedMsg(msg);
540    final Class<? extends Name<String>> operName = indMsg.getOperName();
541    final MsgKey key = new MsgKey(msg);
542    if (msgQue.contains(key, indMsg)) {
543      throw new RuntimeException(getQualifiedName() + "MsgQue already contains " + msg.getType() + " msg for " + key +
544          " in " + Utils.simpleName(operName));
545    }
546    LOG.finest(getQualifiedName() + "Adding msg to que");
547    msgQue.add(key, indMsg);
548    if (msgQue.count(key) == topologies.size()) {
549      LOG.finest(getQualifiedName() + "MsgQue for " + key + " contains " + msg.getType() + " msgs from: "
550          + msgQue.get(key));
551      for (final IndexedMsg innerIndMsg : msgQue.remove(key)) {
552        topologies.get(innerIndMsg.getOperName()).onReceiptOfMessage(innerIndMsg.getMsg());
553      }
554      LOG.finest(getQualifiedName() + "All msgs processed and removed");
555    }
556    LOG.exiting("CommunicationGroupDriverImpl", "queNProcessMsg",
557        Arrays.toString(new Object[]{getQualifiedName(), "Que & Process done for: ", msg}));
558  }
559
560  private boolean isMsgVersionOk(final GroupCommunicationMessage msg) {
561    LOG.entering("CommunicationGroupDriverImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), msg});
562    if (msg.hasVersion()) {
563      final String srcId = msg.getSrcid();
564      final int rcvSrcVersion = msg.getSrcVersion();
565      final int expSrcVersion = topologies.get(Utils.getClass(msg.getOperatorname())).getNodeVersion(srcId);
566
567      final boolean srcVersionChk = chkVersion(rcvSrcVersion, expSrcVersion, "Src Version Check: ");
568      LOG.exiting("CommunicationGroupDriverImpl", "isMsgVersionOk",
569          Arrays.toString(new Object[]{srcVersionChk, getQualifiedName(), msg}));
570      return srcVersionChk;
571    } else {
572      throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs");
573    }
574  }
575
576  private boolean chkVersion(final int rcvVersion, final int version, final String msg) {
577    if (rcvVersion < version) {
578      LOG.warning(getQualifiedName() + msg + "received a ver-" + rcvVersion + " msg while expecting ver-" + version);
579      return false;
580    }
581    if (rcvVersion > version) {
582      LOG.warning(getQualifiedName() + msg + "received a HIGHER ver-" + rcvVersion + " msg while expecting ver-"
583          + version + ". Something fishy!!!");
584      return false;
585    }
586    return true;
587  }
588
589  public void processMsg(final GroupCommunicationMessage msg) {
590    LOG.entering("CommunicationGroupDriverImpl", "processMsg", new Object[]{getQualifiedName(), msg});
591    LOG.finest(getQualifiedName() + "ProcessMsg: " + msg + ". Waiting to acquire topologiesLock");
592    synchronized (topologiesLock) {
593      LOG.finest(getQualifiedName() + "Acquired topologiesLock");
594      if (!isMsgVersionOk(msg)) {
595        LOG.finer(getQualifiedName() + "Discarding msg. Released topologiesLock");
596        return;
597      }
598      if (initializing.get()) {
599        LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": Waiting for all required(" +
600            allInitialTasksRunning.getInitialCount() + ") nodes to run");
601        allInitialTasksRunning.await();
602        LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": All required(" +
603            allInitialTasksRunning.getInitialCount() + ") nodes are running");
604        initializing.compareAndSet(true, false);
605      }
606      queNProcessMsg(msg);
607      LOG.finest(getQualifiedName() + "Released topologiesLock");
608    }
609    LOG.exiting("CommunicationGroupDriverImpl", "processMsg",
610        Arrays.toString(new Object[]{getQualifiedName(), "ProcessMsg done for: ", msg}));
611  }
612
613  private String taskId(final Configuration partialTaskConf) {
614    try {
615      final Injector injector = Tang.Factory.getTang().newInjector(partialTaskConf);
616      return injector.getNamedInstance(TaskConfigurationOptions.Identifier.class);
617    } catch (final InjectionException e) {
618      throw new RuntimeException(getQualifiedName() +
619          "Injection exception while extracting taskId from partialTaskConf", e);
620    }
621  }
622
623  private String getQualifiedName() {
624    return Utils.simpleName(groupName) + " - ";
625  }
626}