This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.group.broadcast;
020
021import org.apache.reef.examples.group.bgd.operatornames.ControlMessageBroadcaster;
022import org.apache.reef.examples.group.bgd.parameters.AllCommunicationGroup;
023import org.apache.reef.examples.group.bgd.parameters.ModelDimensions;
024import org.apache.reef.examples.group.broadcast.parameters.ModelBroadcaster;
025import org.apache.reef.examples.group.broadcast.parameters.ModelReceiveAckReducer;
026import org.apache.reef.examples.group.utils.math.DenseVector;
027import org.apache.reef.examples.group.utils.math.Vector;
028import org.apache.reef.io.network.group.api.operators.Broadcast;
029import org.apache.reef.io.network.group.api.operators.Reduce;
030import org.apache.reef.io.network.group.api.GroupChanges;
031import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
032import org.apache.reef.io.network.group.api.task.GroupCommClient;
033import org.apache.reef.tang.annotations.Parameter;
034import org.apache.reef.task.Task;
035import org.mortbay.log.Log;
036
037import javax.inject.Inject;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * Slave task for broadcast example.
043 */
044public class MasterTask implements Task {
045
046  public static final String TASK_ID = "MasterTask";
047
048  private static final Logger LOG = Logger.getLogger(MasterTask.class.getName());
049
050  private final CommunicationGroupClient communicationGroupClient;
051  private final Broadcast.Sender<ControlMessages> controlMessageBroadcaster;
052  private final Broadcast.Sender<Vector> modelBroadcaster;
053  private final Reduce.Receiver<Boolean> modelReceiveAckReducer;
054
055  private final int dimensions;
056
057  @Inject
058  public MasterTask(
059      final GroupCommClient groupCommClient,
060      @Parameter(ModelDimensions.class) final int dimensions) {
061
062    this.dimensions = dimensions;
063
064    this.communicationGroupClient = groupCommClient.getCommunicationGroup(AllCommunicationGroup.class);
065    this.controlMessageBroadcaster = communicationGroupClient.getBroadcastSender(ControlMessageBroadcaster.class);
066    this.modelBroadcaster = communicationGroupClient.getBroadcastSender(ModelBroadcaster.class);
067    this.modelReceiveAckReducer = communicationGroupClient.getReduceReceiver(ModelReceiveAckReducer.class);
068  }
069
070  @Override
071  public byte[] call(final byte[] memento) throws Exception {
072
073    final Vector model = new DenseVector(dimensions);
074    final long time1 = System.currentTimeMillis();
075    final int numIters = 10;
076
077    for (int i = 0; i < numIters; i++) {
078
079      controlMessageBroadcaster.send(ControlMessages.ReceiveModel);
080      modelBroadcaster.send(model);
081      modelReceiveAckReducer.reduce();
082
083      final GroupChanges changes = communicationGroupClient.getTopologyChanges();
084      if (changes.exist()) {
085        Log.info("There exist topology changes. Asking to update Topology");
086        communicationGroupClient.updateTopology();
087      } else {
088        Log.info("No changes in topology exist. So not updating topology");
089      }
090    }
091
092    final long time2 = System.currentTimeMillis();
093    LOG.log(Level.FINE, "Broadcasting vector of dimensions {0} took {1} secs",
094        new Object[]{dimensions, (time2 - time1) / (numIters * 1000.0)});
095
096    controlMessageBroadcaster.send(ControlMessages.Stop);
097
098    return null;
099  }
100}