This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.task;
020
021import org.apache.reef.io.network.Message;
022import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler;
023import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
024import org.apache.reef.tang.annotations.Name;
025import org.apache.reef.wake.EventHandler;
026
027import javax.inject.Inject;
028import java.util.Arrays;
029import java.util.Iterator;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.logging.Logger;
033
034public class GroupCommNetworkHandlerImpl implements GroupCommNetworkHandler {
035
036  private static final Logger LOG = Logger.getLogger(GroupCommNetworkHandlerImpl.class.getName());
037
038  private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> commGroupHandlers =
039      new ConcurrentHashMap<>();
040
041  @Inject
042  public GroupCommNetworkHandlerImpl() {
043  }
044
045  @Override
046  public void onNext(final Message<GroupCommunicationMessage> mesg) {
047    LOG.entering("GroupCommNetworkHandlerImpl", "onNext", mesg);
048    final Iterator<GroupCommunicationMessage> iter = mesg.getData().iterator();
049    final GroupCommunicationMessage msg = iter.hasNext() ? iter.next() : null;
050    if (msg != null) {
051      try {
052        final Class<? extends Name<String>> groupName =
053            (Class<? extends Name<String>>) Class.forName(msg.getGroupname());
054        commGroupHandlers.get(groupName).onNext(msg);
055      } catch (final ClassNotFoundException e) {
056        throw new RuntimeException("GroupName not found", e);
057      }
058    }
059    LOG.exiting("GroupCommNetworkHandlerImpl", "onNext", mesg);
060  }
061
062  @Override
063  public void register(final Class<? extends Name<String>> groupName,
064                       final EventHandler<GroupCommunicationMessage> commGroupNetworkHandler) {
065    LOG.entering("GroupCommNetworkHandlerImpl", "register", new Object[]{groupName,
066        commGroupNetworkHandler});
067    commGroupHandlers.put(groupName, commGroupNetworkHandler);
068    LOG.exiting("GroupCommNetworkHandlerImpl", "register", Arrays.toString(new Object[]{groupName,
069        commGroupNetworkHandler}));
070  }
071
072}