001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.utils; 020 021import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; 022 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import java.util.logging.Level; 026import java.util.logging.Logger; 027 028/** 029 * Utility map class that wraps a CountingMap. 030 * in a ConcurrentMap 031 * Equivalent to {@code Map<K,Map<V,Integer>>} 032 */ 033public class ConcurrentCountingMap<K, V> { 034 private static final Logger LOG = Logger.getLogger(ConcurrentCountingMap.class.getName()); 035 private final ConcurrentMap<K, CountingMap<V>> map = new ConcurrentHashMap<>(); 036 037 public boolean remove(final K key, final V value) { 038 if (!map.containsKey(key)) { 039 return false; 040 } 041 final boolean retVal = map.get(key).remove(value); 042 if (map.get(key).isEmpty()) { 043 map.remove(key); 044 } 045 return retVal; 046 } 047 048 public void add(final K key, final V value) { 049 map.putIfAbsent(key, new CountingMap<V>()); 050 map.get(key).add(value); 051 } 052 053 public CountingMap<V> get(final K key) { 054 return map.get(key); 055 } 056 057 public boolean isEmpty() { 058 return map.isEmpty(); 059 } 060 061 public boolean containsKey(final K key) { 062 return map.containsKey(key); 063 } 064 065 public boolean contains(final K key, final V value) { 066 if (!map.containsKey(key)) { 067 return false; 068 } 069 return map.get(key).containsKey(value); 070 } 071 072 public boolean notContains(final V value) { 073 for (final CountingMap<V> innerMap : map.values()) { 074 if (innerMap.containsKey(value)) { 075 return false; 076 } 077 } 078 return true; 079 } 080 081 @Override 082 public String toString() { 083 return map.toString(); 084 } 085 086 public void clear() { 087 for (final CountingMap<V> value : map.values()) { 088 value.clear(); 089 } 090 map.clear(); 091 } 092 093 public static void main(final String[] args) { 094 final ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> strMap = 095 new ConcurrentCountingMap<>(); 096 LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty()); 097 strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); 098 LOG.log(Level.INFO, "OUT: {0}", strMap); 099 strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1"); 100 LOG.log(Level.INFO, "OUT: {0}", strMap); 101 strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, "ST0"); 102 LOG.log(Level.INFO, "OUT: {0}", strMap); 103 strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, "ST1"); 104 LOG.log(Level.INFO, "OUT: {0}", strMap); 105 strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2"); 106 LOG.log(Level.INFO, "OUT: {0}", strMap); 107 strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST3"); 108 LOG.log(Level.INFO, "OUT: {0}", strMap); 109 strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); 110 LOG.log(Level.INFO, "OUT: {0}", strMap); 111 strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1"); 112 LOG.log(Level.INFO, "OUT: {0}", strMap); 113 strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2"); 114 LOG.log(Level.INFO, "OUT: {0}", strMap); 115 strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST3"); 116 LOG.log(Level.INFO, "OUT: {0}", strMap); 117 strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); 118 LOG.log(Level.INFO, "OUT: {0}", strMap); 119 LOG.log(Level.INFO, "OUT: {0}", strMap.get(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd)); 120 LOG.log(Level.INFO, "OUT: {0}", strMap.get(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead)); 121 strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1"); 122 LOG.log(Level.INFO, "OUT: {0}", strMap); 123 strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1"); 124 LOG.log(Level.INFO, "OUT: {0}", strMap); 125 LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd)); 126 LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead)); 127 LOG.log(Level.INFO, "OUT: {0}", strMap.contains(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0")); 128 LOG.log(Level.INFO, "OUT: {0}", strMap.contains(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2")); 129 strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); 130 LOG.log(Level.INFO, "OUT: {0}", strMap); 131 LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd)); 132 LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty()); 133 } 134}