001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.operators;
020
021import org.apache.reef.driver.task.TaskConfigurationOptions;
022import org.apache.reef.exception.evaluator.NetworkException;
023import org.apache.reef.io.network.exception.ParentDeadException;
024import org.apache.reef.io.network.group.api.operators.Broadcast;
025import org.apache.reef.io.network.group.impl.config.parameters.*;
026import org.apache.reef.io.network.impl.NetworkService;
027import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
028import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
029import org.apache.reef.io.network.group.api.task.OperatorTopology;
030import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
031import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
032import org.apache.reef.io.network.group.impl.utils.Utils;
033import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
034import org.apache.reef.io.serialization.Codec;
035import org.apache.reef.tang.annotations.Name;
036import org.apache.reef.tang.annotations.Parameter;
037import org.apache.reef.wake.EventHandler;
038
039import javax.inject.Inject;
040
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.logging.Logger;
043
044public class BroadcastSender<T> implements Broadcast.Sender<T>, EventHandler<GroupCommunicationMessage> {
045
046  private static final Logger LOG = Logger.getLogger(BroadcastSender.class.getName());
047
048  private final Class<? extends Name<String>> groupName;
049  private final Class<? extends Name<String>> operName;
050  private final CommGroupNetworkHandler commGroupNetworkHandler;
051  private final Codec<T> dataCodec;
052  private final NetworkService<GroupCommunicationMessage> netService;
053  private final Sender sender;
054
055  private final OperatorTopology topology;
056
057  private final AtomicBoolean init = new AtomicBoolean(false);
058
059  private final CommunicationGroupServiceClient commGroupClient;
060
061  private final int version;
062
063  @Inject
064  public BroadcastSender(@Parameter(CommunicationGroupName.class) final String groupName,
065                         @Parameter(OperatorName.class) final String operName,
066                         @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
067                         @Parameter(DataCodec.class) final Codec<T> dataCodec,
068                         @Parameter(DriverIdentifierGroupComm.class) final String driverId,
069                         @Parameter(TaskVersion.class) final int version,
070                         final CommGroupNetworkHandler commGroupNetworkHandler,
071                         final NetworkService<GroupCommunicationMessage> netService,
072                         final CommunicationGroupServiceClient commGroupClient) {
073    super();
074    this.version = version;
075    LOG.finest(operName + "has CommGroupHandler-" + commGroupNetworkHandler.toString());
076    this.groupName = Utils.getClass(groupName);
077    this.operName = Utils.getClass(operName);
078    this.dataCodec = dataCodec;
079    this.commGroupNetworkHandler = commGroupNetworkHandler;
080    this.netService = netService;
081    this.sender = new Sender(this.netService);
082    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
083    this.commGroupNetworkHandler.register(this.operName, this);
084    this.commGroupClient = commGroupClient;
085  }
086
087  @Override
088  public int getVersion() {
089    return version;
090  }
091
092  @Override
093  public void initialize() throws ParentDeadException {
094    topology.initialize();
095  }
096
097  @Override
098  public Class<? extends Name<String>> getOperName() {
099    return operName;
100  }
101
102  @Override
103  public Class<? extends Name<String>> getGroupName() {
104    return groupName;
105  }
106
107  @Override
108  public String toString() {
109    return "BroadcastSender:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
110  }
111
112  @Override
113  public void onNext(final GroupCommunicationMessage msg) {
114    topology.handle(msg);
115  }
116
117  @Override
118  public void send(final T element) throws NetworkException, InterruptedException {
119    LOG.entering("BroadcastSender", "send", this);
120    LOG.fine("I am " + this);
121
122    if (init.compareAndSet(false, true)) {
123      LOG.fine(this + " Communication group initializing");
124      commGroupClient.initialize();
125      LOG.fine(this + " Communication group initialized");
126    }
127
128    try {
129      topology.sendToChildren(dataCodec.encode(element), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
130    } catch (final ParentDeadException e) {
131      throw new RuntimeException("ParentDeadException", e);
132    }
133    LOG.exiting("BroadcastSender", "send", this);
134  }
135
136}