001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.driver; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.context.ServiceConfiguration; 024import org.apache.reef.driver.evaluator.FailedEvaluator; 025import org.apache.reef.driver.parameters.DriverIdentifier; 026import org.apache.reef.driver.task.FailedTask; 027import org.apache.reef.driver.task.RunningTask; 028import org.apache.reef.io.network.Message; 029import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver; 030import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver; 031import org.apache.reef.io.network.group.api.driver.Topology; 032import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 033import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec; 034import org.apache.reef.io.network.group.impl.config.parameters.*; 035import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl; 036import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler; 037import org.apache.reef.io.network.group.impl.utils.Utils; 038import org.apache.reef.io.network.impl.*; 039import org.apache.reef.io.network.naming.NameResolver; 040import org.apache.reef.io.network.naming.NameResolverConfiguration; 041import org.apache.reef.io.network.naming.NameServer; 042import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr; 043import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort; 044import org.apache.reef.io.network.util.StringIdentifierFactory; 045import org.apache.reef.tang.Configuration; 046import org.apache.reef.tang.Injector; 047import org.apache.reef.tang.JavaConfigurationBuilder; 048import org.apache.reef.tang.Tang; 049import org.apache.reef.tang.annotations.Name; 050import org.apache.reef.tang.annotations.Parameter; 051import org.apache.reef.tang.exceptions.InjectionException; 052import org.apache.reef.tang.formats.ConfigurationSerializer; 053import org.apache.reef.util.SingletonAsserter; 054import org.apache.reef.wake.EStage; 055import org.apache.reef.wake.EventHandler; 056import org.apache.reef.wake.IdentifierFactory; 057import org.apache.reef.wake.impl.LoggingEventHandler; 058import org.apache.reef.wake.impl.SyncStage; 059import org.apache.reef.wake.impl.ThreadPoolStage; 060import org.apache.reef.wake.remote.address.LocalAddressProvider; 061import org.apache.reef.wake.remote.transport.TransportFactory; 062 063import javax.inject.Inject; 064import java.util.HashMap; 065import java.util.Map; 066import java.util.concurrent.atomic.AtomicInteger; 067import java.util.logging.Level; 068import java.util.logging.Logger; 069 070/** 071 * Sets up various stages to handle REEF events and adds the per communication 072 * group stages to them whenever a new communication group is created. 073 * <p> 074 * Also starts the NameService and the NetworkService on the driver 075 */ 076public final class GroupCommDriverImpl implements GroupCommServiceDriver { 077 private static final Logger LOG = Logger.getLogger(GroupCommDriverImpl.class.getName()); 078 /** 079 * TANG instance. 080 */ 081 private static final Tang TANG = Tang.Factory.getTang(); 082 083 private final CommunicationGroupDriverFactory commGroupDriverFactory; 084 085 private final AtomicInteger contextIds = new AtomicInteger(0); 086 087 private final IdentifierFactory idFac = new StringIdentifierFactory(); 088 089 private final NameServer nameService; 090 091 private final String nameServiceAddr; 092 private final int nameServicePort; 093 094 private final Map<Class<? extends Name<String>>, CommunicationGroupDriver> commGroupDrivers = new HashMap<>(); 095 096 private final ConfigurationSerializer confSerializer; 097 098 private final NetworkService<GroupCommunicationMessage> netService; 099 100 private final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler; 101 private final EStage<RunningTask> groupCommRunningTaskStage; 102 private final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler; 103 private final EStage<FailedTask> groupCommFailedTaskStage; 104 private final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler; 105 private final EStage<FailedEvaluator> groupCommFailedEvaluatorStage; 106 private final GroupCommMessageHandler groupCommMessageHandler; 107 private final EStage<GroupCommunicationMessage> groupCommMessageStage; 108 private final int fanOut; 109 110 @Inject 111 private GroupCommDriverImpl(final ConfigurationSerializer confSerializer, 112 @Parameter(DriverIdentifier.class) final String driverId, 113 @Parameter(TreeTopologyFanOut.class) final int fanOut, 114 final LocalAddressProvider localAddressProvider, 115 final TransportFactory tpFactory, 116 final NameServer nameService) { 117 assert SingletonAsserter.assertSingleton(getClass()); 118 this.fanOut = fanOut; 119 this.nameService = nameService; 120 this.nameServiceAddr = localAddressProvider.getLocalAddress(); 121 this.nameServicePort = nameService.getPort(); 122 this.confSerializer = confSerializer; 123 this.groupCommRunningTaskHandler = new BroadcastingEventHandler<>(); 124 this.groupCommRunningTaskStage = new SyncStage<>("GroupCommRunningTaskStage", groupCommRunningTaskHandler); 125 this.groupCommFailedTaskHandler = new BroadcastingEventHandler<>(); 126 this.groupCommFailedTaskStage = new SyncStage<>("GroupCommFailedTaskStage", groupCommFailedTaskHandler); 127 this.groupCommFailedEvaluatorHandler = new BroadcastingEventHandler<>(); 128 this.groupCommFailedEvaluatorStage = new SyncStage<>("GroupCommFailedEvaluatorStage", 129 groupCommFailedEvaluatorHandler); 130 this.groupCommMessageHandler = new GroupCommMessageHandler(); 131 this.groupCommMessageStage = new SyncStage<>("GroupCommMessageStage", groupCommMessageHandler); 132 133 final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF 134 .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, nameServiceAddr) 135 .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServicePort) 136 .build()) 137 .build(); 138 139 final NameResolver nameResolver; 140 try { 141 nameResolver = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameResolver.class); 142 } catch (final InjectionException e) { 143 throw new RuntimeException("Failed to instantiate NameResolver", e); 144 } 145 146 try { 147 final Injector injector = TANG.newInjector(); 148 injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, idFac); 149 injector.bindVolatileInstance(NameResolver.class, nameResolver); 150 injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, 151 new GroupCommunicationMessageCodec()); 152 injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class, tpFactory); 153 injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, 154 new EventHandler<Message<GroupCommunicationMessage>>() { 155 @Override 156 public void onNext(final Message<GroupCommunicationMessage> msg) { 157 groupCommMessageStage.onNext(Utils.getGCM(msg)); 158 } 159 }); 160 injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, 161 new LoggingEventHandler<Exception>()); 162 this.netService = injector.getInstance(NetworkService.class); 163 } catch (final InjectionException e) { 164 throw new RuntimeException("Failed to instantiate NetworkService", e); 165 } 166 this.netService.registerId(idFac.getNewInstance(driverId)); 167 final EStage<GroupCommunicationMessage> senderStage 168 = new ThreadPoolStage<>("SrcCtrlMsgSender", new CtrlMsgSender(idFac, netService), 5); 169 170 final Injector injector = TANG.newInjector(); 171 injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage); 172 injector.bindVolatileParameter(DriverIdentifier.class, driverId); 173 injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler); 174 injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler); 175 injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler); 176 injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler); 177 178 try { 179 commGroupDriverFactory = injector.getInstance(CommunicationGroupDriverFactory.class); 180 } catch (final InjectionException e) { 181 throw new RuntimeException(e); 182 } 183 } 184 185 @Override 186 public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, 187 final int numberOfTasks) { 188 return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, fanOut); 189 } 190 191 @Override 192 public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, 193 final int numberOfTasks, final int customFanOut) { 194 return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, customFanOut); 195 } 196 197 // TODO[JIRA REEF-391]: Allow different topology implementations for different operations in the same CommGroup. 198 @Override 199 public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, 200 final Class<? extends Topology> topologyClass, 201 final int numberOfTasks, final int customFanOut) { 202 LOG.entering("GroupCommDriverImpl", "newCommunicationGroup", 203 new Object[]{Utils.simpleName(groupName), numberOfTasks}); 204 205 final CommunicationGroupDriver commGroupDriver; 206 try { 207 commGroupDriver 208 = commGroupDriverFactory.getNewInstance(groupName, topologyClass, numberOfTasks, customFanOut); 209 } catch (final InjectionException e) { 210 LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver"); 211 throw new RuntimeException(e); 212 } 213 214 commGroupDrivers.put(groupName, commGroupDriver); 215 LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup", 216 "Created communication group: " + Utils.simpleName(groupName)); 217 return commGroupDriver; 218 } 219 220 @Override 221 public boolean isConfigured(final ActiveContext activeContext) { 222 LOG.entering("GroupCommDriverImpl", "isConfigured", activeContext.getId()); 223 final boolean retVal = activeContext.getId().startsWith("GroupCommunicationContext-"); 224 LOG.exiting("GroupCommDriverImpl", "isConfigured", retVal); 225 return retVal; 226 } 227 228 @Override 229 public Configuration getContextConfiguration() { 230 LOG.entering("GroupCommDriverImpl", "getContextConf"); 231 final Configuration retVal = ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, 232 "GroupCommunicationContext-" + contextIds.getAndIncrement()).build(); 233 LOG.exiting("GroupCommDriverImpl", "getContextConf", confSerializer.toString(retVal)); 234 return retVal; 235 } 236 237 @Override 238 public Configuration getServiceConfiguration() { 239 LOG.entering("GroupCommDriverImpl", "getServiceConf"); 240 final Configuration serviceConfiguration = ServiceConfiguration.CONF.set(ServiceConfiguration.SERVICES, 241 NetworkService.class) 242 .set(ServiceConfiguration.SERVICES, 243 GroupCommNetworkHandlerImpl.class) 244 .set(ServiceConfiguration.ON_CONTEXT_STOP, 245 NetworkServiceClosingHandler.class) 246 .set(ServiceConfiguration.ON_TASK_STARTED, 247 BindNSToTask.class) 248 .set(ServiceConfiguration.ON_TASK_STOP, 249 UnbindNSFromTask.class).build(); 250 final Configuration retVal = TANG.newConfigurationBuilder(serviceConfiguration) 251 .bindNamedParameter(NetworkServiceParameters.NetworkServiceCodec.class, 252 GroupCommunicationMessageCodec.class) 253 .bindNamedParameter(NetworkServiceParameters.NetworkServiceHandler.class, 254 GroupCommNetworkHandlerImpl.class) 255 .bindNamedParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, 256 ExceptionHandler.class) 257 .bindNamedParameter(NameResolverNameServerAddr.class, nameServiceAddr) 258 .bindNamedParameter(NameResolverNameServerPort.class, Integer.toString(nameServicePort)) 259 .bindNamedParameter(NetworkServiceParameters.NetworkServicePort.class, "0").build(); 260 LOG.exiting("GroupCommDriverImpl", "getServiceConf", confSerializer.toString(retVal)); 261 return retVal; 262 } 263 264 @Override 265 public Configuration getTaskConfiguration(final Configuration partialTaskConf) { 266 LOG.entering("GroupCommDriverImpl", "getTaskConfiguration", new Object[]{confSerializer.toString(partialTaskConf)}); 267 final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(partialTaskConf); 268 for (final CommunicationGroupDriver commGroupDriver : commGroupDrivers.values()) { 269 final Configuration commGroupConf = commGroupDriver.getTaskConfiguration(partialTaskConf); 270 if (commGroupConf != null) { 271 jcb.bindSetEntry(SerializedGroupConfigs.class, confSerializer.toString(commGroupConf)); 272 } 273 } 274 final Configuration retVal = jcb.build(); 275 LOG.exiting("GroupCommDriverImpl", "getTaskConfiguration", confSerializer.toString(retVal)); 276 return retVal; 277 } 278 279 /** 280 * @return the groupCommRunningTaskStage 281 */ 282 @Override 283 public EStage<RunningTask> getGroupCommRunningTaskStage() { 284 LOG.entering("GroupCommDriverImpl", "getGroupCommRunningTaskStage"); 285 LOG.exiting("GroupCommDriverImpl", "getGroupCommRunningTaskStage", "Returning GroupCommRunningTaskStage"); 286 return groupCommRunningTaskStage; 287 } 288 289 /** 290 * @return the groupCommFailedTaskStage 291 */ 292 @Override 293 public EStage<FailedTask> getGroupCommFailedTaskStage() { 294 LOG.entering("GroupCommDriverImpl", "getGroupCommFailedTaskStage"); 295 LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedTaskStage", "Returning GroupCommFailedTaskStage"); 296 return groupCommFailedTaskStage; 297 } 298 299 /** 300 * @return the groupCommFailedEvaluatorStage 301 */ 302 @Override 303 public EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage() { 304 LOG.entering("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage"); 305 LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFailedEvaluatorStage"); 306 return groupCommFailedEvaluatorStage; 307 } 308 309}