001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.operators; 020 021import org.apache.reef.driver.task.TaskConfigurationOptions; 022import org.apache.reef.exception.evaluator.NetworkException; 023import org.apache.reef.io.network.exception.ParentDeadException; 024import org.apache.reef.io.network.group.api.operators.Scatter; 025import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler; 026import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient; 027import org.apache.reef.io.network.group.api.task.OperatorTopology; 028import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 029import org.apache.reef.io.network.group.impl.config.parameters.*; 030import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl; 031import org.apache.reef.io.network.group.impl.utils.ScatterEncoder; 032import org.apache.reef.io.network.group.impl.utils.ScatterHelper; 033import org.apache.reef.io.network.group.impl.utils.Utils; 034import org.apache.reef.io.network.impl.NetworkService; 035import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; 036import org.apache.reef.io.serialization.Codec; 037import org.apache.reef.tang.annotations.Name; 038import org.apache.reef.tang.annotations.Parameter; 039import org.apache.reef.wake.EventHandler; 040import org.apache.reef.wake.Identifier; 041 042import javax.inject.Inject; 043import java.util.Arrays; 044import java.util.List; 045import java.util.Map; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.logging.Logger; 048 049public final class ScatterSender<T> implements Scatter.Sender<T>, EventHandler<GroupCommunicationMessage> { 050 051 private static final Logger LOG = Logger.getLogger(ScatterSender.class.getName()); 052 053 private final Class<? extends Name<String>> groupName; 054 private final Class<? extends Name<String>> operName; 055 private final Codec<T> dataCodec; 056 private final OperatorTopology topology; 057 private final AtomicBoolean init = new AtomicBoolean(false); 058 private final CommunicationGroupServiceClient commGroupClient; 059 private final int version; 060 private final ScatterEncoder scatterEncoder; 061 062 @Inject 063 public ScatterSender(@Parameter(CommunicationGroupName.class) final String groupName, 064 @Parameter(OperatorName.class) final String operName, 065 @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId, 066 @Parameter(DataCodec.class) final Codec<T> dataCodec, 067 @Parameter(DriverIdentifierGroupComm.class) final String driverId, 068 @Parameter(TaskVersion.class) final int version, 069 final CommGroupNetworkHandler commGroupNetworkHandler, 070 final NetworkService<GroupCommunicationMessage> netService, 071 final CommunicationGroupServiceClient commGroupClient, 072 final ScatterEncoder scatterEncoder) { 073 LOG.finest(operName + "has CommGroupHandler-" + commGroupNetworkHandler.toString()); 074 this.version = version; 075 this.groupName = Utils.getClass(groupName); 076 this.operName = Utils.getClass(operName); 077 this.dataCodec = dataCodec; 078 this.scatterEncoder = scatterEncoder; 079 this.topology = new OperatorTopologyImpl(this.groupName, this.operName, 080 selfId, driverId, new Sender(netService), version); 081 this.commGroupClient = commGroupClient; 082 commGroupNetworkHandler.register(this.operName, this); 083 } 084 085 @Override 086 public int getVersion() { 087 return version; 088 } 089 090 @Override 091 public void initialize() throws ParentDeadException { 092 topology.initialize(); 093 } 094 095 @Override 096 public Class<? extends Name<String>> getOperName() { 097 return operName; 098 } 099 100 @Override 101 public Class<? extends Name<String>> getGroupName() { 102 return groupName; 103 } 104 105 @Override 106 public String toString() { 107 final StringBuilder sb = new StringBuilder("ScatterSender:") 108 .append(Utils.simpleName(groupName)) 109 .append(":") 110 .append(Utils.simpleName(operName)) 111 .append(":") 112 .append(version); 113 return sb.toString(); 114 } 115 116 @Override 117 public void onNext(final GroupCommunicationMessage msg) { 118 topology.handle(msg); 119 } 120 121 private void initializeGroup() { 122 if (init.compareAndSet(false, true)) { 123 LOG.fine(this + " Communication group initializing."); 124 commGroupClient.initialize(); 125 LOG.fine(this + " Communication group initialized."); 126 } 127 } 128 129 @Override 130 public void send(final List<T> elements) throws NetworkException, InterruptedException { 131 LOG.entering("ScatterSender", "send"); 132 133 initializeGroup(); 134 send(elements, 135 ScatterHelper.getUniformCounts(elements.size(), commGroupClient.getActiveSlaveTasks().size()), 136 commGroupClient.getActiveSlaveTasks()); 137 138 LOG.exiting("ScatterSender", "send"); 139 } 140 141 @Override 142 public void send(final List<T> elements, final Integer... counts) 143 throws NetworkException, InterruptedException { 144 LOG.entering("ScatterSender", "send"); 145 146 initializeGroup(); 147 if (counts.length != commGroupClient.getActiveSlaveTasks().size()) { 148 throw new RuntimeException("Parameter 'counts' has length " + counts.length 149 + ", but number of slaves is " + commGroupClient.getActiveSlaveTasks().size()); 150 } 151 152 send(elements, 153 Arrays.asList(counts), 154 commGroupClient.getActiveSlaveTasks()); 155 156 LOG.exiting("ScatterSender", "send"); 157 } 158 159 @Override 160 public void send(final List<T> elements, final List<? extends Identifier> order) 161 throws NetworkException, InterruptedException { 162 LOG.entering("ScatterSender", "send"); 163 164 initializeGroup(); 165 send(elements, 166 ScatterHelper.getUniformCounts(elements.size(), order.size()), 167 order); 168 169 LOG.exiting("ScatterSender", "send"); 170 } 171 172 @Override 173 public void send(final List<T> elements, final List<Integer> counts, final List<? extends Identifier> order) 174 throws NetworkException, InterruptedException { 175 LOG.entering("ScatterSender", "send"); 176 177 if (counts.size() != order.size()) { 178 throw new RuntimeException("Parameter 'counts' has size " + counts.size() 179 + ", but parameter 'order' has size " + order.size() + "."); 180 } 181 initializeGroup(); 182 183 // I am root. 184 LOG.fine("I am " + this); 185 186 LOG.fine(this + " Encoding data and determining which Tasks receive which elements."); 187 final Map<String, byte[]> mapOfChildIdToBytes = scatterEncoder.encode(elements, counts, order, dataCodec); 188 189 try { 190 LOG.fine(this + " Sending " + elements.size() + " elements."); 191 topology.sendToChildren(mapOfChildIdToBytes, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter); 192 193 } catch (final ParentDeadException e) { 194 throw new RuntimeException("ParentDeadException during OperatorTopology.sendToChildren()", e); 195 } 196 197 LOG.exiting("ScatterSender", "send"); 198 } 199}