001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.driver; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.context.ServiceConfiguration; 024import org.apache.reef.driver.evaluator.FailedEvaluator; 025import org.apache.reef.driver.parameters.DriverIdentifier; 026import org.apache.reef.driver.task.FailedTask; 027import org.apache.reef.driver.task.RunningTask; 028import org.apache.reef.io.network.Message; 029import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver; 030import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver; 031import org.apache.reef.io.network.group.api.driver.Topology; 032import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 033import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec; 034import org.apache.reef.io.network.group.impl.config.parameters.*; 035import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl; 036import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler; 037import org.apache.reef.io.network.group.impl.utils.Utils; 038import org.apache.reef.io.network.impl.*; 039import org.apache.reef.io.network.naming.NameResolver; 040import org.apache.reef.io.network.naming.NameResolverConfiguration; 041import org.apache.reef.io.network.naming.NameServer; 042import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr; 043import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort; 044import org.apache.reef.io.network.util.StringIdentifierFactory; 045import org.apache.reef.tang.Configuration; 046import org.apache.reef.tang.Injector; 047import org.apache.reef.tang.JavaConfigurationBuilder; 048import org.apache.reef.tang.Tang; 049import org.apache.reef.tang.annotations.Name; 050import org.apache.reef.tang.annotations.Parameter; 051import org.apache.reef.tang.exceptions.InjectionException; 052import org.apache.reef.tang.formats.ConfigurationSerializer; 053import org.apache.reef.util.SingletonAsserter; 054import org.apache.reef.wake.EStage; 055import org.apache.reef.wake.EventHandler; 056import org.apache.reef.wake.IdentifierFactory; 057import org.apache.reef.wake.impl.LoggingEventHandler; 058import org.apache.reef.wake.impl.SyncStage; 059import org.apache.reef.wake.impl.ThreadPoolStage; 060import org.apache.reef.wake.remote.address.LocalAddressProvider; 061import org.apache.reef.wake.remote.transport.TransportFactory; 062 063import javax.inject.Inject; 064import java.util.HashMap; 065import java.util.Map; 066import java.util.concurrent.atomic.AtomicInteger; 067import java.util.logging.Level; 068import java.util.logging.Logger; 069 070/** 071 * Sets up various stages to handle REEF events and adds the per communication 072 * group stages to them whenever a new communication group is created. 073 * <p> 074 * Also starts the NameService and the NetworkService on the driver 075 */ 076public class GroupCommDriverImpl implements GroupCommServiceDriver { 077 private static final Logger LOG = Logger.getLogger(GroupCommDriverImpl.class.getName()); 078 /** 079 * TANG instance. 080 */ 081 private static final Tang TANG = Tang.Factory.getTang(); 082 083 private final CommunicationGroupDriverFactory commGroupDriverFactory; 084 085 private final AtomicInteger contextIds = new AtomicInteger(0); 086 087 private final IdentifierFactory idFac = new StringIdentifierFactory(); 088 089 private final NameServer nameService; 090 091 private final String nameServiceAddr; 092 private final int nameServicePort; 093 094 private final Map<Class<? extends Name<String>>, CommunicationGroupDriver> commGroupDrivers = new HashMap<>(); 095 096 private final ConfigurationSerializer confSerializer; 097 098 private final NetworkService<GroupCommunicationMessage> netService; 099 100 private final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler; 101 private final EStage<RunningTask> groupCommRunningTaskStage; 102 private final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler; 103 private final EStage<FailedTask> groupCommFailedTaskStage; 104 private final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler; 105 private final EStage<FailedEvaluator> groupCommFailedEvaluatorStage; 106 private final GroupCommMessageHandler groupCommMessageHandler; 107 private final EStage<GroupCommunicationMessage> groupCommMessageStage; 108 private final int fanOut; 109 110 /** 111 * @deprecated in 0.12. Use Tang to obtain an instance of this instead. 112 */ 113 @Deprecated 114 @Inject 115 public GroupCommDriverImpl(final ConfigurationSerializer confSerializer, 116 @Parameter(DriverIdentifier.class) final String driverId, 117 @Parameter(TreeTopologyFanOut.class) final int fanOut, 118 final LocalAddressProvider localAddressProvider, 119 final TransportFactory tpFactory, 120 final NameServer nameService) { 121 assert SingletonAsserter.assertSingleton(getClass()); 122 this.fanOut = fanOut; 123 this.nameService = nameService; 124 this.nameServiceAddr = localAddressProvider.getLocalAddress(); 125 this.nameServicePort = nameService.getPort(); 126 this.confSerializer = confSerializer; 127 this.groupCommRunningTaskHandler = new BroadcastingEventHandler<>(); 128 this.groupCommRunningTaskStage = new SyncStage<>("GroupCommRunningTaskStage", groupCommRunningTaskHandler); 129 this.groupCommFailedTaskHandler = new BroadcastingEventHandler<>(); 130 this.groupCommFailedTaskStage = new SyncStage<>("GroupCommFailedTaskStage", groupCommFailedTaskHandler); 131 this.groupCommFailedEvaluatorHandler = new BroadcastingEventHandler<>(); 132 this.groupCommFailedEvaluatorStage = new SyncStage<>("GroupCommFailedEvaluatorStage", 133 groupCommFailedEvaluatorHandler); 134 this.groupCommMessageHandler = new GroupCommMessageHandler(); 135 this.groupCommMessageStage = new SyncStage<>("GroupCommMessageStage", groupCommMessageHandler); 136 137 final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF 138 .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, nameServiceAddr) 139 .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServicePort) 140 .build()) 141 .build(); 142 143 NameResolver nameResolver = null; 144 try { 145 nameResolver = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameResolver.class); 146 } catch (final InjectionException e) { 147 throw new RuntimeException(e); 148 } 149 150 this.netService = new NetworkService<>(idFac, 0, nameResolver, 151 new GroupCommunicationMessageCodec(), tpFactory, 152 new EventHandler<Message<GroupCommunicationMessage>>() { 153 154 @Override 155 public void onNext(final Message<GroupCommunicationMessage> msg) { 156 groupCommMessageStage.onNext(Utils.getGCM(msg)); 157 } 158 }, new LoggingEventHandler<Exception>(), localAddressProvider); 159 this.netService.registerId(idFac.getNewInstance(driverId)); 160 final EStage<GroupCommunicationMessage> senderStage 161 = new ThreadPoolStage<>("SrcCtrlMsgSender", new CtrlMsgSender(idFac, netService), 5); 162 163 final Injector injector = TANG.newInjector(); 164 injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage); 165 injector.bindVolatileParameter(DriverIdentifier.class, driverId); 166 injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler); 167 injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler); 168 injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler); 169 injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler); 170 171 try { 172 commGroupDriverFactory = injector.getInstance(CommunicationGroupDriverFactory.class); 173 } catch (final InjectionException e) { 174 throw new RuntimeException(e); 175 } 176 } 177 178 @Override 179 public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, 180 final int numberOfTasks) { 181 return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, fanOut); 182 } 183 184 @Override 185 public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, 186 final int numberOfTasks, final int customFanOut) { 187 return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, customFanOut); 188 } 189 190 // TODO[JIRA REEF-391]: Allow different topology implementations for different operations in the same CommGroup. 191 @Override 192 public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, 193 final Class<? extends Topology> topologyClass, 194 final int numberOfTasks, final int customFanOut) { 195 LOG.entering("GroupCommDriverImpl", "newCommunicationGroup", 196 new Object[]{Utils.simpleName(groupName), numberOfTasks}); 197 198 final CommunicationGroupDriver commGroupDriver; 199 try { 200 commGroupDriver 201 = commGroupDriverFactory.getNewInstance(groupName, topologyClass, numberOfTasks, customFanOut); 202 } catch (final InjectionException e) { 203 LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver"); 204 throw new RuntimeException(e); 205 } 206 207 commGroupDrivers.put(groupName, commGroupDriver); 208 LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup", 209 "Created communication group: " + Utils.simpleName(groupName)); 210 return commGroupDriver; 211 } 212 213 @Override 214 public boolean isConfigured(final ActiveContext activeContext) { 215 LOG.entering("GroupCommDriverImpl", "isConfigured", activeContext.getId()); 216 final boolean retVal = activeContext.getId().startsWith("GroupCommunicationContext-"); 217 LOG.exiting("GroupCommDriverImpl", "isConfigured", retVal); 218 return retVal; 219 } 220 221 @Override 222 public Configuration getContextConfiguration() { 223 LOG.entering("GroupCommDriverImpl", "getContextConf"); 224 final Configuration retVal = ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, 225 "GroupCommunicationContext-" + contextIds.getAndIncrement()).build(); 226 LOG.exiting("GroupCommDriverImpl", "getContextConf", confSerializer.toString(retVal)); 227 return retVal; 228 } 229 230 @Override 231 public Configuration getServiceConfiguration() { 232 LOG.entering("GroupCommDriverImpl", "getServiceConf"); 233 final Configuration serviceConfiguration = ServiceConfiguration.CONF.set(ServiceConfiguration.SERVICES, 234 NetworkService.class) 235 .set(ServiceConfiguration.SERVICES, 236 GroupCommNetworkHandlerImpl.class) 237 .set(ServiceConfiguration.ON_CONTEXT_STOP, 238 NetworkServiceClosingHandler.class) 239 .set(ServiceConfiguration.ON_TASK_STARTED, 240 BindNSToTask.class) 241 .set(ServiceConfiguration.ON_TASK_STOP, 242 UnbindNSFromTask.class).build(); 243 final Configuration retVal = TANG.newConfigurationBuilder(serviceConfiguration) 244 .bindNamedParameter(NetworkServiceParameters.NetworkServiceCodec.class, 245 GroupCommunicationMessageCodec.class) 246 .bindNamedParameter(NetworkServiceParameters.NetworkServiceHandler.class, 247 GroupCommNetworkHandlerImpl.class) 248 .bindNamedParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, 249 ExceptionHandler.class) 250 .bindNamedParameter(NameResolverNameServerAddr.class, nameServiceAddr) 251 .bindNamedParameter(NameResolverNameServerPort.class, Integer.toString(nameServicePort)) 252 .bindNamedParameter(NetworkServiceParameters.NetworkServicePort.class, "0").build(); 253 LOG.exiting("GroupCommDriverImpl", "getServiceConf", confSerializer.toString(retVal)); 254 return retVal; 255 } 256 257 @Override 258 public Configuration getTaskConfiguration(final Configuration partialTaskConf) { 259 LOG.entering("GroupCommDriverImpl", "getTaskConfiguration", new Object[]{confSerializer.toString(partialTaskConf)}); 260 final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(partialTaskConf); 261 for (final CommunicationGroupDriver commGroupDriver : commGroupDrivers.values()) { 262 final Configuration commGroupConf = commGroupDriver.getTaskConfiguration(partialTaskConf); 263 if (commGroupConf != null) { 264 jcb.bindSetEntry(SerializedGroupConfigs.class, confSerializer.toString(commGroupConf)); 265 } 266 } 267 final Configuration retVal = jcb.build(); 268 LOG.exiting("GroupCommDriverImpl", "getTaskConfiguration", confSerializer.toString(retVal)); 269 return retVal; 270 } 271 272 /** 273 * @return the groupCommRunningTaskStage 274 */ 275 @Override 276 public EStage<RunningTask> getGroupCommRunningTaskStage() { 277 LOG.entering("GroupCommDriverImpl", "getGroupCommRunningTaskStage"); 278 LOG.exiting("GroupCommDriverImpl", "getGroupCommRunningTaskStage", "Returning GroupCommRunningTaskStage"); 279 return groupCommRunningTaskStage; 280 } 281 282 /** 283 * @return the groupCommFailedTaskStage 284 */ 285 @Override 286 public EStage<FailedTask> getGroupCommFailedTaskStage() { 287 LOG.entering("GroupCommDriverImpl", "getGroupCommFailedTaskStage"); 288 LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedTaskStage", "Returning GroupCommFailedTaskStage"); 289 return groupCommFailedTaskStage; 290 } 291 292 /** 293 * @return the groupCommFailedEvaluatorStage 294 */ 295 @Override 296 public EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage() { 297 LOG.entering("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage"); 298 LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFailedEvaluatorStage"); 299 return groupCommFailedEvaluatorStage; 300 } 301 302}