/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.group.impl.operators;

import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Scatter;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.ScatterData;
import org.apache.reef.io.network.group.impl.utils.ScatterDecoder;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
 * Receiving side of the Scatter group communication operator.
 *
 * <p>On each {@link #receive()} call this operator waits for a scatter message from its
 * parent in the operator topology, forwards the portion destined for its children, and
 * decodes the portion addressed to itself with the configured data codec.
 *
 * <p>The instance also registers itself with the {@link CommGroupNetworkHandler} as the
 * handler for its operator name; incoming {@link GroupCommunicationMessage}s are delegated
 * to the operator topology.
 *
 * @param <T> type of the data elements produced by the data codec
 */
public final class ScatterReceiver<T> implements Scatter.Receiver<T>, EventHandler<GroupCommunicationMessage> {

  private static final Logger LOG = Logger.getLogger(ScatterReceiver.class.getName());

  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final Codec<T> dataCodec;
  private final OperatorTopology topology;
  /** Flipped to {@code true} by the first {@link #receive()} call, which initializes the group client. */
  private final AtomicBoolean init = new AtomicBoolean(false);
  private final CommunicationGroupServiceClient commGroupClient;
  private final int version;
  private final ScatterDecoder scatterDecoder;

  /**
   * Creates the receiver, builds its operator topology and registers it as the message
   * handler for {@code operName}.
   *
   * @param groupName               name of the communication group, resolved to a class via {@code Utils.getClass}
   * @param operName                name of this operator, resolved to a class via {@code Utils.getClass}
   * @param selfId                  identifier of this task, passed to the operator topology
   * @param dataCodec               codec used to decode the data elements addressed to this receiver
   * @param driverId                identifier of the driver, passed to the operator topology
   * @param version                 version reported by {@link #getVersion()} and passed to the topology
   * @param commGroupNetworkHandler handler with which this receiver registers itself
   * @param netService              network service wrapped in the {@code Sender} given to the topology
   * @param commGroupClient         group client that is initialized on the first {@link #receive()} call
   * @param scatterDecoder          decoder turning the raw bytes received from the parent into {@link ScatterData}
   */
  @Inject
  public ScatterReceiver(@Parameter(CommunicationGroupName.class) final String groupName,
                         @Parameter(OperatorName.class) final String operName,
                         @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
                         @Parameter(DataCodec.class) final Codec<T> dataCodec,
                         @Parameter(DriverIdentifierGroupComm.class) final String driverId,
                         @Parameter(TaskVersion.class) final int version,
                         final CommGroupNetworkHandler commGroupNetworkHandler,
                         final NetworkService<GroupCommunicationMessage> netService,
                         final CommunicationGroupServiceClient commGroupClient,
                         final ScatterDecoder scatterDecoder) {
    // Leading space added so the operator name does not run into the word "has".
    LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
    this.version = version;
    this.groupName = Utils.getClass(groupName);
    this.operName = Utils.getClass(operName);
    this.dataCodec = dataCodec;
    this.scatterDecoder = scatterDecoder;
    this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
        selfId, driverId, new Sender(netService), version);
    this.commGroupClient = commGroupClient;
    commGroupNetworkHandler.register(this.operName, this);
  }

  @Override
  public int getVersion() {
    return version;
  }

  @Override
  public void initialize() throws ParentDeadException {
    topology.initialize();
  }

  @Override
  public Class<? extends Name<String>> getOperName() {
    return operName;
  }

  @Override
  public Class<? extends Name<String>> getGroupName() {
    return groupName;
  }

  /**
   * @return {@code ScatterReceiver:<group simple name>:<operator simple name>:<version>}
   */
  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("ScatterReceiver:")
        .append(Utils.simpleName(groupName))
        .append(":")
        .append(Utils.simpleName(operName))
        .append(":")
        .append(version);
    return sb.toString();
  }

  /**
   * Delegates every incoming group communication message to the operator topology.
   *
   * @param msg the message received for this operator
   */
  @Override
  public void onNext(final GroupCommunicationMessage msg) {
    topology.handle(msg);
  }

  /**
   * Receives the data elements scattered to this task and forwards the remainder to its children.
   *
   * <p>The first invocation initializes the communication group client before waiting for data.
   *
   * @return the decoded data elements addressed to this receiver, or {@code null} when the
   *         topology returned no data from the parent
   * @throws NetworkException     declared by the operator contract
   * @throws InterruptedException declared by the operator contract
   * @throws RuntimeException     wrapping a {@link ParentDeadException} raised while talking to the topology
   */
  @Override
  public List<T> receive() throws NetworkException, InterruptedException {
    LOG.entering("ScatterReceiver", "receive");
    // I am intermediate node or leaf.
    LOG.fine("I am " + this);

    // Only the very first receive() call initializes the communication group client.
    if (init.compareAndSet(false, true)) {
      LOG.fine(this + " Communication group initializing.");
      commGroupClient.initialize();
      LOG.fine(this + " Communication group initialized.");
    }

    try {
      LOG.fine(this + " Waiting to receive scatter from parent.");
      final byte[] data = topology.recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);

      if (data == null) {
        LOG.fine(this + " Received null. Perhaps one of my ancestors is dead.");
        // Source class corrected from "ScatterSender" so the exit record pairs with the entry above.
        LOG.exiting("ScatterReceiver", "receive", null);
        return null;
      }

      LOG.fine(this + " Successfully received scattered data.");
      final ScatterData scatterData = scatterDecoder.decode(data);

      // Forward the children's share before decoding our own, so they are not kept waiting.
      LOG.fine(this + " Trying to propagate messages to children.");
      topology.sendToChildren(scatterData.getChildrenData(), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);

      LOG.fine(this + " Decoding data elements sent to me.");
      final List<T> retList = new LinkedList<>();
      for (final byte[] singleData : scatterData.getMyData()) {
        retList.add(dataCodec.decode(singleData));
      }

      // Source class corrected from "ScatterSender" so the exit record pairs with the entry above.
      LOG.exiting("ScatterReceiver", "receive", retList);
      return retList;

    } catch (final ParentDeadException e) {
      throw new RuntimeException("ParentDeadException", e);
    }
  }
}