This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.context.ServiceConfiguration;
024import org.apache.reef.driver.evaluator.FailedEvaluator;
025import org.apache.reef.driver.parameters.DriverIdentifier;
026import org.apache.reef.driver.task.FailedTask;
027import org.apache.reef.driver.task.RunningTask;
028import org.apache.reef.io.network.Message;
029import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
030import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver;
031import org.apache.reef.io.network.group.api.driver.Topology;
032import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
033import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec;
034import org.apache.reef.io.network.group.impl.config.parameters.*;
035import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl;
036import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
037import org.apache.reef.io.network.group.impl.utils.Utils;
038import org.apache.reef.io.network.impl.*;
039import org.apache.reef.io.network.naming.NameResolver;
040import org.apache.reef.io.network.naming.NameResolverConfiguration;
041import org.apache.reef.io.network.naming.NameServer;
042import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr;
043import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort;
044import org.apache.reef.io.network.util.StringIdentifierFactory;
045import org.apache.reef.tang.Configuration;
046import org.apache.reef.tang.Injector;
047import org.apache.reef.tang.JavaConfigurationBuilder;
048import org.apache.reef.tang.Tang;
049import org.apache.reef.tang.annotations.Name;
050import org.apache.reef.tang.annotations.Parameter;
051import org.apache.reef.tang.exceptions.InjectionException;
052import org.apache.reef.tang.formats.ConfigurationSerializer;
053import org.apache.reef.util.SingletonAsserter;
054import org.apache.reef.wake.EStage;
055import org.apache.reef.wake.EventHandler;
056import org.apache.reef.wake.IdentifierFactory;
057import org.apache.reef.wake.impl.LoggingEventHandler;
058import org.apache.reef.wake.impl.SyncStage;
059import org.apache.reef.wake.impl.ThreadPoolStage;
060import org.apache.reef.wake.remote.address.LocalAddressProvider;
061import org.apache.reef.wake.remote.transport.TransportFactory;
062
063import javax.inject.Inject;
064import java.util.HashMap;
065import java.util.Map;
066import java.util.concurrent.atomic.AtomicInteger;
067import java.util.logging.Level;
068import java.util.logging.Logger;
069
070/**
071 * Sets up various stages to handle REEF events and adds the per communication
072 * group stages to them whenever a new communication group is created.
073 * <p>
074 * Also starts the NameService and the NetworkService on the driver
075 */
076public final class GroupCommDriverImpl implements GroupCommServiceDriver {
077  private static final Logger LOG = Logger.getLogger(GroupCommDriverImpl.class.getName());
078  /**
079   * TANG instance.
080   */
081  private static final Tang TANG = Tang.Factory.getTang();
082
083  private final CommunicationGroupDriverFactory commGroupDriverFactory;
084
085  private final AtomicInteger contextIds = new AtomicInteger(0);
086
087  private final IdentifierFactory idFac = new StringIdentifierFactory();
088
089  private final NameServer nameService;
090
091  private final String nameServiceAddr;
092  private final int nameServicePort;
093
094  private final Map<Class<? extends Name<String>>, CommunicationGroupDriver> commGroupDrivers = new HashMap<>();
095
096  private final ConfigurationSerializer confSerializer;
097
098  private final NetworkService<GroupCommunicationMessage> netService;
099
100  private final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler;
101  private final EStage<RunningTask> groupCommRunningTaskStage;
102  private final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler;
103  private final EStage<FailedTask> groupCommFailedTaskStage;
104  private final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler;
105  private final EStage<FailedEvaluator> groupCommFailedEvaluatorStage;
106  private final GroupCommMessageHandler groupCommMessageHandler;
107  private final EStage<GroupCommunicationMessage> groupCommMessageStage;
108  private final int fanOut;
109
110  @Inject
111  private GroupCommDriverImpl(final ConfigurationSerializer confSerializer,
112                             @Parameter(DriverIdentifier.class) final String driverId,
113                             @Parameter(TreeTopologyFanOut.class) final int fanOut,
114                             final LocalAddressProvider localAddressProvider,
115                             final TransportFactory tpFactory,
116                             final NameServer nameService) {
117    assert SingletonAsserter.assertSingleton(getClass());
118    this.fanOut = fanOut;
119    this.nameService = nameService;
120    this.nameServiceAddr = localAddressProvider.getLocalAddress();
121    this.nameServicePort = nameService.getPort();
122    this.confSerializer = confSerializer;
123    this.groupCommRunningTaskHandler = new BroadcastingEventHandler<>();
124    this.groupCommRunningTaskStage = new SyncStage<>("GroupCommRunningTaskStage", groupCommRunningTaskHandler);
125    this.groupCommFailedTaskHandler = new BroadcastingEventHandler<>();
126    this.groupCommFailedTaskStage = new SyncStage<>("GroupCommFailedTaskStage", groupCommFailedTaskHandler);
127    this.groupCommFailedEvaluatorHandler = new BroadcastingEventHandler<>();
128    this.groupCommFailedEvaluatorStage = new SyncStage<>("GroupCommFailedEvaluatorStage",
129        groupCommFailedEvaluatorHandler);
130    this.groupCommMessageHandler = new GroupCommMessageHandler();
131    this.groupCommMessageStage = new SyncStage<>("GroupCommMessageStage", groupCommMessageHandler);
132
133    final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
134        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, nameServiceAddr)
135        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServicePort)
136        .build())
137        .build();
138
139    final NameResolver nameResolver;
140    try {
141      nameResolver = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameResolver.class);
142    } catch (final InjectionException e) {
143      throw new RuntimeException("Failed to instantiate NameResolver", e);
144    }
145
146    try {
147      final Injector injector = TANG.newInjector();
148      injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, idFac);
149      injector.bindVolatileInstance(NameResolver.class, nameResolver);
150      injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class,
151          new GroupCommunicationMessageCodec());
152      injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class, tpFactory);
153      injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class,
154          new EventHandler<Message<GroupCommunicationMessage>>() {
155            @Override
156            public void onNext(final Message<GroupCommunicationMessage> msg) {
157              groupCommMessageStage.onNext(Utils.getGCM(msg));
158            }
159          });
160      injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class,
161          new LoggingEventHandler<Exception>());
162      this.netService = injector.getInstance(NetworkService.class);
163    } catch (final InjectionException e) {
164      throw new RuntimeException("Failed to instantiate NetworkService", e);
165    }
166    this.netService.registerId(idFac.getNewInstance(driverId));
167    final EStage<GroupCommunicationMessage> senderStage
168        = new ThreadPoolStage<>("SrcCtrlMsgSender", new CtrlMsgSender(idFac, netService), 5);
169
170    final Injector injector = TANG.newInjector();
171    injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage);
172    injector.bindVolatileParameter(DriverIdentifier.class, driverId);
173    injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler);
174    injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler);
175    injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler);
176    injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler);
177
178    try {
179      commGroupDriverFactory = injector.getInstance(CommunicationGroupDriverFactory.class);
180    } catch (final InjectionException e) {
181      throw new RuntimeException(e);
182    }
183  }
184
185  @Override
186  public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
187                                                        final int numberOfTasks) {
188    return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, fanOut);
189  }
190
191  @Override
192  public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
193                                                        final int numberOfTasks, final int customFanOut) {
194    return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, customFanOut);
195  }
196
197  // TODO[JIRA REEF-391]: Allow different topology implementations for different operations in the same CommGroup.
198  @Override
199  public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
200                                                        final Class<? extends Topology> topologyClass,
201                                                        final int numberOfTasks, final int customFanOut) {
202    LOG.entering("GroupCommDriverImpl", "newCommunicationGroup",
203        new Object[]{Utils.simpleName(groupName), numberOfTasks});
204
205    final CommunicationGroupDriver commGroupDriver;
206    try {
207      commGroupDriver
208          = commGroupDriverFactory.getNewInstance(groupName, topologyClass, numberOfTasks, customFanOut);
209    } catch (final InjectionException e) {
210      LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver");
211      throw new RuntimeException(e);
212    }
213
214    commGroupDrivers.put(groupName, commGroupDriver);
215    LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup",
216        "Created communication group: " + Utils.simpleName(groupName));
217    return commGroupDriver;
218  }
219
220  @Override
221  public boolean isConfigured(final ActiveContext activeContext) {
222    LOG.entering("GroupCommDriverImpl", "isConfigured", activeContext.getId());
223    final boolean retVal = activeContext.getId().startsWith("GroupCommunicationContext-");
224    LOG.exiting("GroupCommDriverImpl", "isConfigured", retVal);
225    return retVal;
226  }
227
228  @Override
229  public Configuration getContextConfiguration() {
230    LOG.entering("GroupCommDriverImpl", "getContextConf");
231    final Configuration retVal = ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER,
232        "GroupCommunicationContext-" + contextIds.getAndIncrement()).build();
233    LOG.exiting("GroupCommDriverImpl", "getContextConf", confSerializer.toString(retVal));
234    return retVal;
235  }
236
237  @Override
238  public Configuration getServiceConfiguration() {
239    LOG.entering("GroupCommDriverImpl", "getServiceConf");
240    final Configuration serviceConfiguration = ServiceConfiguration.CONF.set(ServiceConfiguration.SERVICES,
241        NetworkService.class)
242        .set(ServiceConfiguration.SERVICES,
243            GroupCommNetworkHandlerImpl.class)
244        .set(ServiceConfiguration.ON_CONTEXT_STOP,
245            NetworkServiceClosingHandler.class)
246        .set(ServiceConfiguration.ON_TASK_STARTED,
247            BindNSToTask.class)
248        .set(ServiceConfiguration.ON_TASK_STOP,
249            UnbindNSFromTask.class).build();
250    final Configuration retVal = TANG.newConfigurationBuilder(serviceConfiguration)
251        .bindNamedParameter(NetworkServiceParameters.NetworkServiceCodec.class,
252            GroupCommunicationMessageCodec.class)
253        .bindNamedParameter(NetworkServiceParameters.NetworkServiceHandler.class,
254            GroupCommNetworkHandlerImpl.class)
255        .bindNamedParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class,
256            ExceptionHandler.class)
257        .bindNamedParameter(NameResolverNameServerAddr.class, nameServiceAddr)
258        .bindNamedParameter(NameResolverNameServerPort.class, Integer.toString(nameServicePort))
259        .bindNamedParameter(NetworkServiceParameters.NetworkServicePort.class, "0").build();
260    LOG.exiting("GroupCommDriverImpl", "getServiceConf", confSerializer.toString(retVal));
261    return retVal;
262  }
263
264  @Override
265  public Configuration getTaskConfiguration(final Configuration partialTaskConf) {
266    LOG.entering("GroupCommDriverImpl", "getTaskConfiguration", new Object[]{confSerializer.toString(partialTaskConf)});
267    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(partialTaskConf);
268    for (final CommunicationGroupDriver commGroupDriver : commGroupDrivers.values()) {
269      final Configuration commGroupConf = commGroupDriver.getTaskConfiguration(partialTaskConf);
270      if (commGroupConf != null) {
271        jcb.bindSetEntry(SerializedGroupConfigs.class, confSerializer.toString(commGroupConf));
272      }
273    }
274    final Configuration retVal = jcb.build();
275    LOG.exiting("GroupCommDriverImpl", "getTaskConfiguration", confSerializer.toString(retVal));
276    return retVal;
277  }
278
279  /**
280   * @return the groupCommRunningTaskStage
281   */
282  @Override
283  public EStage<RunningTask> getGroupCommRunningTaskStage() {
284    LOG.entering("GroupCommDriverImpl", "getGroupCommRunningTaskStage");
285    LOG.exiting("GroupCommDriverImpl", "getGroupCommRunningTaskStage", "Returning GroupCommRunningTaskStage");
286    return groupCommRunningTaskStage;
287  }
288
289  /**
290   * @return the groupCommFailedTaskStage
291   */
292  @Override
293  public EStage<FailedTask> getGroupCommFailedTaskStage() {
294    LOG.entering("GroupCommDriverImpl", "getGroupCommFailedTaskStage");
295    LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedTaskStage", "Returning GroupCommFailedTaskStage");
296    return groupCommFailedTaskStage;
297  }
298
299  /**
300   * @return the groupCommFailedEvaluatorStage
301   */
302  @Override
303  public EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage() {
304    LOG.entering("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage");
305    LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFailedEvaluatorStage");
306    return groupCommFailedEvaluatorStage;
307  }
308
309}