This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.task;
020
021import org.apache.reef.exception.evaluator.NetworkException;
022import org.apache.reef.io.network.exception.ParentDeadException;
023import org.apache.reef.io.network.group.api.operators.Reduce;
024import org.apache.reef.io.network.group.api.task.OperatorTopology;
025import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct;
026import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
027import org.apache.reef.io.network.group.impl.operators.Sender;
028import org.apache.reef.io.network.group.impl.utils.ResettingCountDownLatch;
029import org.apache.reef.io.network.group.impl.utils.Utils;
030import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
031import org.apache.reef.io.serialization.Codec;
032import org.apache.reef.tang.annotations.Name;
033import org.apache.reef.wake.EStage;
034import org.apache.reef.wake.EventHandler;
035import org.apache.reef.wake.impl.SingleThreadStage;
036
037import java.util.Arrays;
038import java.util.HashSet;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.BlockingQueue;
042import java.util.concurrent.LinkedBlockingQueue;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Logger;
045
046public class OperatorTopologyImpl implements OperatorTopology {
047
048  private static final Logger LOG = Logger.getLogger(OperatorTopologyImpl.class.getName());
049
050  private final Class<? extends Name<String>> groupName;
051  private final Class<? extends Name<String>> operName;
052  private final String selfId;
053  private final String driverId;
054  private final Sender sender;
055  private final Object topologyLock = new Object();
056
057  private final int version;
058
059  private final BlockingQueue<GroupCommunicationMessage> deltas = new LinkedBlockingQueue<>();
060  private final BlockingQueue<GroupCommunicationMessage> deletionDeltas = new LinkedBlockingQueue<>();
061
062  private OperatorTopologyStruct baseTopology;
063  private OperatorTopologyStruct effectiveTopology;
064  private final ResettingCountDownLatch topologyLockAcquired = new ResettingCountDownLatch(1);
065  private final AtomicBoolean updatingTopo = new AtomicBoolean(false);
066
067  private final EventHandler<GroupCommunicationMessage> baseTopologyUpdateHandler = new BaseTopologyUpdateHandler();
068
069  private final EStage<GroupCommunicationMessage> baseTopologyUpdateStage = new SingleThreadStage<>(
070      "BaseTopologyUpdateStage",
071      baseTopologyUpdateHandler,
072      5);
073
074  private final EventHandler<GroupCommunicationMessage> dataHandlingStageHandler = new DataHandlingStageHandler();
075
076  // The queue capacity might determine how many tasks can be handled
077  private final EStage<GroupCommunicationMessage> dataHandlingStage = new SingleThreadStage<>("DataHandlingStage",
078      dataHandlingStageHandler,
079      10000);
080
081  public OperatorTopologyImpl(final Class<? extends Name<String>> groupName,
082                              final Class<? extends Name<String>> operName, final String selfId,
083                              final String driverId, final Sender sender, final int version) {
084    super();
085    this.groupName = groupName;
086    this.operName = operName;
087    this.selfId = selfId;
088    this.driverId = driverId;
089    this.sender = sender;
090    this.version = version;
091  }
092
093  /**
094   * Handle messages meant for this operator. Data msgs are passed on
095   * to the DataHandlingStage while Ctrl msgs are queued up for the
096   * base topology to update later. Ctrl msgs signalling death of a
097   * task are also routed to the effectiveTopology in order to notify
098   * a waiting operation. During initialization when effective topology
099   * is not yet set-up, these *Dead msgs are queued in deletionDeltas
100   * for the small time window when these arrive after baseTopology has
101   * received TopologySetup but not yet created the effectiveTopology.
102   * Most times the msgs in the deletionDeltas will be discarded as stale
103   * msgs
104   * <p>
105   * No synchronization is needed while handling *Dead messages.
106   * There 2 states: UpdatingTopo and NotUpdatingTopo
107   * If UpdatingTopo, deltas.put still takes care of adding this msg to effTop through baseTopo changes.
108   * If not, we add to effTopo. So we are good.
109   * <p>
110   * However, for data msgs synchronization is needed. Look at doc of
111   * DataHandlingStage
112   * <p>
113   * Adding to deletionDeltas should be outside
114   * effTopo!=null block. There is a rare possibility that during initialization
115   * just after baseTopo is created(so deltas will be ignored) and just before
116   * effTopo is created(so effTopo will be null) where we can miss a deletion
117   * msg if not added to deletionDelta because this method is synchronized
118   */
119  @Override
120  public void handle(final GroupCommunicationMessage msg) {
121    LOG.entering("OperatorTopologyImpl", "handle", new Object[]{getQualifiedName(), msg});
122    if (isMsgVersionOk(msg)) {
123      try {
124        switch (msg.getType()) {
125        case UpdateTopology:
126          updatingTopo.set(true);
127          baseTopologyUpdateStage.onNext(msg);
128          topologyLockAcquired.awaitAndReset(1);
129          LOG.finest(getQualifiedName() + "topoLockAcquired CDL released. Resetting it to new CDL");
130          sendAckToDriver(msg);
131          break;
132
133        case TopologySetup:
134          LOG.finest(getQualifiedName() + "Adding to deltas queue");
135          deltas.put(msg);
136          break;
137
138        case ParentAdd:
139        case ChildAdd:
140          LOG.finest(getQualifiedName() + "Adding to deltas queue");
141          deltas.put(msg);
142          break;
143
144        case ParentDead:
145        case ChildDead:
146          LOG.finest(getQualifiedName() + "Adding to deltas queue");
147          deltas.put(msg);
148
149          LOG.finest(getQualifiedName() + "Adding to deletionDeltas queue");
150          deletionDeltas.put(msg);
151
152          if (effectiveTopology != null) {
153            LOG.finest(getQualifiedName() + "Adding as data msg to non-null effective topology struct");
154            effectiveTopology.addAsData(msg);
155          } else {
156            LOG.fine(getQualifiedName() + "Received a death message before effective topology was setup. CAUTION");
157          }
158          break;
159
160        default:
161          dataHandlingStage.onNext(msg);
162        }
163      } catch (final InterruptedException e) {
164        throw new RuntimeException("InterruptedException while trying to put ctrl msg into delta queue", e);
165      }
166    }
167    LOG.exiting("OperatorTopologyImpl", "handle", Arrays.toString(new Object[]{getQualifiedName(), msg}));
168  }
169
170  private boolean isMsgVersionOk(final GroupCommunicationMessage msg) {
171    LOG.entering("OperatorTopologyImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), msg});
172    if (msg.hasVersion()) {
173      final int msgVersion = msg.getVersion();
174      final boolean retVal;
175      if (msgVersion < version) {
176        LOG.warning(getQualifiedName() + "Received a ver-" + msgVersion + " msg while expecting ver-" + version
177            + ". Discarding msg");
178        retVal = false;
179      } else {
180        retVal = true;
181      }
182      LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk",
183          Arrays.toString(new Object[]{retVal, getQualifiedName(), msg}));
184      return retVal;
185    } else {
186      throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs");
187    }
188  }
189
190  @Override
191  public void initialize() throws ParentDeadException {
192    LOG.entering("OperatorTopologyImpl", "initialize", getQualifiedName());
193    createBaseTopology();
194    LOG.exiting("OperatorTopologyImpl", "initialize", getQualifiedName());
195  }
196
197  @Override
198  public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
199      throws ParentDeadException {
200    LOG.entering("OperatorTopologyImpl", "sendToParent", new Object[] {getQualifiedName(), msgType});
201    refreshEffectiveTopology();
202    assert effectiveTopology != null;
203    effectiveTopology.sendToParent(data, msgType);
204    LOG.exiting("OperatorTopologyImpl", "sendToParent", getQualifiedName());
205  }
206
207  @Override
208  public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
209      throws ParentDeadException {
210    LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), msgType});
211    refreshEffectiveTopology();
212    assert effectiveTopology != null;
213    effectiveTopology.sendToChildren(data, msgType);
214    LOG.exiting("OperatorTopologyImpl", "sendToChildren", getQualifiedName());
215  }
216
217  @Override
218  public void sendToChildren(final Map<String, byte[]> dataMap,
219                             final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
220      throws ParentDeadException {
221    LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), msgType});
222    refreshEffectiveTopology();
223    assert effectiveTopology != null;
224    effectiveTopology.sendToChildren(dataMap, msgType);
225    LOG.exiting("OperatorTopologyImpl", "sendToChildren", getQualifiedName());
226  }
227
228  @Override
229  public byte[] recvFromParent(final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
230      throws ParentDeadException {
231    LOG.entering("OperatorTopologyImpl", "recvFromParent", new Object[] {getQualifiedName(), msgType});
232    refreshEffectiveTopology();
233    assert effectiveTopology != null;
234    final byte[] retVal = effectiveTopology.recvFromParent(msgType);
235    LOG.exiting("OperatorTopologyImpl", "recvFromParent", getQualifiedName());
236    return retVal;
237  }
238
239  @Override
240  public <T> T recvFromChildren(final Reduce.ReduceFunction<T> redFunc, final Codec<T> dataCodec)
241      throws ParentDeadException {
242    LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName());
243    refreshEffectiveTopology();
244    assert effectiveTopology != null;
245    final T retVal = effectiveTopology.recvFromChildren(redFunc, dataCodec);
246    LOG.exiting("OperatorTopologyImpl", "recvFromChildren", getQualifiedName());
247    return retVal;
248  }
249
250  @Override
251  public byte[] recvFromChildren() throws ParentDeadException {
252    LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName());
253    refreshEffectiveTopology();
254    assert effectiveTopology != null;
255    final byte[] retVal = effectiveTopology.recvFromChildren();
256    LOG.exiting("OperatorTopologyImpl", "recvFromChildren", getQualifiedName());
257    return retVal;
258  }
259
260  /**
261   * Only refreshes the effective topology with deletion msgs from.
262   * deletionDeltas queue
263   *
264   * @throws ParentDeadException
265   */
266  private void refreshEffectiveTopology() throws ParentDeadException {
267    LOG.entering("OperatorTopologyImpl", "refreshEffectiveTopology", getQualifiedName());
268    LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
269    synchronized (topologyLock) {
270      LOG.finest(getQualifiedName() + "Acquired topoLock");
271
272      assert effectiveTopology != null;
273
274      final Set<GroupCommunicationMessage> deletionDeltasSet = new HashSet<>();
275      copyDeletionDeltas(deletionDeltasSet);
276
277      LOG.finest(getQualifiedName() + "Updating effective topology struct with deletion msgs");
278      effectiveTopology.update(deletionDeltasSet);
279      LOG.finest(getQualifiedName() + "Released topoLock");
280    }
281    LOG.exiting("OperatorTopologyImpl", "refreshEffectiveTopology", getQualifiedName());
282  }
283
284  /**
285   * @throws ParentDeadException
286   */
287  private void createBaseTopology() throws ParentDeadException {
288    LOG.entering("OperatorTopologyImpl", "createBaseTopology", getQualifiedName());
289    baseTopology = new OperatorTopologyStructImpl(groupName, operName, selfId, driverId, sender, version);
290    updateBaseTopology();
291    LOG.exiting("OperatorTopologyImpl", "createBaseTopology", getQualifiedName());
292  }
293
294  /**
295   * Blocking method that waits till the base topology is updated Unblocks when.
296   * we receive a TopologySetup msg from driver
297   * <p>
298   * Will also update the effective topology when the base topology is updated
299   * so that creation of effective topology is limited to just this method and
300   * refresh will only refresh the effective topology with deletion msgs from
301   * deletionDeltas queue
302   *
303   * @throws ParentDeadException
304   */
305  private void updateBaseTopology() throws ParentDeadException {
306    LOG.entering("OperatorTopologyImpl", "updateBaseTopology", getQualifiedName());
307    LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
308    synchronized (topologyLock) {
309      LOG.finest(getQualifiedName() + "Acquired topoLock");
310      try {
311        assert baseTopology != null;
312        LOG.finest(getQualifiedName() + "Updating base topology. So setting dirty bit");
313        baseTopology.setChanges(true);
314
315        LOG.finest(getQualifiedName() + "Waiting for ctrl msgs");
316        for (GroupCommunicationMessage msg = deltas.take();
317             msg.getType() != ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup;
318             msg = deltas.take()) {
319          LOG.finest(getQualifiedName() + "Got " + msg.getType() + " msg from " + msg.getSrcid());
320          if (effectiveTopology == null &&
321              msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
322            /**
323             * If effectiveTopology!=null, this method is being called from the BaseTopologyUpdateStage
324             * And exception thrown will be caught by uncaughtExceptionHandler leading to System.exit
325             */
326            LOG.finer(getQualifiedName() + "Throwing ParentDeadException");
327            throw new ParentDeadException(getQualifiedName()
328                + "Parent dead. Current behavior is for the child to die too.");
329          } else {
330            LOG.finest(getQualifiedName() + "Updating baseTopology struct");
331            baseTopology.update(msg);
332            sendAckToDriver(msg);
333          }
334          LOG.finest(getQualifiedName() + "Waiting for ctrl msgs");
335        }
336
337        updateEffTopologyFromBaseTopology();
338
339      } catch (final InterruptedException e) {
340        throw new RuntimeException("InterruptedException while waiting for delta msg from driver", e);
341      }
342      LOG.finest(getQualifiedName() + "Released topoLock");
343    }
344    LOG.exiting("OperatorTopologyImpl", "updateBaseTopology", getQualifiedName());
345  }
346
347  private void sendAckToDriver(final GroupCommunicationMessage msg) {
348    LOG.entering("OperatorTopologyImpl", "sendAckToDriver", new Object[]{getQualifiedName(), msg});
349    try {
350      final String srcId = msg.getSrcid();
351      if (msg.hasVersion()) {
352        final int srcVersion = msg.getSrcVersion();
353        switch (msg.getType()) {
354        case UpdateTopology:
355          sender.send(Utils.bldVersionedGCM(groupName, operName,
356              ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, selfId, this.version, driverId,
357                srcVersion, Utils.EMPTY_BYTE_ARR));
358          break;
359        case ParentAdd:
360          sender.send(Utils.bldVersionedGCM(groupName, operName,
361              ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, selfId, this.version, srcId,
362                srcVersion, Utils.EMPTY_BYTE_ARR), driverId);
363          break;
364        case ParentDead:
365          sender.send(Utils.bldVersionedGCM(groupName, operName,
366              ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, selfId, this.version, srcId,
367                srcVersion, Utils.EMPTY_BYTE_ARR), driverId);
368          break;
369        case ChildAdd:
370          sender.send(Utils.bldVersionedGCM(groupName, operName,
371              ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, selfId, this.version, srcId,
372                srcVersion, Utils.EMPTY_BYTE_ARR), driverId);
373          break;
374        case ChildDead:
375          sender.send(Utils.bldVersionedGCM(groupName, operName,
376              ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, selfId, this.version, srcId,
377                srcVersion, Utils.EMPTY_BYTE_ARR), driverId);
378          break;
379        default:
380          throw new RuntimeException("Received a non control message for acknowledgement");
381        }
382      } else {
383        throw new RuntimeException(getQualifiedName() + "Ack Sender can only deal with versioned msgs");
384      }
385    } catch (final NetworkException e) {
386      throw new RuntimeException("NetworkException while sending ack to driver for delta msg " + msg.getType(), e);
387    }
388    LOG.exiting("OperatorTopologyImpl", "sendAckToDriver", Arrays.toString(new Object[]{getQualifiedName(), msg}));
389  }
390
391  private void updateEffTopologyFromBaseTopology() {
392    LOG.entering("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", getQualifiedName());
393    assert baseTopology != null;
394    LOG.finest(getQualifiedName() + "Updating effective topology");
395    if (baseTopology.hasChanges()) {
396      //Create effectiveTopology from baseTopology
397      effectiveTopology = new OperatorTopologyStructImpl(baseTopology);
398      baseTopology.setChanges(false);
399    }
400    LOG.exiting("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", getQualifiedName());
401  }
402
403  /**
404   * @param deletionDeltasForUpdate
405   * @throws ParentDeadException
406   */
407  private void copyDeletionDeltas(final Set<GroupCommunicationMessage> deletionDeltasForUpdate)
408      throws ParentDeadException {
409    LOG.entering("OperatorTopologyImpl", "copyDeletionDeltas", getQualifiedName());
410    this.deletionDeltas.drainTo(deletionDeltasForUpdate);
411    for (final GroupCommunicationMessage msg : deletionDeltasForUpdate) {
412      final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType = msg.getType();
413      if (msgType == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
414        throw new ParentDeadException(getQualifiedName() +
415            "Parent dead. Current behavior is for the child to die too.");
416      }
417    }
418    LOG.exiting("OperatorTopologyImpl", "copyDeletionDeltas", getQualifiedName());
419  }
420
421  private String getQualifiedName() {
422    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + selfId + ":ver(" + version + ") - ";
423  }
424
425  /**
426   * Unlike Dead msgs this needs to be synchronized because data msgs are not
427   * routed through the base topo changes So we need to make sure to wait for
428   * updateTopo to complete and for the new effective topo to take effect. Hence
429   * updatingTopo is set to false in refreshEffTopo. Also, since this is called
430   * from a netty IO thread, we need to create a stage to move the msgs from
431   * netty space to application space and release the netty threads. Otherwise
432   * weird deadlocks can happen Ex: Sent model to k nodes using broadcast. Send
433   * to K+1 th is waiting for ACK. The K nodes already compute their states and
434   * reduce send their results. If we haven't finished refreshEffTopo because of
435   * which updatingTopo is true, we can't add the new msgs if the #netty threads
436   * is k All k threads are waiting to add data. Single user thread that is
437   * waiting for ACK does not come around to refreshEffTopo and we are
438   * deadlocked because there aren't enough netty threads to dispatch msgs to
439   * the application. Hence the stage
440   */
441  private final class DataHandlingStageHandler implements EventHandler<GroupCommunicationMessage> {
442    @Override
443    public void onNext(final GroupCommunicationMessage dataMsg) {
444      LOG.entering("OperatorTopologyImpl.DataHandlingStageHandler", "onNext", new Object[]{getQualifiedName(),
445          dataMsg});
446      LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
447      synchronized (topologyLock) {
448        LOG.finest(getQualifiedName() + "Acquired topoLock");
449        while (updatingTopo.get()) {
450          try {
451            LOG.finest(getQualifiedName() + "Topology is being updated. Released topoLock, Waiting on topoLock");
452            topologyLock.wait();
453            LOG.finest(getQualifiedName() + "Acquired topoLock");
454          } catch (final InterruptedException e) {
455            throw new RuntimeException("InterruptedException while data handling"
456                + "stage was waiting for updatingTopo to become false", e);
457          }
458        }
459        if (effectiveTopology != null) {
460          LOG.finest(getQualifiedName() + "Non-null effectiveTopo.addAsData(msg)");
461          effectiveTopology.addAsData(dataMsg);
462        } else {
463          LOG.fine("Received a data message before effective topology was setup");
464        }
465        LOG.finest(getQualifiedName() + "Released topoLock");
466      }
467      LOG.exiting("OperatorTopologyImpl.DataHandlingStageHandler", "onNext",
468          Arrays.toString(new Object[]{getQualifiedName(), dataMsg}));
469    }
470  }
471
472  private final class BaseTopologyUpdateHandler implements EventHandler<GroupCommunicationMessage> {
473    @Override
474    public void onNext(final GroupCommunicationMessage msg) {
475      assert msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology;
476      assert effectiveTopology != null;
477      LOG.entering("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext", new Object[]{getQualifiedName(), msg});
478      LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
479      synchronized (topologyLock) {
480        LOG.finest(getQualifiedName() + "Acquired topoLock");
481        LOG.finest(getQualifiedName() + "Releasing topoLockAcquired CDL");
482        topologyLockAcquired.countDown();
483        try {
484          updateBaseTopology();
485          LOG.finest(getQualifiedName() + "Completed updating base & effective topologies");
486        } catch (final ParentDeadException e) {
487          throw new RuntimeException(getQualifiedName() + "BaseTopologyUpdateStage: Unexpected ParentDeadException", e);
488        }
489        updatingTopo.set(false);
490        LOG.finest(getQualifiedName() + "Topology update complete. Notifying waiting threads");
491        topologyLock.notifyAll();
492        LOG.finest(getQualifiedName() + "Released topoLock");
493      }
494      LOG.exiting("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext",
495          Arrays.toString(new Object[]{getQualifiedName(), msg}));
496    }
497  }
498}