This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.context.ServiceConfiguration;
024import org.apache.reef.driver.evaluator.FailedEvaluator;
025import org.apache.reef.driver.parameters.DriverIdentifier;
026import org.apache.reef.driver.task.FailedTask;
027import org.apache.reef.driver.task.RunningTask;
028import org.apache.reef.io.network.Message;
029import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
030import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver;
031import org.apache.reef.io.network.group.api.driver.Topology;
032import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
033import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec;
034import org.apache.reef.io.network.group.impl.config.parameters.*;
035import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl;
036import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
037import org.apache.reef.io.network.group.impl.utils.Utils;
038import org.apache.reef.io.network.impl.*;
039import org.apache.reef.io.network.naming.NameResolver;
040import org.apache.reef.io.network.naming.NameResolverConfiguration;
041import org.apache.reef.io.network.naming.NameServer;
042import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr;
043import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort;
044import org.apache.reef.io.network.util.StringIdentifierFactory;
045import org.apache.reef.tang.Configuration;
046import org.apache.reef.tang.Injector;
047import org.apache.reef.tang.JavaConfigurationBuilder;
048import org.apache.reef.tang.Tang;
049import org.apache.reef.tang.annotations.Name;
050import org.apache.reef.tang.annotations.Parameter;
051import org.apache.reef.tang.exceptions.InjectionException;
052import org.apache.reef.tang.formats.ConfigurationSerializer;
053import org.apache.reef.util.SingletonAsserter;
054import org.apache.reef.wake.EStage;
055import org.apache.reef.wake.EventHandler;
056import org.apache.reef.wake.IdentifierFactory;
057import org.apache.reef.wake.impl.LoggingEventHandler;
058import org.apache.reef.wake.impl.SyncStage;
059import org.apache.reef.wake.impl.ThreadPoolStage;
060import org.apache.reef.wake.remote.address.LocalAddressProvider;
061import org.apache.reef.wake.remote.transport.TransportFactory;
062
063import javax.inject.Inject;
064import java.util.HashMap;
065import java.util.Map;
066import java.util.concurrent.atomic.AtomicInteger;
067import java.util.logging.Level;
068import java.util.logging.Logger;
069
070/**
071 * Sets up various stages to handle REEF events and adds the per communication
072 * group stages to them whenever a new communication group is created.
073 * <p>
074 * Also starts the NameService and the NetworkService on the driver
075 */
076public class GroupCommDriverImpl implements GroupCommServiceDriver {
077  private static final Logger LOG = Logger.getLogger(GroupCommDriverImpl.class.getName());
078  /**
079   * TANG instance.
080   */
081  private static final Tang TANG = Tang.Factory.getTang();
082
083  private final CommunicationGroupDriverFactory commGroupDriverFactory;
084
085  private final AtomicInteger contextIds = new AtomicInteger(0);
086
087  private final IdentifierFactory idFac = new StringIdentifierFactory();
088
089  private final NameServer nameService;
090
091  private final String nameServiceAddr;
092  private final int nameServicePort;
093
094  private final Map<Class<? extends Name<String>>, CommunicationGroupDriver> commGroupDrivers = new HashMap<>();
095
096  private final ConfigurationSerializer confSerializer;
097
098  private final NetworkService<GroupCommunicationMessage> netService;
099
100  private final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler;
101  private final EStage<RunningTask> groupCommRunningTaskStage;
102  private final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler;
103  private final EStage<FailedTask> groupCommFailedTaskStage;
104  private final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler;
105  private final EStage<FailedEvaluator> groupCommFailedEvaluatorStage;
106  private final GroupCommMessageHandler groupCommMessageHandler;
107  private final EStage<GroupCommunicationMessage> groupCommMessageStage;
108  private final int fanOut;
109
110  /**
111   * @deprecated in 0.12. Use Tang to obtain an instance of this instead.
112   */
113  @Deprecated
114  @Inject
115  public GroupCommDriverImpl(final ConfigurationSerializer confSerializer,
116                             @Parameter(DriverIdentifier.class) final String driverId,
117                             @Parameter(TreeTopologyFanOut.class) final int fanOut,
118                             final LocalAddressProvider localAddressProvider,
119                             final TransportFactory tpFactory,
120                             final NameServer nameService) {
121    assert SingletonAsserter.assertSingleton(getClass());
122    this.fanOut = fanOut;
123    this.nameService = nameService;
124    this.nameServiceAddr = localAddressProvider.getLocalAddress();
125    this.nameServicePort = nameService.getPort();
126    this.confSerializer = confSerializer;
127    this.groupCommRunningTaskHandler = new BroadcastingEventHandler<>();
128    this.groupCommRunningTaskStage = new SyncStage<>("GroupCommRunningTaskStage", groupCommRunningTaskHandler);
129    this.groupCommFailedTaskHandler = new BroadcastingEventHandler<>();
130    this.groupCommFailedTaskStage = new SyncStage<>("GroupCommFailedTaskStage", groupCommFailedTaskHandler);
131    this.groupCommFailedEvaluatorHandler = new BroadcastingEventHandler<>();
132    this.groupCommFailedEvaluatorStage = new SyncStage<>("GroupCommFailedEvaluatorStage",
133        groupCommFailedEvaluatorHandler);
134    this.groupCommMessageHandler = new GroupCommMessageHandler();
135    this.groupCommMessageStage = new SyncStage<>("GroupCommMessageStage", groupCommMessageHandler);
136
137    final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
138        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, nameServiceAddr)
139        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServicePort)
140        .build())
141        .build();
142
143    NameResolver nameResolver = null;
144    try {
145      nameResolver = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameResolver.class);
146    } catch (final InjectionException e) {
147      throw new RuntimeException(e);
148    }
149
150    this.netService = new NetworkService<>(idFac, 0, nameResolver,
151        new GroupCommunicationMessageCodec(), tpFactory,
152        new EventHandler<Message<GroupCommunicationMessage>>() {
153
154          @Override
155          public void onNext(final Message<GroupCommunicationMessage> msg) {
156            groupCommMessageStage.onNext(Utils.getGCM(msg));
157          }
158        }, new LoggingEventHandler<Exception>(), localAddressProvider);
159    this.netService.registerId(idFac.getNewInstance(driverId));
160    final EStage<GroupCommunicationMessage> senderStage
161        = new ThreadPoolStage<>("SrcCtrlMsgSender", new CtrlMsgSender(idFac, netService), 5);
162
163    final Injector injector = TANG.newInjector();
164    injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage);
165    injector.bindVolatileParameter(DriverIdentifier.class, driverId);
166    injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler);
167    injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler);
168    injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler);
169    injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler);
170
171    try {
172      commGroupDriverFactory = injector.getInstance(CommunicationGroupDriverFactory.class);
173    } catch (final InjectionException e) {
174      throw new RuntimeException(e);
175    }
176  }
177
178  @Override
179  public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
180                                                        final int numberOfTasks) {
181    return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, fanOut);
182  }
183
184  @Override
185  public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
186                                                        final int numberOfTasks, final int customFanOut) {
187    return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, customFanOut);
188  }
189
190  // TODO[JIRA REEF-391]: Allow different topology implementations for different operations in the same CommGroup.
191  @Override
192  public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
193                                                        final Class<? extends Topology> topologyClass,
194                                                        final int numberOfTasks, final int customFanOut) {
195    LOG.entering("GroupCommDriverImpl", "newCommunicationGroup",
196        new Object[]{Utils.simpleName(groupName), numberOfTasks});
197
198    final CommunicationGroupDriver commGroupDriver;
199    try {
200      commGroupDriver
201          = commGroupDriverFactory.getNewInstance(groupName, topologyClass, numberOfTasks, customFanOut);
202    } catch (final InjectionException e) {
203      LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver");
204      throw new RuntimeException(e);
205    }
206
207    commGroupDrivers.put(groupName, commGroupDriver);
208    LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup",
209        "Created communication group: " + Utils.simpleName(groupName));
210    return commGroupDriver;
211  }
212
213  @Override
214  public boolean isConfigured(final ActiveContext activeContext) {
215    LOG.entering("GroupCommDriverImpl", "isConfigured", activeContext.getId());
216    final boolean retVal = activeContext.getId().startsWith("GroupCommunicationContext-");
217    LOG.exiting("GroupCommDriverImpl", "isConfigured", retVal);
218    return retVal;
219  }
220
221  @Override
222  public Configuration getContextConfiguration() {
223    LOG.entering("GroupCommDriverImpl", "getContextConf");
224    final Configuration retVal = ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER,
225        "GroupCommunicationContext-" + contextIds.getAndIncrement()).build();
226    LOG.exiting("GroupCommDriverImpl", "getContextConf", confSerializer.toString(retVal));
227    return retVal;
228  }
229
230  @Override
231  public Configuration getServiceConfiguration() {
232    LOG.entering("GroupCommDriverImpl", "getServiceConf");
233    final Configuration serviceConfiguration = ServiceConfiguration.CONF.set(ServiceConfiguration.SERVICES,
234        NetworkService.class)
235        .set(ServiceConfiguration.SERVICES,
236            GroupCommNetworkHandlerImpl.class)
237        .set(ServiceConfiguration.ON_CONTEXT_STOP,
238            NetworkServiceClosingHandler.class)
239        .set(ServiceConfiguration.ON_TASK_STARTED,
240            BindNSToTask.class)
241        .set(ServiceConfiguration.ON_TASK_STOP,
242            UnbindNSFromTask.class).build();
243    final Configuration retVal = TANG.newConfigurationBuilder(serviceConfiguration)
244        .bindNamedParameter(NetworkServiceParameters.NetworkServiceCodec.class,
245            GroupCommunicationMessageCodec.class)
246        .bindNamedParameter(NetworkServiceParameters.NetworkServiceHandler.class,
247            GroupCommNetworkHandlerImpl.class)
248        .bindNamedParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class,
249            ExceptionHandler.class)
250        .bindNamedParameter(NameResolverNameServerAddr.class, nameServiceAddr)
251        .bindNamedParameter(NameResolverNameServerPort.class, Integer.toString(nameServicePort))
252        .bindNamedParameter(NetworkServiceParameters.NetworkServicePort.class, "0").build();
253    LOG.exiting("GroupCommDriverImpl", "getServiceConf", confSerializer.toString(retVal));
254    return retVal;
255  }
256
257  @Override
258  public Configuration getTaskConfiguration(final Configuration partialTaskConf) {
259    LOG.entering("GroupCommDriverImpl", "getTaskConfiguration", new Object[]{confSerializer.toString(partialTaskConf)});
260    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(partialTaskConf);
261    for (final CommunicationGroupDriver commGroupDriver : commGroupDrivers.values()) {
262      final Configuration commGroupConf = commGroupDriver.getTaskConfiguration(partialTaskConf);
263      if (commGroupConf != null) {
264        jcb.bindSetEntry(SerializedGroupConfigs.class, confSerializer.toString(commGroupConf));
265      }
266    }
267    final Configuration retVal = jcb.build();
268    LOG.exiting("GroupCommDriverImpl", "getTaskConfiguration", confSerializer.toString(retVal));
269    return retVal;
270  }
271
272  /**
273   * @return the groupCommRunningTaskStage
274   */
275  @Override
276  public EStage<RunningTask> getGroupCommRunningTaskStage() {
277    LOG.entering("GroupCommDriverImpl", "getGroupCommRunningTaskStage");
278    LOG.exiting("GroupCommDriverImpl", "getGroupCommRunningTaskStage", "Returning GroupCommRunningTaskStage");
279    return groupCommRunningTaskStage;
280  }
281
282  /**
283   * @return the groupCommFailedTaskStage
284   */
285  @Override
286  public EStage<FailedTask> getGroupCommFailedTaskStage() {
287    LOG.entering("GroupCommDriverImpl", "getGroupCommFailedTaskStage");
288    LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedTaskStage", "Returning GroupCommFailedTaskStage");
289    return groupCommFailedTaskStage;
290  }
291
292  /**
293   * @return the groupCommFailedEvaluatorStage
294   */
295  @Override
296  public EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage() {
297    LOG.entering("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage");
298    LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFailedEvaluatorStage");
299    return groupCommFailedEvaluatorStage;
300  }
301
302}