This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.group;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.CompletedTask;
027import org.apache.reef.driver.task.RunningTask;
028import org.apache.reef.driver.task.TaskConfiguration;
029import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
030import org.apache.reef.io.network.group.api.driver.GroupCommDriver;
031import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
032import org.apache.reef.io.serialization.SerializableCodec;
033import org.apache.reef.tang.Configuration;
034import org.apache.reef.tang.annotations.Name;
035import org.apache.reef.tang.annotations.NamedParameter;
036import org.apache.reef.tang.annotations.Unit;
037import org.apache.reef.wake.EventHandler;
038import org.apache.reef.wake.time.event.StartTime;
039
040import javax.inject.Inject;
041import java.util.ArrayList;
042import java.util.List;
043import java.util.concurrent.atomic.AtomicInteger;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047/**
048 * Driver used for testing multiple communication groups.
049 */
050@DriverSide
051@Unit
052public final class MultipleCommGroupsDriver {
053  private static final Logger LOG = Logger.getLogger(MultipleCommGroupsDriver.class.getName());
054
055  private final EvaluatorRequestor requestor;
056  private final GroupCommDriver groupCommDriver;
057
058  private final String[][] taskIds;
059  private final AtomicInteger[] taskCounter;
060  private final List<CommunicationGroupDriver> commGroupDriverList;
061  private final List<ActiveContext> activeContextsToBeHandled;
062
063  @Inject
064  private MultipleCommGroupsDriver(final EvaluatorRequestor requestor,
065                                   final GroupCommDriver groupCommDriver) {
066    this.requestor = requestor;
067    this.groupCommDriver = groupCommDriver;
068    taskIds = new String[][]{
069        {"MasterTask-1", "SlaveTask-1-1", "SlaveTask-1-2", "SlaveTask-1-3"},
070        {"MasterTask-2", "SlaveTask-2-1"}
071    };
072    taskCounter = new AtomicInteger[]{new AtomicInteger(0), new AtomicInteger(0)};
073    commGroupDriverList = new ArrayList<>(2);
074    activeContextsToBeHandled = new ArrayList<>(2);
075    initializeCommGroups();
076  }
077
078  private void initializeCommGroups() {
079    commGroupDriverList.add(groupCommDriver.newCommunicationGroup(Group1.class, 4));
080    commGroupDriverList.add(groupCommDriver.newCommunicationGroup(Group2.class, 2));
081    commGroupDriverList.get(0).addBroadcast(BroadcastOperatorName.class,
082        BroadcastOperatorSpec.newBuilder()
083            .setSenderId(taskIds[0][0])
084            .setDataCodecClass(SerializableCodec.class)
085            .build());
086    commGroupDriverList.get(1).addBroadcast(BroadcastOperatorName.class,
087        BroadcastOperatorSpec.newBuilder()
088            .setSenderId(taskIds[1][0])
089            .setDataCodecClass(SerializableCodec.class)
090            .build());
091  }
092
093  final class StartHandler implements EventHandler<StartTime> {
094
095    @Override
096    public void onNext(final StartTime startTime) {
097      requestor.submit(EvaluatorRequest.newBuilder()
098          .setNumber(4)
099          .setMemory(128)
100          .build());
101    }
102  }
103
104  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
105
106    @Override
107    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
108      LOG.log(Level.INFO, "Evaluator allocated {0}", allocatedEvaluator);
109      allocatedEvaluator.submitContextAndService(
110          groupCommDriver.getContextConfiguration(), groupCommDriver.getServiceConfiguration());
111    }
112  }
113
114  final class ContextActiveHandler implements EventHandler<ActiveContext> {
115    private final AtomicInteger contextCounter = new AtomicInteger(0);
116
117    @Override
118    public void onNext(final ActiveContext activeContext) {
119      final int count = contextCounter.getAndIncrement();
120
121      if (count <= 1) {
122        LOG.log(Level.INFO, "{0} will be handled after tasks in Group1 started", activeContext);
123        activeContextsToBeHandled.add(activeContext);
124      } else {
125        // Add task to Group1
126        submitTask(activeContext, 0);
127      }
128    }
129  }
130
131  final class TaskRunningHandler implements EventHandler<RunningTask> {
132    private final AtomicInteger runningTaskCounter = new AtomicInteger(0);
133
134    @Override
135    public void onNext(final RunningTask runningTask) {
136      LOG.log(Level.INFO, "{0} has started", runningTask);
137      final int count = runningTaskCounter.getAndIncrement();
138      // After two tasks has started, submit tasks to the active contexts in activeContextsToBeHandled
139      if (count == 1) {
140        for (final ActiveContext activeContext : activeContextsToBeHandled) {
141          // Add task to Group2
142          submitTask(activeContext, 1);
143        }
144      }
145    }
146  }
147
148  private void submitTask(final ActiveContext activeContext, final int groupIndex) {
149    final String taskId = taskIds[groupIndex][taskCounter[groupIndex].getAndIncrement()];
150    LOG.log(Level.INFO, "Got active context {0}. Submit {1}", new Object[]{activeContext, taskId});
151    final Configuration partialTaskConf;
152    if (taskId.equals(taskIds[groupIndex][0])) {
153      partialTaskConf = TaskConfiguration.CONF
154          .set(TaskConfiguration.IDENTIFIER, taskId)
155          .set(TaskConfiguration.TASK, MasterTask.class)
156          .build();
157    } else {
158      partialTaskConf = TaskConfiguration.CONF
159          .set(TaskConfiguration.IDENTIFIER, taskId)
160          .set(TaskConfiguration.TASK, SlaveTask.class)
161          .build();
162    }
163    commGroupDriverList.get(groupIndex).addTask(partialTaskConf);
164    activeContext.submitTask(groupCommDriver.getTaskConfiguration(partialTaskConf));
165  }
166
167  final class TaskCompletedHandler implements EventHandler<CompletedTask> {
168    private final AtomicInteger completedTaskCounter = new AtomicInteger(0);
169
170    @Override
171    public void onNext(final CompletedTask completedTask) {
172      final int count = completedTaskCounter.getAndIncrement();
173      LOG.log(Level.INFO, "{0} has completed.", completedTask);
174      if (count <= 1) {
175        // Add task to Group1
176        submitTask(completedTask.getActiveContext(), 0);
177      } else {
178        completedTask.getActiveContext().close();
179      }
180    }
181  }
182
183  @NamedParameter()
184  final class Group1 implements Name<String> {
185  }
186
187  @NamedParameter()
188  final class Group2 implements Name<String> {
189  }
190
191  @NamedParameter()
192  final class BroadcastOperatorName implements Name<String> {
193  }
194}