001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.util.cache; 020 021import org.apache.reef.util.Optional; 022 023import javax.inject.Inject; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.Map.Entry; 030 031/** 032 * Implementation that supports expire-after-write. 033 * Entries that have expired are collected and invalidated on get calls. 034 * This obviates the need for a separate thread to invalidate expired entries, at 035 * the cost of some increase in get call latency. 036 * The invalidation sweep is only initiated after an interval (expireCheckInterval) 037 * has passed, and at most one invalidation sweep is run at a time. 038 * 039 * Operations on a single key are linearizable. The argument is: 040 * 1. The putIfAbsent call in get guarantees that loadAndGet is called exactly once 041 * for a WrappedValue instance that is put into the map: All putIfAbsent calls 042 * that return the WrappedValue instance will return the value loaded by loadAndGet. 043 * 2. Concurrent putIfAbsent and remove calls on a key have an ordering: if putIfAbsent 044 * returns null then it happened after the remove (and a new value will be loaded); 045 * else if it returns non-null then it happened before the remove 046 * (and the previous value will be returned). 047 */ 048public final class CacheImpl<K, V> implements Cache<K, V> { 049 private final ConcurrentMap<K, WrappedValue<V>> internalMap; 050 private final CurrentTime currentTime; 051 private final long timeoutMillis; 052 private final long expireCheckInterval; 053 private final AtomicBoolean expireInProgress; 054 055 private long expireCheckedTime; 056 057 /** 058 * Construct an expire-after-write cache. 059 * 060 * @param currentTime class that returns the current time for timeout purposes 061 * @param timeoutMillis a cache entry timeout after write 062 */ 063 @Inject 064 public CacheImpl(final CurrentTime currentTime, 065 final long timeoutMillis) { 066 this.internalMap = new ConcurrentHashMap<>(); 067 this.currentTime = currentTime; 068 this.timeoutMillis = timeoutMillis; 069 this.expireCheckInterval = timeoutMillis / 2; 070 this.expireInProgress = new AtomicBoolean(false); 071 072 this.expireCheckedTime = currentTime.now(); 073 } 074 075 @Override 076 public V get(final K key, final Callable<V> valueFetcher) throws ExecutionException { 077 // Before get, try to invalidate as many expired as possible 078 expireEntries(); 079 080 final WrappedValue<V> newWrappedValue = new WrappedValue<>(valueFetcher, currentTime); 081 final WrappedValue<V> existingWrappedValue = internalMap.putIfAbsent(key, newWrappedValue); 082 083 if (existingWrappedValue == null) { 084 // If absent, compute and return 085 return newWrappedValue.loadAndGet(); 086 } else { 087 final Optional<V> existingValue = existingWrappedValue.getValue(); 088 if (existingValue.isPresent()) { 089 // If value already exists, get (without locking) and return 090 return existingValue.get(); 091 } else { 092 // If value is being computed, wait for computation to complete 093 return existingWrappedValue.waitAndGet(); 094 } 095 } 096 } 097 098 private void expireEntries() { 099 if (expireInProgress.compareAndSet(false, true)) { 100 final long now = currentTime.now(); 101 if (expireCheckedTime + expireCheckInterval < now) { 102 expireEntriesAtTime(now); 103 expireCheckedTime = now; 104 } 105 expireInProgress.compareAndSet(true, false); 106 } 107 } 108 109 private void expireEntriesAtTime(final long now) { 110 for (final Entry<K, WrappedValue<V>> entry : internalMap.entrySet()) { 111 if (entry.getValue() != null) { 112 final Optional<Long> writeTime = entry.getValue().getWriteTime(); 113 if (writeTime.isPresent() && writeTime.get() + timeoutMillis < now) { 114 invalidate(entry.getKey()); 115 } 116 } 117 } 118 } 119 120 @Override 121 public void invalidate(final K key) { 122 internalMap.remove(key); 123 } 124}