001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.utils;
020
021import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
022
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.logging.Level;
026import java.util.logging.Logger;
027
028/**
029 * Utility map class that wraps a CountingMap.
030 * in a ConcurrentMap
031 * Equivalent to {@code Map<K,Map<V,Integer>>}
032 */
033public class ConcurrentCountingMap<K, V> {
034  private static final Logger LOG = Logger.getLogger(ConcurrentCountingMap.class.getName());
035  private final ConcurrentMap<K, CountingMap<V>> map = new ConcurrentHashMap<>();
036
037  public boolean remove(final K key, final V value) {
038    if (!map.containsKey(key)) {
039      return false;
040    }
041    final boolean retVal = map.get(key).remove(value);
042    if (map.get(key).isEmpty()) {
043      map.remove(key);
044    }
045    return retVal;
046  }
047
048  public void add(final K key, final V value) {
049    map.putIfAbsent(key, new CountingMap<V>());
050    map.get(key).add(value);
051  }
052
053  public CountingMap<V> get(final K key) {
054    return map.get(key);
055  }
056
057  public boolean isEmpty() {
058    return map.isEmpty();
059  }
060
061  public boolean containsKey(final K key) {
062    return map.containsKey(key);
063  }
064
065  public boolean contains(final K key, final V value) {
066    if (!map.containsKey(key)) {
067      return false;
068    }
069    return map.get(key).containsKey(value);
070  }
071
072  public boolean notContains(final V value) {
073    for (final CountingMap<V> innerMap : map.values()) {
074      if (innerMap.containsKey(value)) {
075        return false;
076      }
077    }
078    return true;
079  }
080
081  @Override
082  public String toString() {
083    return map.toString();
084  }
085
086  public void clear() {
087    for (final CountingMap<V> value : map.values()) {
088      value.clear();
089    }
090    map.clear();
091  }
092
093  public static void main(final String[] args) {
094    final ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> strMap =
095        new ConcurrentCountingMap<>();
096    LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty());
097    strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0");
098    LOG.log(Level.INFO, "OUT: {0}", strMap);
099    strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1");
100    LOG.log(Level.INFO, "OUT: {0}", strMap);
101    strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, "ST0");
102    LOG.log(Level.INFO, "OUT: {0}", strMap);
103    strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, "ST1");
104    LOG.log(Level.INFO, "OUT: {0}", strMap);
105    strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2");
106    LOG.log(Level.INFO, "OUT: {0}", strMap);
107    strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST3");
108    LOG.log(Level.INFO, "OUT: {0}", strMap);
109    strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0");
110    LOG.log(Level.INFO, "OUT: {0}", strMap);
111    strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1");
112    LOG.log(Level.INFO, "OUT: {0}", strMap);
113    strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2");
114    LOG.log(Level.INFO, "OUT: {0}", strMap);
115    strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST3");
116    LOG.log(Level.INFO, "OUT: {0}", strMap);
117    strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0");
118    LOG.log(Level.INFO, "OUT: {0}", strMap);
119    LOG.log(Level.INFO, "OUT: {0}", strMap.get(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd));
120    LOG.log(Level.INFO, "OUT: {0}", strMap.get(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead));
121    strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1");
122    LOG.log(Level.INFO, "OUT: {0}", strMap);
123    strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1");
124    LOG.log(Level.INFO, "OUT: {0}", strMap);
125    LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd));
126    LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead));
127    LOG.log(Level.INFO, "OUT: {0}", strMap.contains(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"));
128    LOG.log(Level.INFO, "OUT: {0}", strMap.contains(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2"));
129    strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0");
130    LOG.log(Level.INFO, "OUT: {0}", strMap);
131    LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd));
132    LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty());
133  }
134}