/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.group.impl.operators;

import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Reduce;
import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Sender side of the Reduce group-communication operator.
 *
 * <p>On each {@link #send(Object)} the operator obtains the already-reduced value of its
 * children from the {@link OperatorTopology}, combines it with its own contribution using
 * the configured {@link ReduceFunction}, and forwards the encoded result to its parent.
 * Incoming {@link GroupCommunicationMessage}s are handed to the topology unchanged.
 *
 * @param <T> type of the values being reduced
 */
public class ReduceSender<T> implements Reduce.Sender<T>, EventHandler<GroupCommunicationMessage> {

  private static final Logger LOG = Logger.getLogger(ReduceSender.class.getName());

  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final CommGroupNetworkHandler commGroupNetworkHandler;
  private final Codec<T> dataCodec;
  private final NetworkService<GroupCommunicationMessage> netService;
  private final Sender sender;
  private final ReduceFunction<T> reduceFunction;

  private final OperatorTopology topology;

  private final CommunicationGroupServiceClient commGroupClient;

  /** Flipped on the first {@link #send(Object)} so the group client is initialized only once. */
  private final AtomicBoolean init = new AtomicBoolean(false);

  private final int version;

  /**
   * Wires up the operator and registers it as the message handler for {@code operName}.
   *
   * @param groupName               name of the communication group, resolved to a class via {@link Utils#getClass}
   * @param operName                name of this operator, resolved to a class via {@link Utils#getClass}
   * @param selfId                  identifier of the task this operator belongs to; passed to the topology
   * @param dataCodec               codec used to encode the reduced value and decode children's values
   * @param reduceFunction          function that combines a list of values into one
   * @param driverId                identifier of the driver; passed to the topology
   * @param version                 version reported by {@link #getVersion()} and passed to the topology
   * @param commGroupNetworkHandler handler with which this operator registers itself
   * @param netService              network service wrapped by the {@code Sender} given to the topology
   * @param commGroupClient         group client initialized on the first {@link #send(Object)}
   */
  @Inject
  public ReduceSender(
      @Parameter(CommunicationGroupName.class) final String groupName,
      @Parameter(OperatorName.class) final String operName,
      @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
      @Parameter(DataCodec.class) final Codec<T> dataCodec,
      @Parameter(ReduceFunctionParam.class) final ReduceFunction<T> reduceFunction,
      @Parameter(DriverIdentifierGroupComm.class) final String driverId,
      @Parameter(TaskVersion.class) final int version,
      final CommGroupNetworkHandler commGroupNetworkHandler,
      final NetworkService<GroupCommunicationMessage> netService,
      final CommunicationGroupServiceClient commGroupClient) {

    LOG.log(Level.FINEST, "{0} has CommGroupHandler-{1}",
        new Object[]{operName, commGroupNetworkHandler});

    this.version = version;
    this.groupName = Utils.getClass(groupName);
    this.operName = Utils.getClass(operName);
    this.dataCodec = dataCodec;
    this.reduceFunction = reduceFunction;
    this.commGroupNetworkHandler = commGroupNetworkHandler;
    this.netService = netService;
    this.commGroupClient = commGroupClient;
    this.sender = new Sender(this.netService);
    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);

    // Register last: this hands `this` to the handler, so every field must already be
    // assigned in case onNext() is invoked before the constructor returns.
    this.commGroupNetworkHandler.register(this.operName, this);
  }

  /**
   * @return the version supplied at construction time
   */
  @Override
  public int getVersion() {
    return version;
  }

  /**
   * Initializes the underlying operator topology.
   *
   * @throws ParentDeadException if thrown by the topology's initialization
   */
  @Override
  public void initialize() throws ParentDeadException {
    topology.initialize();
  }

  /**
   * @return the class that names this operator
   */
  @Override
  public Class<? extends Name<String>> getOperName() {
    return operName;
  }

  /**
   * @return the class that names the communication group of this operator
   */
  @Override
  public Class<? extends Name<String>> getGroupName() {
    return groupName;
  }

  /**
   * @return {@code <group simple name>:<operator simple name>:<version>}
   */
  @Override
  public String toString() {
    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
  }

  /**
   * Forwards an incoming group-communication message to the topology.
   *
   * @param msg the received message
   */
  @Override
  public void onNext(final GroupCommunicationMessage msg) {
    topology.handle(msg);
  }

  /**
   * Reduces {@code myData} with whatever the children contributed and sends the result upstream.
   *
   * <p>The first invocation also initializes the communication group client.
   *
   * @param myData this task's own contribution to the reduction
   * @throws NetworkException     propagated from the group-communication calls made here
   * @throws InterruptedException propagated from the group-communication calls made here
   * @throws RuntimeException     wrapping a {@link ParentDeadException} raised while receiving
   *                              from children or sending to the parent
   */
  @Override
  public void send(final T myData) throws NetworkException, InterruptedException {
    LOG.entering("ReduceSender", "send", this);
    // Parameterized form defers toString()/concatenation until FINE is actually enabled.
    LOG.log(Level.FINE, "I am {0}", this);

    // Only the first caller to flip the flag performs the one-time client initialization.
    if (init.compareAndSet(false, true)) {
      commGroupClient.initialize();
    }

    // I am an intermediate node or leaf.
    LOG.finest("Waiting for children");
    // Wait for children to send
    try {
      final T reducedValueOfChildren = topology.recvFromChildren(reduceFunction, dataCodec);

      // At most two operands: our own value plus the children's combined value, if any.
      final List<T> vals = new ArrayList<>(2);
      vals.add(myData);
      if (reducedValueOfChildren != null) {
        vals.add(reducedValueOfChildren);
      }

      final T reducedValue = reduceFunction.apply(vals);
      topology.sendToParent(dataCodec.encode(reducedValue), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce);
    } catch (final ParentDeadException e) {
      throw new RuntimeException("ParentDeadException", e);
    }
    LOG.exiting("ReduceSender", "send", this);
  }

  /**
   * @return the reduce function this operator applies
   */
  @Override
  public ReduceFunction<T> getReduceFunction() {
    return reduceFunction;
  }
}