001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.operators;
020
021import org.apache.reef.driver.task.TaskConfigurationOptions;
022import org.apache.reef.exception.evaluator.NetworkException;
023import org.apache.reef.io.network.exception.ParentDeadException;
024import org.apache.reef.io.network.group.api.operators.Broadcast;
025import org.apache.reef.io.network.group.impl.config.parameters.*;
026import org.apache.reef.io.network.impl.NetworkService;
027import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
028import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
029import org.apache.reef.io.network.group.api.task.OperatorTopology;
030import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
031import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
032import org.apache.reef.io.network.group.impl.utils.Utils;
033import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
034import org.apache.reef.io.serialization.Codec;
035import org.apache.reef.tang.annotations.Name;
036import org.apache.reef.tang.annotations.Parameter;
037import org.apache.reef.wake.EventHandler;
038
039import javax.inject.Inject;
040
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.logging.Logger;
043
044public class BroadcastReceiver<T> implements Broadcast.Receiver<T>, EventHandler<GroupCommunicationMessage> {
045
046  private static final Logger LOG = Logger.getLogger(BroadcastReceiver.class.getName());
047
048  private final Class<? extends Name<String>> groupName;
049  private final Class<? extends Name<String>> operName;
050  private final CommGroupNetworkHandler commGroupNetworkHandler;
051  private final Codec<T> dataCodec;
052  private final NetworkService<GroupCommunicationMessage> netService;
053  private final Sender sender;
054
055  private final OperatorTopology topology;
056
057  private final AtomicBoolean init = new AtomicBoolean(false);
058
059  private final CommunicationGroupServiceClient commGroupClient;
060
061  private final int version;
062
063  @Inject
064  public BroadcastReceiver(@Parameter(CommunicationGroupName.class) final String groupName,
065                           @Parameter(OperatorName.class) final String operName,
066                           @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
067                           @Parameter(DataCodec.class) final Codec<T> dataCodec,
068                           @Parameter(DriverIdentifierGroupComm.class) final String driverId,
069                           @Parameter(TaskVersion.class) final int version,
070                           final CommGroupNetworkHandler commGroupNetworkHandler,
071                           final NetworkService<GroupCommunicationMessage> netService,
072                           final CommunicationGroupServiceClient commGroupClient) {
073    super();
074    this.version = version;
075    LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
076    this.groupName = Utils.getClass(groupName);
077    this.operName = Utils.getClass(operName);
078    this.dataCodec = dataCodec;
079    this.commGroupNetworkHandler = commGroupNetworkHandler;
080    this.netService = netService;
081    this.sender = new Sender(this.netService);
082    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
083    this.commGroupNetworkHandler.register(this.operName, this);
084    this.commGroupClient = commGroupClient;
085  }
086
087  @Override
088  public int getVersion() {
089    return version;
090  }
091
092  @Override
093  public void initialize() throws ParentDeadException {
094    topology.initialize();
095  }
096
097  @Override
098  public Class<? extends Name<String>> getOperName() {
099    return operName;
100  }
101
102  @Override
103  public Class<? extends Name<String>> getGroupName() {
104    return groupName;
105  }
106
107  @Override
108  public String toString() {
109    return "BroadcastReceiver:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
110  }
111
112  @Override
113  public void onNext(final GroupCommunicationMessage msg) {
114    topology.handle(msg);
115  }
116
117  @Override
118  public T receive() throws NetworkException, InterruptedException {
119    LOG.entering("BroadcastReceiver", "receive", this);
120    LOG.fine("I am " + this);
121
122    if (init.compareAndSet(false, true)) {
123      LOG.fine(this + " Communication group initializing");
124      commGroupClient.initialize();
125      LOG.fine(this + " Communication group initialized");
126    }
127    // I am an intermediate node or leaf.
128
129    final T retVal;
130    // Wait for parent to send
131    LOG.fine(this + " Waiting to receive broadcast");
132    final byte[] data;
133    try {
134      data = topology.recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
135      // TODO: Should receive the identity element instead of null
136      if (data == null) {
137        LOG.fine(this + " Received null. Perhaps one of my ancestors is dead.");
138        retVal = null;
139      } else {
140        LOG.finest("Using " + dataCodec.getClass().getSimpleName() + " as codec");
141        retVal = dataCodec.decode(data);
142        LOG.finest("Decoded msg successfully");
143        LOG.finest(this + " Sending to children.");
144      }
145
146      topology.sendToChildren(data, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
147    } catch (final ParentDeadException e) {
148      throw new RuntimeException("ParentDeadException", e);
149    }
150    LOG.exiting("BroadcastReceiver", "receive", this);
151    return retVal;
152  }
153
154}