This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.task;
020
021import org.apache.reef.driver.task.TaskConfigurationOptions;
022import org.apache.reef.io.network.impl.NetworkService;
023import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
024import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
025import org.apache.reef.io.network.group.api.task.GroupCommClient;
026import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler;
027import org.apache.reef.io.network.group.impl.config.parameters.SerializedGroupConfigs;
028import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
029import org.apache.reef.tang.Configuration;
030import org.apache.reef.tang.Injector;
031import org.apache.reef.tang.Tang;
032import org.apache.reef.tang.annotations.Name;
033import org.apache.reef.tang.annotations.Parameter;
034import org.apache.reef.tang.exceptions.InjectionException;
035import org.apache.reef.tang.formats.ConfigurationSerializer;
036
037import javax.inject.Inject;
038import java.io.IOException;
039import java.util.HashMap;
040import java.util.Map;
041import java.util.Set;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045public class GroupCommClientImpl implements GroupCommClient {
046  private static final Logger LOG = Logger.getLogger(GroupCommClientImpl.class.getName());
047
048  private final Map<Class<? extends Name<String>>, CommunicationGroupServiceClient> communicationGroups =
049      new HashMap<>();
050
051  /**
052   * @deprecated in 0.14.
053   * Use the other constructor that receives an {@code injector} as a parameter instead.
054   * The parameters {@code taskId} and {@code netService} can be removed from the other constructor when
055   * this constructor gets deleted.
056   */
057  @Deprecated
058  @Inject
059  public GroupCommClientImpl(
060      @Parameter(SerializedGroupConfigs.class) final Set<String> groupConfigs,
061      @Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
062      final GroupCommNetworkHandler groupCommNetworkHandler,
063      final NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> netService,
064      final ConfigurationSerializer configSerializer) {
065
066    LOG.log(Level.FINEST, "GroupCommHandler-{0}", groupCommNetworkHandler);
067
068    for (final String groupConfigStr : groupConfigs) {
069      try {
070        final Configuration groupConfig = configSerializer.fromString(groupConfigStr);
071
072        final Injector injector = Tang.Factory.getTang().newInjector(groupConfig);
073        injector.bindVolatileParameter(TaskConfigurationOptions.Identifier.class, taskId);
074        injector.bindVolatileInstance(GroupCommNetworkHandler.class, groupCommNetworkHandler);
075        injector.bindVolatileInstance(NetworkService.class, netService);
076
077        final CommunicationGroupServiceClient commGroupClient =
078            injector.getInstance(CommunicationGroupServiceClient.class);
079
080        this.communicationGroups.put(commGroupClient.getName(), commGroupClient);
081
082      } catch (final InjectionException | IOException e) {
083        throw new RuntimeException("Unable to deserialize operator config", e);
084      }
085    }
086  }
087
088  @Inject
089  private GroupCommClientImpl(@Parameter(SerializedGroupConfigs.class) final Set<String> groupConfigs,
090                              @Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
091                              final GroupCommNetworkHandler groupCommNetworkHandler,
092                              final NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> netService,
093                              final ConfigurationSerializer configSerializer,
094                              final Injector injector) {
095
096    LOG.log(Level.FINEST, "GroupCommHandler-{0}", groupCommNetworkHandler);
097
098    for (final String groupConfigStr : groupConfigs) {
099      try {
100        final Configuration groupConfig = configSerializer.fromString(groupConfigStr);
101        final Injector forkedInjector = injector.forkInjector(groupConfig);
102
103        final CommunicationGroupServiceClient commGroupClient =
104            forkedInjector.getInstance(CommunicationGroupServiceClient.class);
105
106        this.communicationGroups.put(commGroupClient.getName(), commGroupClient);
107
108      } catch (final InjectionException | IOException e) {
109        throw new RuntimeException("Unable to deserialize operator config", e);
110      }
111    }
112  }
113
114  @Override
115  public CommunicationGroupClient getCommunicationGroup(
116      final Class<? extends Name<String>> groupName) {
117    return communicationGroups.get(groupName);
118  }
119}