001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.util.cache;
020
021import org.apache.reef.util.Optional;
022
023import javax.inject.Inject;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.Map.Entry;
030
031/**
032 * Implementation that supports expire-after-write.
033 * Entries that have expired are collected and invalidated on get calls.
034 * This obviates the need for a separate thread to invalidate expired entries, at
035 * the cost of some increase in get call latency.
036 * The invalidation sweep is only initiated after an interval (expireCheckInterval)
037 * has passed, and at most one invalidation sweep is run at a time.
038 *
039 * Operations on a single key are linearizable. The argument is:
040 * 1. The putIfAbsent call in get guarantees that loadAndGet is called exactly once
041 *    for a WrappedValue instance that is put into the map: All putIfAbsent calls
042 *    that return the WrappedValue instance will return the value loaded by loadAndGet.
043 * 2. Concurrent putIfAbsent and remove calls on a key have an ordering: if putIfAbsent
044 *    returns null then it happened after the remove (and a new value will be loaded);
045 *    else if it returns non-null then it happened before the remove
046 *    (and the previous value will be returned).
047 */
048public final class CacheImpl<K, V> implements Cache<K, V> {
049  private final ConcurrentMap<K, WrappedValue<V>> internalMap;
050  private final CurrentTime currentTime;
051  private final long timeoutMillis;
052  private final long expireCheckInterval;
053  private final AtomicBoolean expireInProgress;
054
055  private long expireCheckedTime;
056
057  /**
058   * Construct an expire-after-write cache.
059   *
060   * @param currentTime   class that returns the current time for timeout purposes
061   * @param timeoutMillis a cache entry timeout after write
062   */
063  @Inject
064  public CacheImpl(final CurrentTime currentTime,
065                   final long timeoutMillis) {
066    this.internalMap = new ConcurrentHashMap<>();
067    this.currentTime = currentTime;
068    this.timeoutMillis = timeoutMillis;
069    this.expireCheckInterval = timeoutMillis / 2;
070    this.expireInProgress = new AtomicBoolean(false);
071
072    this.expireCheckedTime = currentTime.now();
073  }
074
075  @Override
076  public V get(final K key, final Callable<V> valueFetcher) throws ExecutionException {
077    // Before get, try to invalidate as many expired as possible
078    expireEntries();
079
080    final WrappedValue<V> newWrappedValue = new WrappedValue<>(valueFetcher, currentTime);
081    final WrappedValue<V> existingWrappedValue = internalMap.putIfAbsent(key, newWrappedValue);
082
083    if (existingWrappedValue == null) {
084      // If absent, compute and return
085      return newWrappedValue.loadAndGet();
086    } else {
087      final Optional<V> existingValue = existingWrappedValue.getValue();
088      if (existingValue.isPresent()) {
089        // If value already exists, get (without locking) and return
090        return existingValue.get();
091      } else {
092        // If value is being computed, wait for computation to complete
093        return existingWrappedValue.waitAndGet();
094      }
095    }
096  }
097
098  private void expireEntries() {
099    if (expireInProgress.compareAndSet(false, true)) {
100      final long now = currentTime.now();
101      if (expireCheckedTime + expireCheckInterval < now) {
102        expireEntriesAtTime(now);
103        expireCheckedTime = now;
104      }
105      expireInProgress.compareAndSet(true, false);
106    }
107  }
108
109  private void expireEntriesAtTime(final long now) {
110    for (final Entry<K, WrappedValue<V>> entry : internalMap.entrySet()) {
111      if (entry.getValue() != null) {
112        final Optional<Long> writeTime = entry.getValue().getWriteTime();
113        if (writeTime.isPresent() && writeTime.get() + timeoutMillis < now) {
114          invalidate(entry.getKey());
115        }
116      }
117    }
118  }
119
120  @Override
121  public void invalidate(final K key) {
122    internalMap.remove(key);
123  }
124}