This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.task;
020
021import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
022import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
023import org.apache.reef.io.network.group.api.task.GroupCommClient;
024import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler;
025import org.apache.reef.io.network.group.impl.config.parameters.SerializedGroupConfigs;
026import org.apache.reef.tang.Configuration;
027import org.apache.reef.tang.Injector;
028import org.apache.reef.tang.annotations.Name;
029import org.apache.reef.tang.annotations.Parameter;
030import org.apache.reef.tang.exceptions.InjectionException;
031import org.apache.reef.tang.formats.ConfigurationSerializer;
032
033import javax.inject.Inject;
034import java.io.IOException;
035import java.util.HashMap;
036import java.util.Map;
037import java.util.Set;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041public final class GroupCommClientImpl implements GroupCommClient {
042  private static final Logger LOG = Logger.getLogger(GroupCommClientImpl.class.getName());
043
044  private final Map<Class<? extends Name<String>>, CommunicationGroupServiceClient> communicationGroups =
045      new HashMap<>();
046
047  @Inject
048  private GroupCommClientImpl(@Parameter(SerializedGroupConfigs.class) final Set<String> groupConfigs,
049                              final GroupCommNetworkHandler groupCommNetworkHandler,
050                              final ConfigurationSerializer configSerializer,
051                              final Injector injector) {
052
053    LOG.log(Level.FINEST, "GroupCommHandler-{0}", groupCommNetworkHandler);
054
055    for (final String groupConfigStr : groupConfigs) {
056      try {
057        final Configuration groupConfig = configSerializer.fromString(groupConfigStr);
058        final Injector forkedInjector = injector.forkInjector(groupConfig);
059
060        final CommunicationGroupServiceClient commGroupClient =
061            forkedInjector.getInstance(CommunicationGroupServiceClient.class);
062
063        this.communicationGroups.put(commGroupClient.getName(), commGroupClient);
064
065      } catch (final InjectionException | IOException e) {
066        throw new RuntimeException("Unable to deserialize operator config", e);
067      }
068    }
069  }
070
071  @Override
072  public CommunicationGroupClient getCommunicationGroup(
073      final Class<? extends Name<String>> groupName) {
074    return communicationGroups.get(groupName);
075  }
076}