/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.group.impl.operators;

import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Broadcast;
import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
 * Sending side of the Broadcast group-communication operator.
 *
 * <p>Elements passed to {@link #send(Object)} are encoded with the injected data codec and
 * handed to the operator topology for delivery to this node's children. Incoming
 * {@link GroupCommunicationMessage}s for this operator are forwarded to the same topology.
 *
 * @param <T> type of the elements being broadcast
 */
public class BroadcastSender<T> implements Broadcast.Sender<T>, EventHandler<GroupCommunicationMessage> {

  private static final Logger LOG = Logger.getLogger(BroadcastSender.class.getName());

  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final CommGroupNetworkHandler commGroupNetworkHandler;
  private final Codec<T> dataCodec;
  private final NetworkService<GroupCommunicationMessage> netService;
  private final Sender sender;

  private final OperatorTopology topology;

  /** Flipped to {@code true} by the first {@link #send(Object)} call, which initializes the group. */
  private final AtomicBoolean init = new AtomicBoolean(false);

  private final CommunicationGroupServiceClient commGroupClient;

  private final int version;

  /**
   * Builds the sender, creates its operator topology and registers this instance as the
   * message handler for the operator.
   *
   * @param groupName name of the communication group; resolved to a {@link Name} class
   *                  through {@code Utils.getClass}
   * @param operName name of this operator; resolved to a {@link Name} class through
   *                 {@code Utils.getClass}
   * @param selfId identifier of the task hosting this operator, passed to the topology
   * @param dataCodec codec used to encode elements before they are sent
   * @param driverId identifier of the driver, passed to the topology
   * @param version version reported by {@link #getVersion()} and passed to the topology
   * @param commGroupNetworkHandler handler with which this operator registers itself
   * @param netService network service wrapped by the {@code Sender} used by the topology
   * @param commGroupClient communication group client initialized on the first send
   */
  @Inject
  public BroadcastSender(@Parameter(CommunicationGroupName.class) final String groupName,
                         @Parameter(OperatorName.class) final String operName,
                         @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
                         @Parameter(DataCodec.class) final Codec<T> dataCodec,
                         @Parameter(DriverIdentifierGroupComm.class) final String driverId,
                         @Parameter(TaskVersion.class) final int version,
                         final CommGroupNetworkHandler commGroupNetworkHandler,
                         final NetworkService<GroupCommunicationMessage> netService,
                         final CommunicationGroupServiceClient commGroupClient) {
    this.version = version;
    LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
    this.groupName = Utils.getClass(groupName);
    this.operName = Utils.getClass(operName);
    this.dataCodec = dataCodec;
    this.commGroupNetworkHandler = commGroupNetworkHandler;
    this.netService = netService;
    this.commGroupClient = commGroupClient;
    this.sender = new Sender(this.netService);
    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
    // Register last: this hands `this` to the handler, so every field must already be assigned
    // before the instance becomes reachable from outside the constructor.
    this.commGroupNetworkHandler.register(this.operName, this);
  }

  /**
   * @return the version supplied at construction time
   */
  @Override
  public int getVersion() {
    return version;
  }

  /**
   * Initializes the underlying operator topology.
   *
   * @throws ParentDeadException if the topology's initialization reports it
   */
  @Override
  public void initialize() throws ParentDeadException {
    topology.initialize();
  }

  /**
   * @return the {@link Name} class identifying this operator
   */
  @Override
  public Class<? extends Name<String>> getOperName() {
    return operName;
  }

  /**
   * @return the {@link Name} class identifying the communication group
   */
  @Override
  public Class<? extends Name<String>> getGroupName() {
    return groupName;
  }

  /**
   * @return {@code "BroadcastSender:<group>:<operator>:<version>"}, using the simple names
   *         of the group and operator classes
   */
  @Override
  public String toString() {
    return "BroadcastSender:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
  }

  /**
   * Forwards an incoming group communication message to the operator topology.
   *
   * @param msg the message received for this operator
   */
  @Override
  public void onNext(final GroupCommunicationMessage msg) {
    topology.handle(msg);
  }

  /**
   * Encodes {@code element} and sends it to this node's children as a Broadcast message.
   *
   * <p>The first invocation also initializes the communication group client. A
   * {@link ParentDeadException} raised while sending is rethrown wrapped in a
   * {@link RuntimeException}; in that case the method-exit trace record is not logged.
   *
   * @param element the value to broadcast
   * @throws NetworkException declared by the sender contract; presumably raised by the
   *                          topology while sending (confirm against OperatorTopology)
   * @throws InterruptedException declared by the sender contract; presumably raised when a
   *                              blocking step is interrupted (confirm against callees)
   */
  @Override
  public void send(final T element) throws NetworkException, InterruptedException {
    LOG.entering("BroadcastSender", "send", this);
    LOG.fine("I am " + this);

    // The flag is flipped before initialize() runs, so only the first caller initializes.
    // NOTE(review): a concurrent second caller would not wait for initialization to finish,
    // and a failed initialize() is not retried - confirm callers are single-threaded.
    if (init.compareAndSet(false, true)) {
      LOG.fine(this + " Communication group initializing");
      commGroupClient.initialize();
      LOG.fine(this + " Communication group initialized");
    }

    try {
      topology.sendToChildren(dataCodec.encode(element), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
    } catch (final ParentDeadException e) {
      throw new RuntimeException("ParentDeadException", e);
    }
    LOG.exiting("BroadcastSender", "send", this);
  }

}