001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.group.broadcast; 020 021import org.apache.reef.examples.group.bgd.operatornames.ControlMessageBroadcaster; 022import org.apache.reef.examples.group.bgd.parameters.AllCommunicationGroup; 023import org.apache.reef.examples.group.bgd.parameters.ModelDimensions; 024import org.apache.reef.examples.group.broadcast.parameters.ModelBroadcaster; 025import org.apache.reef.examples.group.broadcast.parameters.ModelReceiveAckReducer; 026import org.apache.reef.examples.group.utils.math.DenseVector; 027import org.apache.reef.examples.group.utils.math.Vector; 028import org.apache.reef.io.network.group.api.operators.Broadcast; 029import org.apache.reef.io.network.group.api.operators.Reduce; 030import org.apache.reef.io.network.group.api.GroupChanges; 031import org.apache.reef.io.network.group.api.task.CommunicationGroupClient; 032import org.apache.reef.io.network.group.api.task.GroupCommClient; 033import org.apache.reef.tang.annotations.Parameter; 034import org.apache.reef.task.Task; 035import org.mortbay.log.Log; 036 037import javax.inject.Inject; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * Slave task for broadcast example. 043 */ 044public class MasterTask implements Task { 045 046 public static final String TASK_ID = "MasterTask"; 047 048 private static final Logger LOG = Logger.getLogger(MasterTask.class.getName()); 049 050 private final CommunicationGroupClient communicationGroupClient; 051 private final Broadcast.Sender<ControlMessages> controlMessageBroadcaster; 052 private final Broadcast.Sender<Vector> modelBroadcaster; 053 private final Reduce.Receiver<Boolean> modelReceiveAckReducer; 054 055 private final int dimensions; 056 057 @Inject 058 public MasterTask( 059 final GroupCommClient groupCommClient, 060 @Parameter(ModelDimensions.class) final int dimensions) { 061 062 this.dimensions = dimensions; 063 064 this.communicationGroupClient = groupCommClient.getCommunicationGroup(AllCommunicationGroup.class); 065 this.controlMessageBroadcaster = communicationGroupClient.getBroadcastSender(ControlMessageBroadcaster.class); 066 this.modelBroadcaster = communicationGroupClient.getBroadcastSender(ModelBroadcaster.class); 067 this.modelReceiveAckReducer = communicationGroupClient.getReduceReceiver(ModelReceiveAckReducer.class); 068 } 069 070 @Override 071 public byte[] call(final byte[] memento) throws Exception { 072 073 final Vector model = new DenseVector(dimensions); 074 final long time1 = System.currentTimeMillis(); 075 final int numIters = 10; 076 077 for (int i = 0; i < numIters; i++) { 078 079 controlMessageBroadcaster.send(ControlMessages.ReceiveModel); 080 modelBroadcaster.send(model); 081 modelReceiveAckReducer.reduce(); 082 083 final GroupChanges changes = communicationGroupClient.getTopologyChanges(); 084 if (changes.exist()) { 085 Log.info("There exist topology changes. Asking to update Topology"); 086 communicationGroupClient.updateTopology(); 087 } else { 088 Log.info("No changes in topology exist. So not updating topology"); 089 } 090 } 091 092 final long time2 = System.currentTimeMillis(); 093 LOG.log(Level.FINE, "Broadcasting vector of dimensions {0} took {1} secs", 094 new Object[]{dimensions, (time2 - time1) / (numIters * 1000.0)}); 095 096 controlMessageBroadcaster.send(ControlMessages.Stop); 097 098 return null; 099 } 100}