001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.group; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.driver.context.ActiveContext; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.EvaluatorRequest; 025import org.apache.reef.driver.evaluator.EvaluatorRequestor; 026import org.apache.reef.driver.task.CompletedTask; 027import org.apache.reef.driver.task.RunningTask; 028import org.apache.reef.driver.task.TaskConfiguration; 029import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver; 030import org.apache.reef.io.network.group.api.driver.GroupCommDriver; 031import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec; 032import org.apache.reef.io.serialization.SerializableCodec; 033import org.apache.reef.tang.Configuration; 034import org.apache.reef.tang.annotations.Name; 035import org.apache.reef.tang.annotations.NamedParameter; 036import org.apache.reef.tang.annotations.Unit; 037import org.apache.reef.wake.EventHandler; 038import org.apache.reef.wake.time.event.StartTime; 039 040import javax.inject.Inject; 041import java.util.ArrayList; 042import java.util.List; 043import java.util.concurrent.atomic.AtomicInteger; 044import java.util.logging.Level; 045import java.util.logging.Logger; 046 047/** 048 * Driver used for testing multiple communication groups. 049 */ 050@DriverSide 051@Unit 052public final class MultipleCommGroupsDriver { 053 private static final Logger LOG = Logger.getLogger(MultipleCommGroupsDriver.class.getName()); 054 055 private final EvaluatorRequestor requestor; 056 private final GroupCommDriver groupCommDriver; 057 058 private final String[][] taskIds; 059 private final AtomicInteger[] taskCounter; 060 private final List<CommunicationGroupDriver> commGroupDriverList; 061 private final List<ActiveContext> activeContextsToBeHandled; 062 063 @Inject 064 private MultipleCommGroupsDriver(final EvaluatorRequestor requestor, 065 final GroupCommDriver groupCommDriver) { 066 this.requestor = requestor; 067 this.groupCommDriver = groupCommDriver; 068 taskIds = new String[][]{ 069 {"MasterTask-1", "SlaveTask-1-1", "SlaveTask-1-2", "SlaveTask-1-3"}, 070 {"MasterTask-2", "SlaveTask-2-1"} 071 }; 072 taskCounter = new AtomicInteger[]{new AtomicInteger(0), new AtomicInteger(0)}; 073 commGroupDriverList = new ArrayList<>(2); 074 activeContextsToBeHandled = new ArrayList<>(2); 075 initializeCommGroups(); 076 } 077 078 private void initializeCommGroups() { 079 commGroupDriverList.add(groupCommDriver.newCommunicationGroup(Group1.class, 4)); 080 commGroupDriverList.add(groupCommDriver.newCommunicationGroup(Group2.class, 2)); 081 commGroupDriverList.get(0).addBroadcast(BroadcastOperatorName.class, 082 BroadcastOperatorSpec.newBuilder() 083 .setSenderId(taskIds[0][0]) 084 .setDataCodecClass(SerializableCodec.class) 085 .build()); 086 commGroupDriverList.get(1).addBroadcast(BroadcastOperatorName.class, 087 BroadcastOperatorSpec.newBuilder() 088 .setSenderId(taskIds[1][0]) 089 .setDataCodecClass(SerializableCodec.class) 090 .build()); 091 } 092 093 final class StartHandler implements EventHandler<StartTime> { 094 095 @Override 096 public void onNext(final StartTime startTime) { 097 requestor.submit(EvaluatorRequest.newBuilder() 098 .setNumber(4) 099 .setMemory(128) 100 .build()); 101 } 102 } 103 104 final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 105 106 @Override 107 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 108 LOG.log(Level.INFO, "Evaluator allocated {0}", allocatedEvaluator); 109 allocatedEvaluator.submitContextAndService( 110 groupCommDriver.getContextConfiguration(), groupCommDriver.getServiceConfiguration()); 111 } 112 } 113 114 final class ContextActiveHandler implements EventHandler<ActiveContext> { 115 private final AtomicInteger contextCounter = new AtomicInteger(0); 116 117 @Override 118 public void onNext(final ActiveContext activeContext) { 119 final int count = contextCounter.getAndIncrement(); 120 121 if (count <= 1) { 122 LOG.log(Level.INFO, "{0} will be handled after tasks in Group1 started", activeContext); 123 activeContextsToBeHandled.add(activeContext); 124 } else { 125 // Add task to Group1 126 submitTask(activeContext, 0); 127 } 128 } 129 } 130 131 final class TaskRunningHandler implements EventHandler<RunningTask> { 132 private final AtomicInteger runningTaskCounter = new AtomicInteger(0); 133 134 @Override 135 public void onNext(final RunningTask runningTask) { 136 LOG.log(Level.INFO, "{0} has started", runningTask); 137 final int count = runningTaskCounter.getAndIncrement(); 138 // After two tasks has started, submit tasks to the active contexts in activeContextsToBeHandled 139 if (count == 1) { 140 for (final ActiveContext activeContext : activeContextsToBeHandled) { 141 // Add task to Group2 142 submitTask(activeContext, 1); 143 } 144 } 145 } 146 } 147 148 private void submitTask(final ActiveContext activeContext, final int groupIndex) { 149 final String taskId = taskIds[groupIndex][taskCounter[groupIndex].getAndIncrement()]; 150 LOG.log(Level.INFO, "Got active context {0}. Submit {1}", new Object[]{activeContext, taskId}); 151 final Configuration partialTaskConf; 152 if (taskId.equals(taskIds[groupIndex][0])) { 153 partialTaskConf = TaskConfiguration.CONF 154 .set(TaskConfiguration.IDENTIFIER, taskId) 155 .set(TaskConfiguration.TASK, MasterTask.class) 156 .build(); 157 } else { 158 partialTaskConf = TaskConfiguration.CONF 159 .set(TaskConfiguration.IDENTIFIER, taskId) 160 .set(TaskConfiguration.TASK, SlaveTask.class) 161 .build(); 162 } 163 commGroupDriverList.get(groupIndex).addTask(partialTaskConf); 164 activeContext.submitTask(groupCommDriver.getTaskConfiguration(partialTaskConf)); 165 } 166 167 final class TaskCompletedHandler implements EventHandler<CompletedTask> { 168 private final AtomicInteger completedTaskCounter = new AtomicInteger(0); 169 170 @Override 171 public void onNext(final CompletedTask completedTask) { 172 final int count = completedTaskCounter.getAndIncrement(); 173 LOG.log(Level.INFO, "{0} has completed.", completedTask); 174 if (count <= 1) { 175 // Add task to Group1 176 submitTask(completedTask.getActiveContext(), 0); 177 } else { 178 completedTask.getActiveContext().close(); 179 } 180 } 181 } 182 183 @NamedParameter() 184 final class Group1 implements Name<String> { 185 } 186 187 @NamedParameter() 188 final class Group2 implements Name<String> { 189 } 190 191 @NamedParameter() 192 final class BroadcastOperatorName implements Name<String> { 193 } 194}