HBASE-23887 AdaptiveLRU cache (#2934)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
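
For reference, a minimal sketch of enabling and tuning the new policy
(assuming the "hfile.block.cache.policy" key that BlockCacheFactory
dispatches on; the values below are illustrative, not recommendations):

    Configuration conf = HBaseConfiguration.create();
    // Select the AdaptiveLRU L1 block cache implementation.
    conf.set("hfile.block.cache.policy", "AdaptiveLRU");
    // Enable the adaptive behaviour after 10 heavy-eviction periods (~100s).
    conf.setInt("hbase.lru.cache.heavy.eviction.count.limit", 10);
    // Target ~500 MB evicted per ~10s before caching is throttled.
    conf.setLong("hbase.lru.cache.heavy.eviction.mb.size.limit", 500);
    // Reduce the caching percent by 1 point per 100% of eviction overhead.
    conf.setFloat("hbase.lru.cache.heavy.eviction.overhead.coefficient", 0.01f);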
diff --git a/dev-support/spotbugs-exclude.xml b/dev-support/spotbugs-exclude.xml
index f57faaf..1137f5f 100644
--- a/dev-support/spotbugs-exclude.xml
+++ b/dev-support/spotbugs-exclude.xml
@@ -247,4 +247,9 @@
     <Bug pattern="SC_START_IN_CTOR"/>
   </Match>
 
+  <Match>
+    <Class name="org.apache.hadoop.hbase.io.hfile.LruAdaptiveBlockCache"/>
+    <Bug pattern="SC_START_IN_CTOR"/>
+  </Match>
+
 </FindBugsFilter>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
index 2b97320..234c921 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
@@ -145,6 +145,8 @@
       return new LruBlockCache(cacheSize, blockSize, true, c);
     } else if (policy.equalsIgnoreCase("TinyLFU")) {
       return new TinyLfuBlockCache(cacheSize, blockSize, ForkJoinPool.commonPool(), c);
+    } else if (policy.equalsIgnoreCase("AdaptiveLRU")) {
+      return new LruAdaptiveBlockCache(cacheSize, blockSize, true, c);
     } else {
       throw new IllegalArgumentException("Unknown policy: " + policy);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java
new file mode 100644
index 0000000..083f109
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java
@@ -0,0 +1,1421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static java.util.Objects.requireNonNull;
+
+import java.lang.ref.WeakReference;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * <b>This implementation improves the performance of the classical LRU cache by up to
+ * 3 times, by reducing the garbage collection load.</b>
+ * <p/>
+ * The classical block cache implementation that is memory-aware using {@link HeapSize},
+ * memory-bound using an
+ * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a
+ * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock}
+ * operations.
+ * <p/>
+ * Contains three levels of block priority to allow for scan-resistance and in-memory families
+ * {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (An
+ * in-memory column family is a column family that should be served from memory if possible):
+ * single-access, multiple-accesses, and in-memory priority. A block is added with an in-memory
+ * priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()},
+ * otherwise a block becomes a single access priority the first time it is read into this block
+ * cache. If a block is accessed again while in cache, it is marked as a multiple access priority
+ * block. This delineation of blocks is used to prevent scans from thrashing the cache,
+ * adding a least-frequently-used element to the eviction algorithm.
+ * <p/>
+ * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each
+ * priority will retain close to its maximum size, however, if any priority is not using its entire
+ * chunk the others are able to grow beyond their chunk size.
+ * <p/>
+ * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The
+ * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It
+ * is only used for pre-allocating data structures and in initial heap estimation of the map.
+ * <p/>
+ * The detailed constructor defines the sizes for the three priorities (they should total to the
+ * <code>maximum size</code> defined). It also sets the levels that trigger and control the eviction
+ * thread.
+ * <p/>
+ * The <code>acceptable size</code> is the cache size level which triggers the eviction process to
+ * start. It evicts enough blocks to get the size below the minimum size specified.
+ * <p/>
+ * Eviction happens in a separate thread and involves a single full-scan of the map. It determines
+ * how many bytes must be freed to reach the minimum size, and then while scanning determines the
+ * fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times
+ * bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative
+ * sizes and usage.
+ * <p/>
+ * The adaptive LRU cache speeds things up when we are reading much more data than can fit into
+ * the BlockCache, which causes a high rate of evictions. This in turn leads to heavy
+ * Garbage Collector work: a lot of blocks are put into the BlockCache but never read, yet a
+ * lot of CPU resources are spent on cleaning them. We can avoid this situation via these
+ * parameters:
+ * <p/>
+ * <b>hbase.lru.cache.heavy.eviction.count.limit</b> - sets how many times the eviction process
+ * has to run before the cache starts avoiding putting data into the BlockCache. The default
+ * (Integer.MAX_VALUE) effectively disables the feature; set it to 0 to enable it from the very
+ * first eviction. If the workload alternates between short reads of the same data and long-term
+ * massive reads, this parameter can separate them. For example, if we know that our short reads
+ * usually last about 1 minute, we can set the parameter to about 10, and the feature will be
+ * enabled only for long-term massive reading (after ~100 seconds). So when we use short reads
+ * and want all of them in the cache, we will have that (except for evictions, of course), while
+ * for long-term heavy reading the feature kicks in after some time and brings better performance.
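+ * For example, an illustrative timeline (not from the source) with the ~10-second eviction
+ * period and hbase.lru.cache.heavy.eviction.count.limit = 10:
+ * <pre>
+ *   t =   0s..100s : heavy eviction counter grows from 1 to 10, all blocks still cached
+ *   t = 110s..     : the counter exceeds the limit, the caching percent starts to drop
+ * </pre>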
+ * <p/>
+ * <b>hbase.lru.cache.heavy.eviction.mb.size.limit</b> - sets how many bytes we would like to
+ * put into the BlockCache (and evict from it) per ~10 seconds. The feature will try to reach
+ * this value and maintain it. Don't set it too small, because that leads to a premature exit
+ * from this mode. For powerful CPUs (about 20-40 physical cores) it could be about 400-500 MB,
+ * for an average system (~10 cores) 200-300 MB, and some weak systems (2-5 cores) may be fine
+ * with 50-100 MB.
+ * How it works: we set the limit, and after each ~10 seconds we calculate how many bytes were
+ * freed:
+ * <pre>
+ *   overhead (%) = freed bytes sum (MB) * 100 / limit (MB) - 100
+ * </pre>
+ * For example, if we set the limit = 500 and 2000 MB were evicted, the overhead is:
+ * 2000 * 100 / 500 - 100 = 300%.
+ * The feature will reduce the percent of cached data blocks to bring the evicted volume closer
+ * to 100% of the limit (500 MB) - a kind of auto-scaling.
+ * If fewer bytes were freed than the limit, the overhead is negative. For example, if 200 MB
+ * were freed:
+ * 200 * 100 / 500 - 100 = -60%.
+ * The feature will then increase the percent of cached blocks, which again brings the evicted
+ * volume closer to 100% (500 MB).
+ * The current situation can be found in the RegionServer log:
+ * <pre>
+ *   BlockCache evicted (MB): 0, overhead (%): -100, heavy eviction counter: 0,
+ *   current caching DataBlock (%): 100
+ * </pre>
+ * means no eviction and 100% of blocks being cached, while
+ * <pre>
+ *   BlockCache evicted (MB): 2000, overhead (%): 300, heavy eviction counter: 1,
+ *   current caching DataBlock (%): 97
+ * </pre>
+ * means eviction has begun and caching was reduced by 3%.
+ * This helps tune your system and find out what value works best. Don't try to reach 0%
+ * overhead - it is impossible. An overhead of 50-100% is quite good and prevents a premature
+ * exit from this mode.
+ * <p/>
+ * <b>hbase.lru.cache.heavy.eviction.overhead.coefficient</b> - sets how fast we want to get the
+ * result. If we know that our reading is heavy for a long time, we don't want to wait and can
+ * increase the coefficient to get good performance sooner. But if we aren't sure, we can do it
+ * slowly, which could prevent a premature exit from this mode. So, a higher coefficient gives
+ * better performance when heavy reading is stable, while a lower coefficient adapts better
+ * when the reading pattern is changing.
+ * For example, with coefficient = 0.01 the overhead (see above) is multiplied by 0.01 and the
+ * result is the reduction, in percentage points, of the caching-blocks percentage. So if the
+ * overhead = 300% and the coefficient = 0.01, the percent of cached blocks is reduced by 3%.
+ * Similar logic applies when the overhead is negative (overshooting): maybe it is just a
+ * short-term fluctuation, so we try to stay in this mode; this helps avoid a premature exit
+ * during short-term fluctuations. The backpressure has simple logic: the more overshooting,
+ * the more blocks are cached.
+ * <p/>
+ * Find more information about improvement: https://issues.apache.org/jira/browse/HBASE-23887
+ */
+@InterfaceAudience.Private
+public class LruAdaptiveBlockCache implements FirstLevelBlockCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LruAdaptiveBlockCache.class);
+
+  /**
+   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
+   * evicting during an eviction run till the cache size is down to 80% of the total.
+   */
+  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
+
+  /**
+   * Acceptable size of cache (no evictions if size < acceptable)
+   */
+  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
+    "hbase.lru.blockcache.acceptable.factor";
+
+  /**
+   * Hard capacity limit of cache, will reject any put if size > this * acceptable
+   */
+  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
+    "hbase.lru.blockcache.hard.capacity.limit.factor";
+  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
+    "hbase.lru.blockcache.single.percentage";
+  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
+    "hbase.lru.blockcache.multi.percentage";
+  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
+    "hbase.lru.blockcache.memory.percentage";
+
+  /**
+   * Configuration key to force the data blocks of in-memory hfiles to always be cached in
+   * memory (unless the in-memory blocks take up too much space). Unlike inMemory, which is a
+   * column-family configuration, inMemoryForceMode is a cluster-wide configuration.
+   */
+  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
+    "hbase.lru.rs.inmemoryforcemode";
+
+  /* Default Configuration Parameters*/
+
+  /* Backing Concurrent Map Configuration */
+  static final float DEFAULT_LOAD_FACTOR = 0.75f;
+  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+
+  /* Eviction thresholds */
+  private static final float DEFAULT_MIN_FACTOR = 0.95f;
+  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
+
+  /* Priority buckets */
+  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
+  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
+  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
+
+  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
+
+  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
+
+  /* Statistics thread */
+  private static final int STAT_THREAD_PERIOD = 60 * 5;
+  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
+  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
+
+  private static final String LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT
+    = "hbase.lru.cache.heavy.eviction.count.limit";
+  // The default value effectively disables the performance feature:
+  // reaching 2147483647 eviction runs would take ~680 years at one run per ~10 seconds.
+  // Set it to 0-10 to get the benefit right away
+  // (see details https://issues.apache.org/jira/browse/HBASE-23887).
+  private static final int DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT = Integer.MAX_VALUE;
+
+  private static final String LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT
+    = "hbase.lru.cache.heavy.eviction.mb.size.limit";
+  private static final long DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT = 500;
+
+  private static final String LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT
+    = "hbase.lru.cache.heavy.eviction.overhead.coefficient";
+  private static final float DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT = 0.01f;
+
+  /**
+   * The cache map is defined as a {@link ConcurrentHashMap} because, in
+   * {@link LruAdaptiveBlockCache#getBlock}, we need to guarantee the atomicity of
+   * map#computeIfPresent(key, func). Besides, the func method must execute exactly once,
+   * only when the key is present and under the lock context; otherwise the reference count
+   * will be messed up. Notice that
+   * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
+   */
+  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;
+
+  /** Eviction lock (locked when eviction in process) */
+  private transient final ReentrantLock evictionLock = new ReentrantLock(true);
+
+  private final long maxBlockSize;
+
+  /** Volatile boolean to track if we are in an eviction process or not */
+  private volatile boolean evictionInProgress = false;
+
+  /** Eviction thread */
+  private transient final EvictionThread evictionThread;
+
+  /** Statistics thread schedule pool (for heavy debugging, could remove) */
+  private transient final ScheduledExecutorService scheduleThreadPool =
+    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+      .setNameFormat("LruAdaptiveBlockCacheStatsExecutor").setDaemon(true).build());
+
+  /** Current size of cache */
+  private final AtomicLong size;
+
+  /** Current size of data blocks */
+  private final LongAdder dataBlockSize;
+
+  /** Current number of cached elements */
+  private final AtomicLong elements;
+
+  /** Current number of cached data block elements */
+  private final LongAdder dataBlockElements;
+
+  /** Cache access count (sequential ID) */
+  private final AtomicLong count;
+
+  /** hard capacity limit */
+  private final float hardCapacityLimitFactor;
+
+  /** Cache statistics */
+  private final CacheStats stats;
+
+  /** Maximum allowable size of cache (block put if size > max, evict) */
+  private long maxSize;
+
+  /** Approximate block size */
+  private final long blockSize;
+
+  /** Acceptable size of cache (no evictions if size < acceptable) */
+  private final float acceptableFactor;
+
+  /** Minimum threshold of cache (when evicting, evict until size < min) */
+  private final float minFactor;
+
+  /** Single access bucket size */
+  private final float singleFactor;
+
+  /** Multiple access bucket size */
+  private final float multiFactor;
+
+  /** In-memory bucket size */
+  private final float memoryFactor;
+
+  /** Overhead of the structure itself */
+  private final long overhead;
+
+  /** Whether in-memory hfile's data block has higher priority when evicting */
+  private boolean forceInMemory;
+
+  /**
+   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
+   * external cache as L2.
+   * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
+   */
+  private transient BlockCache victimHandler = null;
+
+  /** Percent of cached data blocks */
+  private volatile int cacheDataBlockPercent;
+
+  /** Number of eviction runs after which the cache starts avoiding the caching of blocks */
+  private final int heavyEvictionCountLimit;
+
+  /** Eviction volume (MB per ~10s) above which the cache starts avoiding caching of blocks */
+  private final long heavyEvictionMbSizeLimit;
+
+  /** Coefficient that adjusts the auto-scaling based on the overhead of the eviction rate */
+  private final float heavyEvictionOverheadCoefficient;
+
+  /**
+   * Default constructor.  Specify maximum size and expected average block
+   * size (approximation is fine).
+   *
+   * <p>All other factors will be calculated based on defaults specified in
+   * this class.
+   *
+   * @param maxSize   maximum size of cache, in bytes
+   * @param blockSize approximate size of each block, in bytes
+   */
+  public LruAdaptiveBlockCache(long maxSize, long blockSize) {
+    this(maxSize, blockSize, true);
+  }
+
+  /**
+   * Constructor used for testing.  Allows disabling of the eviction thread.
+   */
+  public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread) {
+    this(maxSize, blockSize, evictionThread,
+      (int) Math.ceil(1.2 * maxSize / blockSize),
+      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
+      DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
+      DEFAULT_SINGLE_FACTOR,
+      DEFAULT_MULTI_FACTOR,
+      DEFAULT_MEMORY_FACTOR,
+      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
+      false,
+      DEFAULT_MAX_BLOCK_SIZE,
+      DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
+      DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
+      DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT);
+  }
+
+  public LruAdaptiveBlockCache(long maxSize, long blockSize,
+    boolean evictionThread, Configuration conf) {
+    this(maxSize, blockSize, evictionThread,
+      (int) Math.ceil(1.2 * maxSize / blockSize),
+      DEFAULT_LOAD_FACTOR,
+      DEFAULT_CONCURRENCY_LEVEL,
+      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
+      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
+      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
+      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
+      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
+      conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
+        DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
+      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
+      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE),
+      conf.getInt(LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
+        DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT),
+      conf.getLong(LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
+        DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT),
+      conf.getFloat(LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT,
+        DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT));
+  }
+
+  public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) {
+    this(maxSize, blockSize, true, conf);
+  }
+
+  /**
+   * Configurable constructor.  Use this constructor if not using defaults.
+   *
+   * @param maxSize             maximum size of this cache, in bytes
+   * @param blockSize           expected average size of blocks, in bytes
+   * @param evictionThread      whether to run evictions in a bg thread or not
+   * @param mapInitialSize      initial size of backing ConcurrentHashMap
+   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
+   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
+   * @param minFactor           percentage of total size that eviction will evict until
+   * @param acceptableFactor    percentage of total size that triggers eviction
+   * @param singleFactor        percentage of total size for single-access blocks
+   * @param multiFactor         percentage of total size for multiple-access blocks
+   * @param memoryFactor        percentage of total size for in-memory blocks
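+   * @param hardLimitFactor     hard capacity limit factor (puts are rejected above
+   *                            hardLimitFactor * acceptable size)
+   * @param forceInMemory       whether in-memory hfile blocks get higher priority when evicting
+   * @param maxBlockSize        maximum size of a block that may be cached, in bytes
+   * @param heavyEvictionCountLimit   eviction-run count after which block caching is throttled
+   * @param heavyEvictionMbSizeLimit  eviction volume (MB per ~10s) above which block caching
+   *                                  is throttled
+   * @param heavyEvictionOverheadCoefficient  coefficient for the eviction auto-scaling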
+   */
+  public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread,
+    int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
+    float minFactor, float acceptableFactor, float singleFactor,
+    float multiFactor, float memoryFactor, float hardLimitFactor,
+    boolean forceInMemory, long maxBlockSize,
+    int heavyEvictionCountLimit, long heavyEvictionMbSizeLimit,
+    float heavyEvictionOverheadCoefficient) {
+    this.maxBlockSize = maxBlockSize;
+    if (singleFactor + multiFactor + memoryFactor != 1 ||
+      singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
+      throw new IllegalArgumentException("Single, multi, and memory factors " +
+        "should be non-negative and total 1.0");
+    }
+    if (minFactor >= acceptableFactor) {
+      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
+    }
+    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
+      throw new IllegalArgumentException("all factors must be < 1");
+    }
+    this.maxSize = maxSize;
+    this.blockSize = blockSize;
+    this.forceInMemory = forceInMemory;
+    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
+    this.minFactor = minFactor;
+    this.acceptableFactor = acceptableFactor;
+    this.singleFactor = singleFactor;
+    this.multiFactor = multiFactor;
+    this.memoryFactor = memoryFactor;
+    this.stats = new CacheStats(this.getClass().getSimpleName());
+    this.count = new AtomicLong(0);
+    this.elements = new AtomicLong(0);
+    this.dataBlockElements = new LongAdder();
+    this.dataBlockSize = new LongAdder();
+    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
+    this.size = new AtomicLong(this.overhead);
+    this.hardCapacityLimitFactor = hardLimitFactor;
+    if (evictionThread) {
+      this.evictionThread = new EvictionThread(this);
+      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
+    } else {
+      this.evictionThread = null;
+    }
+
+    // check the bounds
+    this.heavyEvictionCountLimit = Math.max(heavyEvictionCountLimit, 0);
+    this.heavyEvictionMbSizeLimit = Math.max(heavyEvictionMbSizeLimit, 1);
+    this.cacheDataBlockPercent = 100;
+    heavyEvictionOverheadCoefficient = Math.min(heavyEvictionOverheadCoefficient, 1.0f);
+    heavyEvictionOverheadCoefficient = Math.max(heavyEvictionOverheadCoefficient, 0.001f);
+    this.heavyEvictionOverheadCoefficient = heavyEvictionOverheadCoefficient;
+
+    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
+    // every five minutes.
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
+      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setVictimCache(BlockCache victimCache) {
+    if (victimHandler != null) {
+      throw new IllegalArgumentException("The victim cache has already been set");
+    }
+    victimHandler = requireNonNull(victimCache);
+  }
+
+  @Override
+  public void setMaxSize(long maxSize) {
+    this.maxSize = maxSize;
+    if (this.size.get() > acceptableSize() && !evictionInProgress) {
+      runEviction();
+    }
+  }
+
+  public int getCacheDataBlockPercent() {
+    return cacheDataBlockPercent;
+  }
+
+  /**
+   * The block cached in LruAdaptiveBlockCache will always be a heap block: on the one hand,
+   * heap access is faster than off-heap, so the small index or meta blocks cached in
+   * CombinedBlockCache benefit a lot; on the other hand, the LruAdaptiveBlockCache size is
+   * always calculated based on the total heap size, so caching an off-heap block in
+   * LruAdaptiveBlockCache would mess up the heap-size accounting. Here we clone the block into
+   * a heap block if it's an off-heap block, otherwise we just use the original block. The key
+   * point is to maintain the refCnt of the block (HBASE-22127): <br>
+   * 1. if we cache the cloned heap block, its refCnt is a totally new one, which is easy to
+   * handle; <br>
+   * 2. if we cache the original heap block, we're sure that it won't be tracked in the
+   * ByteBuffAllocator's reservoir; if both the RPC and the LruAdaptiveBlockCache release the
+   * block, then it can be garbage collected by the JVM, so it needs a retain here.
+   * @param buf the original block
+   * @return a block with a heap memory backend.
+   */
+  private Cacheable asReferencedHeapBlock(Cacheable buf) {
+    if (buf instanceof HFileBlock) {
+      HFileBlock blk = ((HFileBlock) buf);
+      if (blk.isSharedMem()) {
+        return HFileBlock.deepCloneOnHeap(blk);
+      }
+    }
+    // The block will be referenced by this LruAdaptiveBlockCache,
+    // so should increase its refCnt here.
+    return buf.retain();
+  }
+
+  // BlockCache implementation
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * <p>
+   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
+   * this can happen, for which we compare the buffer contents.
+   *
+   * @param cacheKey block's cache key
+   * @param buf      block buffer
+   * @param inMemory if block is in-memory
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
+
+    // Some data blocks will not be put into the BlockCache when the eviction rate is too high.
+    // This is good for performance
+    // (see details: https://issues.apache.org/jira/browse/HBASE-23887).
+    // How the percent is calculated can be found in the EvictionThread class.
+    if (cacheDataBlockPercent != 100 && buf.getBlockType().isData()) {
+      // This works like a filter: blocks whose last two offset digits are greater than or
+      // equal to the percent calculated by the eviction thread are not put into the BlockCache.
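+      // For example (illustrative numbers): with cacheDataBlockPercent = 97, a block at
+      // offset 123456 gives 123456 % 100 = 56 < 97 and is cached, while offset 123499
+      // gives 99 >= 97 and is skipped, i.e. roughly 3% of data blocks are filtered out.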
+      if (cacheKey.getOffset() % 100 >= cacheDataBlockPercent) {
+        return;
+      }
+    }
+
+    if (buf.heapSize() > maxBlockSize) {
+      // If there are a lot of blocks that are too
+      // big this can make the logs way too noisy.
+      // So we log 2%
+      if (stats.failInsert() % 50 == 0) {
+        LOG.warn("Trying to cache too large a block "
+          + cacheKey.getHfileName() + " @ "
+          + cacheKey.getOffset()
+          + " is " + buf.heapSize()
+          + " which is larger than " + maxBlockSize);
+      }
+      return;
+    }
+
+    LruCachedBlock cb = map.get(cacheKey);
+    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this,
+      cacheKey, buf)) {
+      return;
+    }
+    long currentSize = size.get();
+    long currentAcceptableSize = acceptableSize();
+    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
+    if (currentSize >= hardLimitSize) {
+      stats.failInsert();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("LruAdaptiveBlockCache current size " + StringUtils.byteDesc(currentSize)
+          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
+          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
+          + ", failed to put cacheKey:" + cacheKey + " into LruAdaptiveBlockCache.");
+      }
+      if (!evictionInProgress) {
+        runEviction();
+      }
+      return;
+    }
+    // Ensure that the block is a heap one.
+    buf = asReferencedHeapBlock(buf);
+    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
+    long newSize = updateSizeMetrics(cb, false);
+    map.put(cacheKey, cb);
+    long val = elements.incrementAndGet();
+    if (buf.getBlockType().isData()) {
+      dataBlockElements.increment();
+    }
+    if (LOG.isTraceEnabled()) {
+      long size = map.size();
+      assertCounterSanity(size, val);
+    }
+    if (newSize > currentAcceptableSize && !evictionInProgress) {
+      runEviction();
+    }
+  }
+
+  /**
+   * Sanity-checking for parity between actual block cache content and metrics.
+   * Intended only for use with TRACE level logging and -ea JVM.
+   */
+  private static void assertCounterSanity(long mapSize, long counterVal) {
+    if (counterVal < 0) {
+      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
+        ", mapSize=" + mapSize);
+      return;
+    }
+    if (mapSize < Integer.MAX_VALUE) {
+      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
+      if (pct_diff > 0.05) {
+        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
+          ", mapSize=" + mapSize);
+      }
+    }
+  }
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * <p>
+   * TODO: after HBASE-22005, we may cache a block allocated off-heap, but our LRU cache sizing
+   * is based on heap size, so we should handle this in HBASE-22127. It will introduce a switch
+   * for whether to keep the LRU on-heap or not; if so, we may need to copy the memory on-heap,
+   * otherwise the caching size is based on off-heap memory.
+   * @param cacheKey block's cache key
+   * @param buf block buffer
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
+    cacheBlock(cacheKey, buf, false);
+  }
+
+  /**
+   * Helper function that updates the local size counter and also updates any
+   * per-cf or per-blocktype metrics it can discern from given
+   * {@link LruCachedBlock}
+   */
+  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
+    long heapsize = cb.heapSize();
+    BlockType bt = cb.getBuffer().getBlockType();
+    if (evict) {
+      heapsize *= -1;
+    }
+    if (bt != null && bt.isData()) {
+      dataBlockSize.add(heapsize);
+    }
+    return size.addAndGet(heapsize);
+  }
+
+  /**
+   * Get the buffer of the block with the specified name.
+   *
+   * @param cacheKey           block's cache key
+   * @param caching            true if the caller caches blocks on cache misses
+   * @param repeat             Whether this is a repeat lookup for the same block
+   *                           (used to avoid double counting cache misses when doing double-check
+   *                           locking)
+   * @param updateCacheMetrics Whether to update cache metrics or not
+   *
+   * @return buffer of specified cache key, or null if not in cache
+   */
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
+    boolean updateCacheMetrics) {
+    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
+      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
+      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
+      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
+      // see HBASE-22422.
+      val.getBuffer().retain();
+      return val;
+    });
+    if (cb == null) {
+      if (!repeat && updateCacheMetrics) {
+        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+      }
+      // If there is another block cache then try and read there.
+      // However if this is a retry ( second time in double checked locking )
+      // And it's already a miss then the l2 will also be a miss.
+      if (victimHandler != null && !repeat) {
+        // The handler will increase result's refCnt for RPC, so need no extra retain.
+        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+        // Promote this to L1.
+        if (result != null) {
+          if (caching) {
+            cacheBlock(cacheKey, result, /* inMemory = */ false);
+          }
+        }
+        return result;
+      }
+      return null;
+    }
+    if (updateCacheMetrics) {
+      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+    }
+    cb.access(count.incrementAndGet());
+    return cb.getBuffer();
+  }
+
+  /**
+   * Whether the cache contains block with specified cacheKey
+   *
+   * @return true if contains the block
+   */
+  @Override
+  public boolean containsBlock(BlockCacheKey cacheKey) {
+    return map.containsKey(cacheKey);
+  }
+
+  @Override
+  public boolean evictBlock(BlockCacheKey cacheKey) {
+    LruCachedBlock cb = map.get(cacheKey);
+    return cb != null && evictBlock(cb, false) > 0;
+  }
+
+  /**
+   * Evicts all blocks for a specific HFile. This is an
+   * expensive operation implemented as a linear-time search through all blocks
+   * in the cache. Ideally this should be a search in a log-access-time map.
+   *
+   * <p>
+   * This is used for evict-on-close to remove all blocks of a specific HFile.
+   *
+   * @return the number of blocks evicted
+   */
+  @Override
+  public int evictBlocksByHfileName(String hfileName) {
+    int numEvicted = (int) map.keySet().stream().filter(key -> key.getHfileName().equals(hfileName))
+      .filter(this::evictBlock).count();
+    if (victimHandler != null) {
+      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
+    }
+    return numEvicted;
+  }
+
+  /**
+   * Evict the block, and it will be cached by the victim handler if one exists &amp;&amp;
+   * the block may be read again later
+   *
+   * @param evictedByEvictionProcess true if the given block is evicted by
+   *          EvictionThread
+   * @return the heap size of evicted block
+   */
+  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
+    LruCachedBlock previous = map.remove(block.getCacheKey());
+    if (previous == null) {
+      return 0;
+    }
+    updateSizeMetrics(block, true);
+    long val = elements.decrementAndGet();
+    if (LOG.isTraceEnabled()) {
+      long size = map.size();
+      assertCounterSanity(size, val);
+    }
+    if (block.getBuffer().getBlockType().isData()) {
+      dataBlockElements.decrement();
+    }
+    if (evictedByEvictionProcess) {
+      // When the eviction of the block happened because of invalidation of HFiles, no need to
+      // update the stats counter.
+      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
+      if (victimHandler != null) {
+        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
+      }
+    }
+    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
+    // NOT move this up because if do that then the victimHandler may access the buffer with
+    // refCnt = 0 which is disallowed.
+    previous.getBuffer().release();
+    return block.heapSize();
+  }
+
+  /**
+   * Multi-threaded call to run the eviction process.
+   */
+  private void runEviction() {
+    if (evictionThread == null) {
+      evict();
+    } else {
+      evictionThread.evict();
+    }
+  }
+
+  boolean isEvictionInProgress() {
+    return evictionInProgress;
+  }
+
+  long getOverhead() {
+    return overhead;
+  }
+
+  /**
+   * Eviction method.
+   *
+   * Evicts items in order of use, removing the items
+   * which have gone unused for the longest amount of time first.
+   *
+   * @return how many bytes were freed
+   */
+  long evict() {
+
+    // Ensure only one eviction at a time
+    if (!evictionLock.tryLock()) {
+      return 0;
+    }
+
+    long bytesToFree = 0L;
+
+    try {
+      evictionInProgress = true;
+      long currentSize = this.size.get();
+      bytesToFree = currentSize - minSize();
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Block cache LRU eviction started; Attempting to free " +
+          StringUtils.byteDesc(bytesToFree) + " of total=" +
+          StringUtils.byteDesc(currentSize));
+      }
+
+      if (bytesToFree <= 0) {
+        return 0;
+      }
+
+      // Instantiate priority buckets
+      BlockBucket bucketSingle
+        = new BlockBucket("single", bytesToFree, blockSize, singleSize());
+      BlockBucket bucketMulti
+        = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
+      BlockBucket bucketMemory
+        = new BlockBucket("memory", bytesToFree, blockSize, memorySize());
+
+      // Scan entire map putting into appropriate buckets
+      for (LruCachedBlock cachedBlock : map.values()) {
+        switch (cachedBlock.getPriority()) {
+          case SINGLE: {
+            bucketSingle.add(cachedBlock);
+            break;
+          }
+          case MULTI: {
+            bucketMulti.add(cachedBlock);
+            break;
+          }
+          case MEMORY: {
+            bucketMemory.add(cachedBlock);
+            break;
+          }
+        }
+      }
+
+      long bytesFreed = 0;
+      if (forceInMemory || memoryFactor > 0.999f) {
+        long s = bucketSingle.totalSize();
+        long m = bucketMulti.totalSize();
+        if (bytesToFree > (s + m)) {
+          // this means we need to evict blocks in memory bucket to make room,
+          // so the single and multi buckets will be emptied
+          bytesFreed = bucketSingle.free(s);
+          bytesFreed += bucketMulti.free(m);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
+              " from single and multi buckets");
+          }
+          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
+              " total from all three buckets ");
+          }
+        } else {
+          // this means there is no need to evict blocks in the memory bucket,
+          // and we try our best to keep the size ratio between the single-bucket
+          // and the multi-bucket at 1:2
+          long bytesRemain = s + m - bytesToFree;
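+          // The target after freeing: single <= bytesRemain / 3 and
+          // multi <= 2 * bytesRemain / 3, i.e. the 1:2 ratio mentioned above.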
+          if (3 * s <= bytesRemain) {
+            // single-bucket is small enough that no eviction happens for it
+            // hence all eviction goes from multi-bucket
+            bytesFreed = bucketMulti.free(bytesToFree);
+          } else if (3 * m <= 2 * bytesRemain) {
+            // multi-bucket is small enough that no eviction happens for it
+            // hence all eviction goes from single-bucket
+            bytesFreed = bucketSingle.free(bytesToFree);
+          } else {
+            // both buckets need to evict some blocks
+            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
+            if (bytesFreed < bytesToFree) {
+              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
+            }
+          }
+        }
+      } else {
+        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
+
+        bucketQueue.add(bucketSingle);
+        bucketQueue.add(bucketMulti);
+        bucketQueue.add(bucketMemory);
+
+        int remainingBuckets = bucketQueue.size();
+
+        BlockBucket bucket;
+        while ((bucket = bucketQueue.poll()) != null) {
+          long overflow = bucket.overflow();
+          if (overflow > 0) {
+            long bucketBytesToFree =
+              Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
+            bytesFreed += bucket.free(bucketBytesToFree);
+          }
+          remainingBuckets--;
+        }
+      }
+      if (LOG.isTraceEnabled()) {
+        long single = bucketSingle.totalSize();
+        long multi = bucketMulti.totalSize();
+        long memory = bucketMemory.totalSize();
+        LOG.trace("Block cache LRU eviction completed; " +
+          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
+          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
+          "single=" + StringUtils.byteDesc(single) + ", " +
+          "multi=" + StringUtils.byteDesc(multi) + ", " +
+          "memory=" + StringUtils.byteDesc(memory));
+      }
+    } finally {
+      stats.evict();
+      evictionInProgress = false;
+      evictionLock.unlock();
+    }
+    return bytesToFree;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+      .add("blockCount", getBlockCount())
+      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
+      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
+      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
+      .add("heapSize", StringUtils.byteDesc(heapSize()))
+      .add("minSize", StringUtils.byteDesc(minSize()))
+      .add("minFactor", minFactor)
+      .add("multiSize", StringUtils.byteDesc(multiSize()))
+      .add("multiFactor", multiFactor)
+      .add("singleSize", StringUtils.byteDesc(singleSize()))
+      .add("singleFactor", singleFactor)
+      .toString();
+  }
+
+  /**
+   * Used to group blocks into priority buckets.  There will be a BlockBucket
+   * for each priority (single, multi, memory).  Once bucketed, the eviction
+   * algorithm takes the appropriate number of elements out of each according
+   * to configuration parameters and their relative sizes.
+   */
+  private class BlockBucket implements Comparable<BlockBucket> {
+
+    private final String name;
+    private final LruCachedBlockQueue queue;
+    private long totalSize = 0;
+    private final long bucketSize;
+
+    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
+      this.name = name;
+      this.bucketSize = bucketSize;
+      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
+      totalSize = 0;
+    }
+
+    public void add(LruCachedBlock block) {
+      totalSize += block.heapSize();
+      queue.add(block);
+    }
+
+    public long free(long toFree) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this);
+      }
+      LruCachedBlock cb;
+      long freedBytes = 0;
+      while ((cb = queue.pollLast()) != null) {
+        freedBytes += evictBlock(cb, true);
+        if (freedBytes >= toFree) {
+          return freedBytes;
+        }
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this);
+      }
+      return freedBytes;
+    }
+
+    public long overflow() {
+      return totalSize - bucketSize;
+    }
+
+    public long totalSize() {
+      return totalSize;
+    }
+
+    @Override
+    public int compareTo(BlockBucket that) {
+      return Long.compare(this.overflow(), that.overflow());
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (!(that instanceof BlockBucket)) {
+        return false;
+      }
+      return compareTo((BlockBucket)that) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(name, bucketSize, queue, totalSize);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+        .add("name", name)
+        .add("totalSize", StringUtils.byteDesc(totalSize))
+        .add("bucketSize", StringUtils.byteDesc(bucketSize))
+        .toString();
+    }
+  }
+
+  /**
+   * Get the maximum size of this cache.
+   *
+   * @return max size in bytes
+   */
+  @Override
+  public long getMaxSize() {
+    return this.maxSize;
+  }
+
+  @Override
+  public long getCurrentSize() {
+    return this.size.get();
+  }
+
+  @Override
+  public long getCurrentDataSize() {
+    return this.dataBlockSize.sum();
+  }
+
+  @Override
+  public long getFreeSize() {
+    return getMaxSize() - getCurrentSize();
+  }
+
+  @Override
+  public long size() {
+    return getMaxSize();
+  }
+
+  @Override
+  public long getBlockCount() {
+    return this.elements.get();
+  }
+
+  @Override
+  public long getDataBlockCount() {
+    return this.dataBlockElements.sum();
+  }
+
+  EvictionThread getEvictionThread() {
+    return this.evictionThread;
+  }
+
+  /*
+   * Eviction thread.  Sits in waiting state until an eviction is triggered
+   * when the cache size grows above the acceptable level.<p>
+   *
+   * Thread is triggered into action by {@link LruAdaptiveBlockCache#runEviction()}
+   */
+  static class EvictionThread extends Thread {
+
+    private WeakReference<LruAdaptiveBlockCache> cache;
+    private volatile boolean go = true;
+    // flag set after entering the run method, used for tests
+    private boolean enteringRun = false;
+
+    public EvictionThread(LruAdaptiveBlockCache cache) {
+      super(Thread.currentThread().getName() + ".LruAdaptiveBlockCache.EvictionThread");
+      setDaemon(true);
+      this.cache = new WeakReference<>(cache);
+    }
+
+    @Override
+    public void run() {
+      enteringRun = true;
+      long freedSumMb = 0;
+      int heavyEvictionCount = 0;
+      int freedDataOverheadPercent = 0;
+      long startTime = System.currentTimeMillis();
+      while (this.go) {
+        synchronized (this) {
+          try {
+            this.wait(1000 * 10/*Don't wait for ever*/);
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted eviction thread ", e);
+            Thread.currentThread().interrupt();
+          }
+        }
+        LruAdaptiveBlockCache cache = this.cache.get();
+        if (cache == null) {
+          break;
+        }
+        freedSumMb += cache.evict()/1024/1024;
+        /*
+         * Sometimes we are reading more data than can fit into the BlockCache,
+         * and it is the cause of a high rate of evictions.
+         * This in turn leads to heavy Garbage Collector work.
+         * So a lot of blocks are put into the BlockCache but never read,
+         * while spending a lot of CPU resources.
+         * Here we analyze how many bytes were freed and decide
+         * whether the time has come to reduce the amount of cached blocks.
+         * It helps avoid putting too many blocks into the BlockCache
+         * when evict() works very actively, and saves CPU for other jobs.
+         * More details: https://issues.apache.org/jira/browse/HBASE-23887
+         */
+
+        // First of all we have to control how much time
+        // has passed since the previous evict() was launched.
+        // This should be almost the same time (+/- 10s)
+        // because we get comparable volumes of freed bytes each time.
+        // 10s because this is the default period to run evict() (see this.wait above)
+        long stopTime = System.currentTimeMillis();
+        if ((stopTime - startTime) > 1000 * 10 - 1) {
+          // Here we have to work out what situation we have got.
+          // We have the limit "hbase.lru.cache.heavy.eviction.mb.size.limit"
+          // and can calculate the overhead against it.
+          // We will use this information to decide
+          // how to change the percent of cached blocks.
+          freedDataOverheadPercent =
+            (int) (freedSumMb * 100 / cache.heavyEvictionMbSizeLimit) - 100;
+          if (freedSumMb > cache.heavyEvictionMbSizeLimit) {
+            // Now we are in the situation where we are above the limit,
+            // but maybe we are going to ignore it because it will end quite soon.
+            heavyEvictionCount++;
+            if (heavyEvictionCount > cache.heavyEvictionCountLimit) {
+              // It has been going on for a long time and we have to reduce caching
+              // of blocks now. So we calculate here how many blocks we want to skip.
+              // It depends on:
+              // 1. Overhead - if the overhead is big we can be more aggressive
+              // in reducing the amount of cached blocks.
+              // 2. How fast we want to get the result. If we know that our
+              // heavy reading will last for a long time, we don't want to wait and can
+              // increase the coefficient and get good performance quite soon.
+              // But if we aren't sure, we can do it slowly and it could prevent
+              // premature exit from this mode. So, when the coefficient is
+              // higher we can get better performance when heavy reading is stable.
+              // But when reading is changing we can adjust to it and set
+              // the coefficient to a lower value.
+              int change =
+                (int) (freedDataOverheadPercent * cache.heavyEvictionOverheadCoefficient);
+              // But practice shows that a 15% reduction is quite enough.
+              // We are not greedy (greed could lead to premature exit).
+              change = Math.min(15, change);
+              change = Math.max(0, change); // It should never happen, but check to be sure
+              // So this is the key point: here we are reducing the % of cached blocks
+              cache.cacheDataBlockPercent -= change;
+              // If we go down too deep we have to stop here; at least 1% should remain.
+              cache.cacheDataBlockPercent = Math.max(1, cache.cacheDataBlockPercent);
+            }
+          } else {
+            // Well, we have got overshooting.
+            // Maybe it is just a short-term fluctuation and we can stay in this mode.
+            // It helps avoid a premature exit during short-term fluctuations.
+            // If the overshooting is less than 90% (i.e. we still freed at least 10% of
+            // the limit), we will try to increase the percent of cached blocks and hope
+            // it is enough.
+            if (freedSumMb >= cache.heavyEvictionMbSizeLimit * 0.1) {
+              // Simple logic: more overshooting - more cached blocks (backpressure)
+              int change = (int) (-freedDataOverheadPercent * 0.1 + 1);
+              cache.cacheDataBlockPercent += change;
+              // But it can't be more than 100%, so check it.
+              cache.cacheDataBlockPercent = Math.min(100, cache.cacheDataBlockPercent);
+            } else {
+              // Looks like the heavy reading is over.
+              // Just exit from this mode.
+              heavyEvictionCount = 0;
+              cache.cacheDataBlockPercent = 100;
+            }
+          }
+          LOG.info("BlockCache evicted (MB): {}, overhead (%): {}, " +
+              "heavy eviction counter: {}, " +
+              "current caching DataBlock (%): {}",
+            freedSumMb, freedDataOverheadPercent,
+            heavyEvictionCount, cache.cacheDataBlockPercent);
+
+          freedSumMb = 0;
+          startTime = stopTime;
+        }
+      }
+    }
+
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+      justification="This is what we want")
+    public void evict() {
+      synchronized (this) {
+        this.notifyAll();
+      }
+    }
+
+    synchronized void shutdown() {
+      this.go = false;
+      this.notifyAll();
+    }
+
+    /**
+     * Used for the test.
+     */
+    boolean isEnteringRun() {
+      return this.enteringRun;
+    }
+  }
+
+  /*
+   * Statistics thread.  Periodically prints the cache statistics to the log.
+   */
+  static class StatisticsThread extends Thread {
+
+    private final LruAdaptiveBlockCache lru;
+
+    public StatisticsThread(LruAdaptiveBlockCache lru) {
+      super("LruAdaptiveBlockCacheStats");
+      setDaemon(true);
+      this.lru = lru;
+    }
+
+    @Override
+    public void run() {
+      lru.logStats();
+    }
+  }
+
+  public void logStats() {
+    // Log size
+    long totalSize = heapSize();
+    long freeSize = maxSize - totalSize;
+    LruAdaptiveBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
+      "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
+      "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
+      "blockCount=" + getBlockCount() + ", " +
+      "accesses=" + stats.getRequestCount() + ", " +
+      "hits=" + stats.getHitCount() + ", " +
+      "hitRatio=" + (stats.getHitCount() == 0 ?
+      "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
+      "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
+      "cachingHits=" + stats.getHitCachingCount() + ", " +
+      "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
+      "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
+      "evictions=" + stats.getEvictionCount() + ", " +
+      "evicted=" + stats.getEvictedCount() + ", " +
+      "evictedPerRun=" + stats.evictedPerEviction());
+  }
+
+  /**
+   * Get counter statistics for this cache.
+   *
+   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
+   * of the eviction processes.
+   */
+  @Override
+  public CacheStats getStats() {
+    return this.stats;
+  }
+
+  public final static long CACHE_FIXED_OVERHEAD =
+    ClassSize.estimateBase(LruAdaptiveBlockCache.class, false);
+
+  @Override
+  public long heapSize() {
+    return getCurrentSize();
+  }
+
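+  /**
+   * Estimates the fixed heap overhead of the cache structures themselves: the cache instance,
+   * the backing ConcurrentHashMap, one map entry per expected block (maxSize * 1.2 / blockSize),
+   * and one map segment per concurrency level.
+   */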
+  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
+    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
+    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
+      + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+      + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+  }
+
+  @Override
+  public Iterator<CachedBlock> iterator() {
+    final Iterator<LruCachedBlock> iterator = map.values().iterator();
+
+    return new Iterator<CachedBlock>() {
+      private final long now = System.nanoTime();
+
+      @Override
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      @Override
+      public CachedBlock next() {
+        final LruCachedBlock b = iterator.next();
+        return new CachedBlock() {
+          @Override
+          public String toString() {
+            return BlockCacheUtil.toString(this, now);
+          }
+
+          @Override
+          public BlockPriority getBlockPriority() {
+            return b.getPriority();
+          }
+
+          @Override
+          public BlockType getBlockType() {
+            return b.getBuffer().getBlockType();
+          }
+
+          @Override
+          public long getOffset() {
+            return b.getCacheKey().getOffset();
+          }
+
+          @Override
+          public long getSize() {
+            return b.getBuffer().heapSize();
+          }
+
+          @Override
+          public long getCachedTime() {
+            return b.getCachedTime();
+          }
+
+          @Override
+          public String getFilename() {
+            return b.getCacheKey().getHfileName();
+          }
+
+          @Override
+          public int compareTo(CachedBlock other) {
+            int diff = this.getFilename().compareTo(other.getFilename());
+            if (diff != 0) {
+              return diff;
+            }
+            diff = Long.compare(this.getOffset(), other.getOffset());
+            if (diff != 0) {
+              return diff;
+            }
+            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
+              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
+            }
+            return Long.compare(other.getCachedTime(), this.getCachedTime());
+          }
+
+          @Override
+          public int hashCode() {
+            return b.hashCode();
+          }
+
+          @Override
+          public boolean equals(Object obj) {
+            if (obj instanceof CachedBlock) {
+              CachedBlock cb = (CachedBlock)obj;
+              return compareTo(cb) == 0;
+            } else {
+              return false;
+            }
+          }
+        };
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  // Simple calculators of sizes given factors and maxSize
+
+  long acceptableSize() {
+    return (long) Math.floor(this.maxSize * this.acceptableFactor);
+  }
+
+  private long minSize() {
+    return (long) Math.floor(this.maxSize * this.minFactor);
+  }
+
+  private long singleSize() {
+    return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
+  }
+
+  private long multiSize() {
+    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
+  }
+
+  private long memorySize() {
+    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
+  }
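+
+  // Worked example with assumed defaults (illustrative only): with
+  // maxSize = 100 MB, minFactor = 0.95, acceptableFactor = 0.99 and
+  // single/multi/memory factors of 0.25/0.50/0.25, eviction kicks in once
+  // usage exceeds acceptableSize() = 99 MB and frees space down to
+  // minSize() = 95 MB, with per-bucket ceilings singleSize() = 23.75 MB,
+  // multiSize() = 47.5 MB and memorySize() = 23.75 MB.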
+
+  @Override
+  public void shutdown() {
+    if (victimHandler != null) {
+      victimHandler.shutdown();
+    }
+    this.scheduleThreadPool.shutdown();
+    // isShutdown() turns true as soon as shutdown() has been called, so poll
+    // isTerminated() instead: give queued tasks up to ~100 ms (10 x 10 ms)
+    // to drain before forcing shutdownNow().
+    for (int i = 0; i < 10; i++) {
+      if (this.scheduleThreadPool.isTerminated()) {
+        break;
+      }
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while sleeping");
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    if (!this.scheduleThreadPool.isTerminated()) {
+      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
+      LOG.debug("Still running {}", runnables);
+    }
+    this.evictionThread.shutdown();
+  }
+
+  /** Clears the cache. Used in tests. */
+  public void clearCache() {
+    this.map.clear();
+    this.elements.set(0);
+  }
+
+  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
+    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
+    for (LruCachedBlock block : map.values()) {
+      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
+      Integer count = counts.get(encoding);
+      counts.put(encoding, (count == null ? 0 : count) + 1);
+    }
+    return counts;
+  }
+
+  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
+    return map;
+  }
+
+  @Override
+  public BlockCache[] getBlockCaches() {
+    if (victimHandler != null) {
+      return new BlockCache[] { this, this.victimHandler };
+    }
+    return null;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java
new file mode 100644
index 0000000..f29d12a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java
@@ -0,0 +1,1174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.LruAdaptiveBlockCache.EvictionThread;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the concurrent LruAdaptiveBlockCache.<p>
+ *
+ * Tests will ensure it grows and shrinks in size properly,
+ * evictions run when they're supposed to and do what they should,
+ * and that cached blocks are accessible when expected to be.
+ */
+@Category({IOTests.class, SmallTests.class})
+public class TestLruAdaptiveBlockCache {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLruAdaptiveBlockCache.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestLruAdaptiveBlockCache.class);
+
+  @Test
+  public void testCacheEvictionThreadSafe() throws Exception {
+    long maxSize = 100000;
+    int numBlocks = 9;
+    int testRuns = 10;
+    final long blockSize = calculateBlockSizeDefault(maxSize, numBlocks);
+    assertTrue("calculateBlockSize appears broken.", blockSize * numBlocks <= maxSize);
+
+    final Configuration conf = HBaseConfiguration.create();
+    final LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize);
+    EvictionThread evictionThread = cache.getEvictionThread();
+    assertNotNull(evictionThread);
+    while (!evictionThread.isEnteringRun()) {
+      Thread.sleep(1000);
+    }
+    final String hfileName = "hfile";
+    int threads = 10;
+    final int blocksPerThread = 5 * numBlocks;
+    for (int run = 0; run != testRuns; ++run) {
+      final AtomicInteger blockCount = new AtomicInteger(0);
+      ExecutorService service = Executors.newFixedThreadPool(threads);
+      for (int i = 0; i != threads; ++i) {
+        service.execute(() -> {
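+            // Keep caching until this thread has added its share of blocks
+            // AND an eviction is actually in progress (note the OR in the
+            // loop condition below), then evict everything for the file.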
+            for (int blockIndex = 0; blockIndex < blocksPerThread
+              || (!cache.isEvictionInProgress()); ++blockIndex) {
+              CachedItem block = new CachedItem(hfileName, (int) blockSize,
+                blockCount.getAndIncrement());
+              boolean inMemory = Math.random() > 0.5;
+              cache.cacheBlock(block.cacheKey, block, inMemory);
+            }
+            cache.evictBlocksByHfileName(hfileName);
+          });
+      }
+      service.shutdown();
+      // The test may fail here if the evict thread frees the blocks too fast
+      service.awaitTermination(10, TimeUnit.MINUTES);
+      Waiter.waitFor(conf, 10000, 100, new ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return cache.getBlockCount() == 0;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "Cache block count failed to return to 0";
+        }
+      });
+      assertEquals(0, cache.getBlockCount());
+      assertEquals(cache.getOverhead(), cache.getCurrentSize());
+    }
+  }
+
+  @Test
+  public void testBackgroundEvictionThread() throws Exception {
+    long maxSize = 100000;
+    int numBlocks = 9;
+    long blockSize = calculateBlockSizeDefault(maxSize, numBlocks);
+    assertTrue("calculateBlockSize appears broken.",
+      blockSize * numBlocks <= maxSize);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize);
+    EvictionThread evictionThread = cache.getEvictionThread();
+    assertNotNull(evictionThread);
+
+    CachedItem[] blocks = generateFixedBlocks(numBlocks + 1, blockSize, "block");
+
+    // Make sure eviction thread has entered run method
+    while (!evictionThread.isEnteringRun()) {
+      Thread.sleep(1);
+    }
+
+    // Add all the blocks
+    for (CachedItem block : blocks) {
+      cache.cacheBlock(block.cacheKey, block);
+    }
+
+    // wait until at least one eviction has run
+    int n = 0;
+    while(cache.getStats().getEvictionCount() == 0) {
+      Thread.sleep(200);
+      assertTrue("Eviction never happened.", n++ < 20);
+    }
+
+    // let cache stabilize
+    // On some systems, the cache will run multiple evictions before it attains
+    // steady-state. For instance, after populating the cache with 10 blocks,
+    // the first eviction evicts a single block and then a second eviction
+    // evicts another. I think this is due to the delta between minSize and
+    // acceptableSize, combined with variance between object overhead on
+    // different environments.
+    n = 0;
+    for (long prevCnt = 0 /* < number of blocks added */,
+         curCnt = cache.getBlockCount();
+         prevCnt != curCnt; prevCnt = curCnt, curCnt = cache.getBlockCount()) {
+      Thread.sleep(200);
+      assertTrue("Cache never stabilized.", n++ < 20);
+    }
+
+    long evictionCount = cache.getStats().getEvictionCount();
+    assertTrue(evictionCount >= 1);
+    LOG.info("Background Evictions run: {}", evictionCount);
+  }
+
+  @Test
+  public void testCacheSimple() throws Exception {
+
+    long maxSize = 1000000;
+    long blockSize = calculateBlockSizeDefault(maxSize, 101);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize);
+
+    CachedItem [] blocks = generateRandomBlocks(100, blockSize);
+
+    long expectedCacheSize = cache.heapSize();
+
+    // Confirm empty
+    for (CachedItem block : blocks) {
+      assertTrue(cache.getBlock(block.cacheKey, true, false,
+        true) == null);
+    }
+
+    // Add blocks
+    for (CachedItem block : blocks) {
+      cache.cacheBlock(block.cacheKey, block);
+      expectedCacheSize += block.cacheBlockHeapSize();
+    }
+
+    // Verify correctly calculated cache heap size
+    assertEquals(expectedCacheSize, cache.heapSize());
+
+    // Check if all blocks are properly cached and retrieved
+    for (CachedItem block : blocks) {
+      HeapSize buf = cache.getBlock(block.cacheKey, true, false,
+        true);
+      assertTrue(buf != null);
+      assertEquals(buf.heapSize(), block.heapSize());
+    }
+
+    // Re-add same blocks and ensure nothing has changed
+    long expectedBlockCount = cache.getBlockCount();
+    for (CachedItem block : blocks) {
+      cache.cacheBlock(block.cacheKey, block);
+    }
+    assertEquals(
+      "Cache should ignore cache requests for blocks already in cache",
+      expectedBlockCount, cache.getBlockCount());
+
+    // Verify correctly calculated cache heap size
+    assertEquals(expectedCacheSize, cache.heapSize());
+
+    // Check if all blocks are properly cached and retrieved
+    for (CachedItem block : blocks) {
+      HeapSize buf = cache.getBlock(block.cacheKey, true, false,
+        true);
+      assertTrue(buf != null);
+      assertEquals(buf.heapSize(), block.heapSize());
+    }
+
+    // Expect no evictions
+    assertEquals(0, cache.getStats().getEvictionCount());
+    Thread t = new LruAdaptiveBlockCache.StatisticsThread(cache);
+    t.start();
+    t.join();
+  }
+
+  @Test
+  public void testCacheEvictionSimple() throws Exception {
+
+    long maxSize = 100000;
+    long blockSize = calculateBlockSizeDefault(maxSize, 10);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false);
+
+    CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
+
+    long expectedCacheSize = cache.heapSize();
+
+    // Add all the blocks
+    for (CachedItem block : blocks) {
+      cache.cacheBlock(block.cacheKey, block);
+      expectedCacheSize += block.cacheBlockHeapSize();
+    }
+
+    // A single eviction run should have occurred
+    assertEquals(1, cache.getStats().getEvictionCount());
+
+    // Our expected size overruns acceptable limit
+    assertTrue(expectedCacheSize >
+      (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+
+    // But the cache did not grow beyond max
+    assertTrue(cache.heapSize() < maxSize);
+
+    // And is still below the acceptable limit
+    assertTrue(cache.heapSize() <
+      (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+
+    // All blocks except block 0 should be in the cache
+    assertTrue(cache.getBlock(blocks[0].cacheKey, true, false,
+      true) == null);
+    for(int i=1;i<blocks.length;i++) {
+      assertEquals(cache.getBlock(blocks[i].cacheKey, true, false,
+        true),
+        blocks[i]);
+    }
+  }
+
+  @Test
+  public void testCacheEvictionTwoPriorities() throws Exception {
+
+    long maxSize = 100000;
+    long blockSize = calculateBlockSizeDefault(maxSize, 10);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false);
+
+    CachedItem [] singleBlocks = generateFixedBlocks(5, 10000, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(5, 10000, "multi");
+
+    long expectedCacheSize = cache.heapSize();
+
+    // Add and get the multi blocks
+    for (CachedItem block : multiBlocks) {
+      cache.cacheBlock(block.cacheKey, block);
+      expectedCacheSize += block.cacheBlockHeapSize();
+      assertEquals(cache.getBlock(block.cacheKey, true, false, true),
+        block);
+    }
+
+    // Add the single blocks (no get)
+    for (CachedItem block : singleBlocks) {
+      cache.cacheBlock(block.cacheKey, block);
+      expectedCacheSize += block.heapSize();
+    }
+
+    // A single eviction run should have occurred
+    assertEquals(1, cache.getStats().getEvictionCount());
+
+    // We expect two entries evicted
+    assertEquals(2, cache.getStats().getEvictedCount());
+
+    // Our expected size overruns acceptable limit
+    assertTrue(expectedCacheSize >
+      (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+
+    // But the cache did not grow beyond max
+    assertTrue(cache.heapSize() <= maxSize);
+
+    // And is now below the acceptable limit
+    assertTrue(cache.heapSize() <=
+      (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+
+    // We expect fairness across the two priorities.
+    // This test makes multi go barely over its limit, in-memory
+    // empty, and the rest in single. One single and one multi
+    // eviction expected, i.e. the two evicted entries asserted above.
+    assertTrue(cache.getBlock(singleBlocks[0].cacheKey, true, false,
+      true) == null);
+    assertTrue(cache.getBlock(multiBlocks[0].cacheKey, true, false,
+      true) == null);
+
+    // And all others to be cached
+    for(int i=1;i<4;i++) {
+      assertEquals(cache.getBlock(singleBlocks[i].cacheKey, true, false,
+        true),
+        singleBlocks[i]);
+      assertEquals(cache.getBlock(multiBlocks[i].cacheKey, true, false,
+        true),
+        multiBlocks[i]);
+    }
+  }
+
+  @Test
+  public void testCacheEvictionThreePriorities() throws Exception {
+
+    long maxSize = 100000;
+    long blockSize = calculateBlockSize(maxSize, 10);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+      (int)Math.ceil(1.2*maxSize/blockSize),
+      LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+      LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+      0.98f, // min
+      0.99f, // acceptable
+      0.33f, // single
+      0.33f, // multi
+      0.34f, // memory
+      1.2f,  // limit
+      false, // forceInMemory
+      16 * 1024 * 1024, // max block size
+      10,   // heavy eviction count limit
+      500,  // heavy eviction MB size limit
+      0.01f); // heavy eviction overhead coefficient
+
+    CachedItem [] singleBlocks = generateFixedBlocks(5, blockSize, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+    CachedItem [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory");
+
+    long expectedCacheSize = cache.heapSize();
+
+    // Add 3 blocks from each priority
+    for(int i=0;i<3;i++) {
+
+      // Just add single blocks
+      cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+      expectedCacheSize += singleBlocks[i].cacheBlockHeapSize();
+
+      // Add and get multi blocks
+      cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
+      expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
+      cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
+
+      // Add memory blocks as such
+      cache.cacheBlock(memoryBlocks[i].cacheKey, memoryBlocks[i], true);
+      expectedCacheSize += memoryBlocks[i].cacheBlockHeapSize();
+    }
+
+    // Do not expect any evictions yet
+    assertEquals(0, cache.getStats().getEvictionCount());
+
+    // Verify cache size
+    assertEquals(expectedCacheSize, cache.heapSize());
+
+    // Insert a single block, oldest single should be evicted
+    cache.cacheBlock(singleBlocks[3].cacheKey, singleBlocks[3]);
+
+    // Single eviction, one thing evicted
+    assertEquals(1, cache.getStats().getEvictionCount());
+    assertEquals(1, cache.getStats().getEvictedCount());
+
+    // Verify oldest single block is the one evicted
+    assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false,
+      true));
+
+    // Change the oldest remaining single block to a multi
+    cache.getBlock(singleBlocks[1].cacheKey, true, false, true);
+
+    // Insert another single block
+    cache.cacheBlock(singleBlocks[4].cacheKey, singleBlocks[4]);
+
+    // Two evictions, two evicted.
+    assertEquals(2, cache.getStats().getEvictionCount());
+    assertEquals(2, cache.getStats().getEvictedCount());
+
+    // Oldest multi block should be evicted now
+    assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false,
+      true));
+
+    // Insert another memory block
+    cache.cacheBlock(memoryBlocks[3].cacheKey, memoryBlocks[3], true);
+
+    // Three evictions, three evicted.
+    assertEquals(3, cache.getStats().getEvictionCount());
+    assertEquals(3, cache.getStats().getEvictedCount());
+
+    // Oldest memory block should be evicted now
+    assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false,
+      true));
+
+    // Add a block that is twice as big (should force two evictions)
+    CachedItem [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
+    cache.cacheBlock(bigBlocks[0].cacheKey, bigBlocks[0]);
+
+    // Four evictions, six evicted (inserted block 3X size, expect +3 evicted)
+    assertEquals(4, cache.getStats().getEvictionCount());
+    assertEquals(6, cache.getStats().getEvictedCount());
+
+    // Expect three remaining singles to be evicted
+    assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false,
+      true));
+
+    // Make the big block a multi block
+    cache.getBlock(bigBlocks[0].cacheKey, true, false, true);
+
+    // Cache another single big block
+    cache.cacheBlock(bigBlocks[1].cacheKey, bigBlocks[1]);
+
+    // Five evictions, nine evicted (3 new)
+    assertEquals(5, cache.getStats().getEvictionCount());
+    assertEquals(9, cache.getStats().getEvictedCount());
+
+    // Expect three remaining multis to be evicted
+    assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false,
+      true));
+
+    // Cache a big memory block
+    cache.cacheBlock(bigBlocks[2].cacheKey, bigBlocks[2], true);
+
+    // Six evictions, twelve evicted (3 new)
+    assertEquals(6, cache.getStats().getEvictionCount());
+    assertEquals(12, cache.getStats().getEvictedCount());
+
+    // Expect three remaining in-memory to be evicted
+    assertEquals(null, cache.getBlock(memoryBlocks[1].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(memoryBlocks[2].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(memoryBlocks[3].cacheKey, true, false,
+      true));
+  }
+
+  @Test
+  public void testCacheEvictionInMemoryForceMode() throws Exception {
+    long maxSize = 100000;
+    long blockSize = calculateBlockSize(maxSize, 10);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+      (int)Math.ceil(1.2*maxSize/blockSize),
+      LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+      LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+      0.98f, // min
+      0.99f, // acceptable
+      0.2f, // single
+      0.3f, // multi
+      0.5f, // memory
+      1.2f, // limit
+      true,
+      16 * 1024 * 1024,
+      10,
+      500,
+      0.01f);
+
+    CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
+    CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
+
+    long expectedCacheSize = cache.heapSize();
+
+    // 0. Add 5 single blocks and 4 multi blocks to make cache full, si:mu:me = 5:4:0
+    for(int i = 0; i < 4; i++) {
+      // Just add single blocks
+      cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+      expectedCacheSize += singleBlocks[i].cacheBlockHeapSize();
+      // Add and get multi blocks
+      cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
+      expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
+      cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
+    }
+    // 5th single block
+    cache.cacheBlock(singleBlocks[4].cacheKey, singleBlocks[4]);
+    expectedCacheSize += singleBlocks[4].cacheBlockHeapSize();
+    // Do not expect any evictions yet
+    assertEquals(0, cache.getStats().getEvictionCount());
+    // Verify cache size
+    assertEquals(expectedCacheSize, cache.heapSize());
+
+    // 1. Insert a memory block, oldest single should be evicted, si:mu:me = 4:4:1
+    cache.cacheBlock(memoryBlocks[0].cacheKey, memoryBlocks[0], true);
+    // Single eviction, one block evicted
+    assertEquals(1, cache.getStats().getEvictionCount());
+    assertEquals(1, cache.getStats().getEvictedCount());
+    // Verify oldest single block (index = 0) is the one evicted
+    assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false,
+      true));
+
+    // 2. Insert another memory block, another single evicted, si:mu:me = 3:4:2
+    cache.cacheBlock(memoryBlocks[1].cacheKey, memoryBlocks[1], true);
+    // Two evictions, two evicted.
+    assertEquals(2, cache.getStats().getEvictionCount());
+    assertEquals(2, cache.getStats().getEvictedCount());
+    // Current oldest single block (index = 1) should be evicted now
+    assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false,
+      true));
+
+    // 3. Insert 4 memory blocks, 2 single and 2 multi evicted, si:mu:me = 1:2:6
+    cache.cacheBlock(memoryBlocks[2].cacheKey, memoryBlocks[2], true);
+    cache.cacheBlock(memoryBlocks[3].cacheKey, memoryBlocks[3], true);
+    cache.cacheBlock(memoryBlocks[4].cacheKey, memoryBlocks[4], true);
+    cache.cacheBlock(memoryBlocks[5].cacheKey, memoryBlocks[5], true);
+    // Four more evictions, four more evicted (6 total).
+    assertEquals(6, cache.getStats().getEvictionCount());
+    assertEquals(6, cache.getStats().getEvictedCount());
+    // two oldest single blocks and two oldest multi blocks evicted
+    assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false,
+      true));
+
+    // 4. Insert 3 memory blocks, the remaining 1 single and 2 multi evicted
+    // si:mu:me = 0:0:9
+    cache.cacheBlock(memoryBlocks[6].cacheKey, memoryBlocks[6], true);
+    cache.cacheBlock(memoryBlocks[7].cacheKey, memoryBlocks[7], true);
+    cache.cacheBlock(memoryBlocks[8].cacheKey, memoryBlocks[8], true);
+    // Three more evictions, three more evicted (9 total).
+    assertEquals(9, cache.getStats().getEvictionCount());
+    assertEquals(9, cache.getStats().getEvictedCount());
+    // one oldest single block and two oldest multi blocks evicted
+    assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(multiBlocks[3].cacheKey, true, false,
+      true));
+
+    // 5. Insert one memory block, the oldest memory evicted
+    // si:mu:me = 0:0:9
+    cache.cacheBlock(memoryBlocks[9].cacheKey, memoryBlocks[9], true);
+    // One more eviction, one more evicted (10 total).
+    assertEquals(10, cache.getStats().getEvictionCount());
+    assertEquals(10, cache.getStats().getEvictedCount());
+    // oldest memory block evicted
+    assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false,
+      true));
+
+    // 6. Insert one new single block, itself evicted immediately since
+    //    all blocks in cache are memory-type which have higher priority
+    // si:mu:me = 0:0:9 (no change)
+    cache.cacheBlock(singleBlocks[9].cacheKey, singleBlocks[9]);
+    // One more eviction, one more evicted (11 total).
+    assertEquals(11, cache.getStats().getEvictionCount());
+    assertEquals(11, cache.getStats().getEvictedCount());
+    // the single block just cached now evicted (can't evict memory)
+    assertEquals(null, cache.getBlock(singleBlocks[9].cacheKey, true, false,
+      true));
+  }
+
+  // test scan resistance
+  @Test
+  public void testScanResistance() throws Exception {
+
+    long maxSize = 100000;
+    long blockSize = calculateBlockSize(maxSize, 10);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+      (int)Math.ceil(1.2*maxSize/blockSize),
+      LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+      LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+      0.66f, // min
+      0.99f, // acceptable
+      0.33f, // single
+      0.33f, // multi
+      0.34f, // memory
+      1.2f,  // limit
+      false,
+      16 * 1024 * 1024,
+      10,
+      500,
+      0.01f);
+
+    CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+
+    // Add 5 multi blocks
+    for (CachedItem block : multiBlocks) {
+      cache.cacheBlock(block.cacheKey, block);
+      cache.getBlock(block.cacheKey, true, false, true);
+    }
+
+    // Add 5 single blocks
+    for(int i=0;i<5;i++) {
+      cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+    }
+
+    // An eviction ran
+    assertEquals(1, cache.getStats().getEvictionCount());
+
+    // To drop down to 2/3 capacity, we'll need to evict 4 blocks
+    assertEquals(4, cache.getStats().getEvictedCount());
+
+    // Should have been taken off equally from single and multi
+    assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false,
+      true));
+    assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false,
+      true));
+
+    // Let's keep "scanning" by adding single blocks.  From here on we only
+    // expect evictions from the single bucket.
+
+    // Every time we reach 10 total blocks (every 4 inserts) we get 4 single
+    // blocks evicted.  Inserting 13 blocks should yield 3 more evictions and
+    // 12 more evicted.
+
+    for(int i=5;i<18;i++) {
+      cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+    }
+
+    // 4 total evictions, 16 total evicted
+    assertEquals(4, cache.getStats().getEvictionCount());
+    assertEquals(16, cache.getStats().getEvictedCount());
+
+    // Should now have 7 total blocks
+    assertEquals(7, cache.getBlockCount());
+  }
+
+  @Test
+  public void testMaxBlockSize() throws Exception {
+    long maxSize = 100000;
+    long blockSize = calculateBlockSize(maxSize, 10);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+      (int)Math.ceil(1.2*maxSize/blockSize),
+      LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+      LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+      0.66f, // min
+      0.99f, // acceptable
+      0.33f, // single
+      0.33f, // multi
+      0.34f, // memory
+      1.2f,  // limit
+      false,
+      1024,
+      10,
+      500,
+      0.01f);
+
+    CachedItem [] tooLong = generateFixedBlocks(10, 1024+5, "long");
+    CachedItem [] small = generateFixedBlocks(15, 600, "small");
+
+    for (CachedItem i:tooLong) {
+      cache.cacheBlock(i.cacheKey, i);
+    }
+    for (CachedItem i:small) {
+      cache.cacheBlock(i.cacheKey, i);
+    }
+    assertEquals(15, cache.getBlockCount());
+    for (CachedItem i:small) {
+      assertNotNull(cache.getBlock(i.cacheKey, true, false, false));
+    }
+    for (CachedItem i:tooLong) {
+      assertNull(cache.getBlock(i.cacheKey, true, false, false));
+    }
+
+    assertEquals(10, cache.getStats().getFailedInserts());
+  }
+
+  // test setMaxSize
+  @Test
+  public void testResizeBlockCache() throws Exception {
+
+    long maxSize = 300000;
+    long blockSize = calculateBlockSize(maxSize, 31);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+      (int)Math.ceil(1.2*maxSize/blockSize),
+      LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+      LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+      0.98f, // min
+      0.99f, // acceptable
+      0.33f, // single
+      0.33f, // multi
+      0.34f, // memory
+      1.2f,  // limit
+      false,
+      16 * 1024 * 1024,
+      10,
+      500,
+      0.01f);
+
+    CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
+    CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
+
+    // Add all blocks from all priorities
+    for(int i=0;i<10;i++) {
+
+      // Just add single blocks
+      cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+
+      // Add and get multi blocks
+      cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
+      cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
+
+      // Add memory blocks as such
+      cache.cacheBlock(memoryBlocks[i].cacheKey, memoryBlocks[i], true);
+    }
+
+    // Do not expect any evictions yet
+    assertEquals(0, cache.getStats().getEvictionCount());
+
+    // Resize to half capacity plus an extra block (otherwise we evict an extra)
+    cache.setMaxSize((long)(maxSize * 0.5f));
+
+    // Should have run a single eviction
+    assertEquals(1, cache.getStats().getEvictionCount());
+
+    // And we expect 1/2 of the blocks to be evicted
+    assertEquals(15, cache.getStats().getEvictedCount());
+
+    // And the oldest 5 blocks from each category should be gone
+    for(int i=0;i<5;i++) {
+      assertEquals(null, cache.getBlock(singleBlocks[i].cacheKey, true,
+        false, true));
+      assertEquals(null, cache.getBlock(multiBlocks[i].cacheKey, true,
+        false, true));
+      assertEquals(null, cache.getBlock(memoryBlocks[i].cacheKey, true,
+        false, true));
+    }
+
+    // And the newest 5 blocks should still be accessible
+    for(int i=5;i<10;i++) {
+      assertEquals(singleBlocks[i], cache.getBlock(singleBlocks[i].cacheKey, true,
+        false, true));
+      assertEquals(multiBlocks[i], cache.getBlock(multiBlocks[i].cacheKey, true,
+        false, true));
+      assertEquals(memoryBlocks[i], cache.getBlock(memoryBlocks[i].cacheKey, true,
+        false, true));
+    }
+  }
+
+  // test metricsPastNPeriods
+  @Test
+  public void testPastNPeriodsMetrics() throws Exception {
+    double delta = 0.01;
+
+    // 3 total periods
+    CacheStats stats = new CacheStats("test", 3);
+
+    // No accesses, should be 0
+    stats.rollMetricsPeriod();
+    assertEquals(0.0, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 1, 1 hit caching, 1 hit non-caching, 2 miss non-caching
+    // should be (2/4)=0.5 and (1/1)=1
+    stats.hit(false, true, BlockType.DATA);
+    stats.hit(true, true, BlockType.DATA);
+    stats.miss(false, false, BlockType.DATA);
+    stats.miss(false, false, BlockType.DATA);
+    stats.rollMetricsPeriod();
+    assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(1.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 2, 1 miss caching, 3 miss non-caching
+    // should be (2/8)=0.25 and (1/2)=0.5
+    stats.miss(true, false, BlockType.DATA);
+    stats.miss(false, false, BlockType.DATA);
+    stats.miss(false, false, BlockType.DATA);
+    stats.miss(false, false, BlockType.DATA);
+    stats.rollMetricsPeriod();
+    assertEquals(0.25, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(0.5, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 3, 2 hits of each type
+    // should be (6/12)=0.5 and (3/4)=0.75
+    stats.hit(false, true, BlockType.DATA);
+    stats.hit(true, true, BlockType.DATA);
+    stats.hit(false, true, BlockType.DATA);
+    stats.hit(true, true, BlockType.DATA);
+    stats.rollMetricsPeriod();
+    assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(0.75, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 4, evict period 1, two caching misses
+    // should be (4/10)=0.4 and (2/5)=0.4
+    stats.miss(true, false, BlockType.DATA);
+    stats.miss(true, false, BlockType.DATA);
+    stats.rollMetricsPeriod();
+    assertEquals(0.4, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(0.4, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 5, evict period 2, 2 caching misses, 2 non-caching hit
+    // should be (6/10)=0.6 and (2/6)=1/3
+    stats.miss(true, false, BlockType.DATA);
+    stats.miss(true, false, BlockType.DATA);
+    stats.hit(false, true, BlockType.DATA);
+    stats.hit(false, true, BlockType.DATA);
+    stats.rollMetricsPeriod();
+    assertEquals(0.6, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals((double)1/3, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 6, evict period 3
+    // should be (2/6)=1/3 and (0/4)=0
+    stats.rollMetricsPeriod();
+    assertEquals((double)1/3, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 7, evict period 4
+    // should be (2/4)=0.5 and (0/2)=0
+    stats.rollMetricsPeriod();
+    assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 8, evict period 5
+    // should be 0 and 0
+    stats.rollMetricsPeriod();
+    assertEquals(0.0, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+    // period 9, one of each
+    // should be (2/4)=0.5 and (1/2)=0.5
+    stats.miss(true, false, BlockType.DATA);
+    stats.miss(false, false, BlockType.DATA);
+    stats.hit(true, true, BlockType.DATA);
+    stats.hit(false, true, BlockType.DATA);
+    stats.rollMetricsPeriod();
+    assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta);
+    assertEquals(0.5, stats.getHitCachingRatioPastNPeriods(), delta);
+  }
+
+  @Test
+  public void testCacheBlockNextBlockMetadataMissing() {
+    long maxSize = 100000;
+    long blockSize = calculateBlockSize(maxSize, 10);
+    int size = 100;
+    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+    byte[] byteArr = new byte[length];
+    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
+    HFileContext meta = new HFileContextBuilder().build();
+    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size,
+      -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52,
+      -1, meta, HEAP);
+    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size,
+      -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1,
+      -1, meta, HEAP);
+
+    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+      (int)Math.ceil(1.2*maxSize/blockSize),
+      LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+      LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+      0.66f, // min
+      0.99f, // acceptable
+      0.33f, // single
+      0.33f, // multi
+      0.34f, // memory
+      1.2f,  // limit
+      false,
+      1024,
+      10,
+      500,
+      0.01f);
+
+    BlockCacheKey key = new BlockCacheKey("key1", 0);
+    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
+    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
+    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
+    blockWithNextBlockMetadata.serialize(block1Buffer, true);
+    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
+
+    //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
+    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
+      block1Buffer);
+
+    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
+    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
+      block1Buffer);
+
+    //Clear and add blockWithoutNextBlockMetadata
+    cache.clearCache();
+    assertNull(cache.getBlock(key, false, false, false));
+    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
+      block2Buffer);
+
+    //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
+    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
+      block1Buffer);
+  }
+
+  private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) {
+    CachedItem [] blocks = new CachedItem[numBlocks];
+    for(int i=0;i<numBlocks;i++) {
+      blocks[i] = new CachedItem(pfx + i, size);
+    }
+    return blocks;
+  }
+
+  private CachedItem [] generateFixedBlocks(int numBlocks, long size, String pfx) {
+    return generateFixedBlocks(numBlocks, (int)size, pfx);
+  }
+
+  private CachedItem [] generateRandomBlocks(int numBlocks, long maxSize) {
+    CachedItem [] blocks = new CachedItem[numBlocks];
+    Random r = new Random();
+    for(int i=0;i<numBlocks;i++) {
+      blocks[i] = new CachedItem("block" + i, r.nextInt((int)maxSize)+1);
+    }
+    return blocks;
+  }
+
+  private long calculateBlockSize(long maxSize, int numBlocks) {
+    long roughBlockSize = maxSize / numBlocks;
+    int numEntries = (int)Math.ceil((1.2)*maxSize/roughBlockSize);
+    long totalOverhead = LruAdaptiveBlockCache.CACHE_FIXED_OVERHEAD +
+      ClassSize.CONCURRENT_HASHMAP +
+      (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
+      (LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+    long negateBlockSize = (long)(totalOverhead/numEntries);
+    negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD;
+    return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*0.99f));
+  }
+
+  private long calculateBlockSizeDefault(long maxSize, int numBlocks) {
+    long roughBlockSize = maxSize / numBlocks;
+    int numEntries = (int)Math.ceil((1.2)*maxSize/roughBlockSize);
+    long totalOverhead = LruAdaptiveBlockCache.CACHE_FIXED_OVERHEAD +
+      ClassSize.CONCURRENT_HASHMAP +
+      (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
+      (LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+    long negateBlockSize = totalOverhead / numEntries;
+    negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD;
+    return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*
+      LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+  }
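+
+  // Worked example with assumed numbers (illustrative only): for
+  // maxSize = 100000 and numBlocks = 10, roughBlockSize = 10000 and the map
+  // is sized for ceil(1.2 * 100000 / 10000) = 12 entries; the fixed and
+  // per-entry overheads are averaged over those entries and, together with
+  // LruCachedBlock.PER_BLOCK_OVERHEAD, subtracted from roughBlockSize so
+  // that numBlocks such blocks still fit under
+  // maxSize * DEFAULT_ACCEPTABLE_FACTOR.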
+
+  private static class CachedItem implements Cacheable {
+    BlockCacheKey cacheKey;
+    int size;
+
+    CachedItem(String blockName, int size, int offset) {
+      this.cacheKey = new BlockCacheKey(blockName, offset);
+      this.size = size;
+    }
+
+    CachedItem(String blockName, int size) {
+      this.cacheKey = new BlockCacheKey(blockName, 0);
+      this.size = size;
+    }
+
+    /** The size of this item reported to the block cache layer */
+    @Override
+    public long heapSize() {
+      return ClassSize.align(size);
+    }
+
+    /** Size of the cache block holding this item. Used for verification. */
+    public long cacheBlockHeapSize() {
+      return LruCachedBlock.PER_BLOCK_OVERHEAD
+        + ClassSize.align(cacheKey.heapSize())
+        + ClassSize.align(size);
+    }
+
+    @Override
+    public int getSerializedLength() {
+      return 0;
+    }
+
+    @Override
+    public CacheableDeserializer<Cacheable> getDeserializer() {
+      return null;
+    }
+
+    @Override
+    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
+    }
+
+    @Override
+    public BlockType getBlockType() {
+      return BlockType.DATA;
+    }
+  }
+
+  static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception {
+    int size = 100;
+    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+    byte[] byteArr = new byte[length];
+    HFileContext meta = new HFileContextBuilder().build();
+    BlockCacheKey key = new BlockCacheKey("key1", 0);
+    HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
+      ByteBuff.wrap(ByteBuffer.wrap(byteArr, 0, size)), HFileBlock.FILL_HEADER, -1,
+      52, -1, meta,
+      HEAP);
+    AtomicBoolean err1 = new AtomicBoolean(false);
+    Thread t1 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err1.get(); i++) {
+        try {
+          cache.getBlock(key, false, false, true);
+        } catch (Exception e) {
+          err1.set(true);
+          LOG.info("Cache block or get block failure: ", e);
+        }
+      }
+    });
+
+    AtomicBoolean err2 = new AtomicBoolean(false);
+    Thread t2 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err2.get(); i++) {
+        try {
+          cache.evictBlock(key);
+        } catch (Exception e) {
+          err2.set(true);
+          LOG.info("Evict block failure: ", e);
+        }
+      }
+    });
+
+    AtomicBoolean err3 = new AtomicBoolean(false);
+    Thread t3 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err3.get(); i++) {
+        try {
+          cache.cacheBlock(key, blk);
+        } catch (Exception e) {
+          err3.set(true);
+          LOG.info("Cache block failure: ", e);
+        }
+      }
+    });
+    t1.start();
+    t2.start();
+    t3.start();
+    t1.join();
+    t2.join();
+    t3.join();
+    Assert.assertFalse(err1.get());
+    Assert.assertFalse(err2.get());
+    Assert.assertFalse(err3.get());
+  }
+
+  @Test
+  public void testMultiThreadGetAndEvictBlock() throws Exception {
+    long maxSize = 100000;
+    long blockSize = calculateBlockSize(maxSize, 10);
+    LruAdaptiveBlockCache cache =
+      new LruAdaptiveBlockCache(maxSize, blockSize, false,
+        (int) Math.ceil(1.2 * maxSize / blockSize),
+        LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+        0.66f, // min
+        0.99f, // acceptable
+        0.33f, // single
+        0.33f, // multi
+        0.34f, // memory
+        1.2f, // limit
+        false, 1024,
+        10,
+        500,
+        0.01f);
+    testMultiThreadGetAndEvictBlockInternal(cache);
+  }
+
+  public void testSkipCacheDataBlocksInternal(int heavyEvictionCountLimit) throws Exception {
+    long maxSize = 100000000;
+    int numBlocks = 100000;
+    final long blockSize = calculateBlockSizeDefault(maxSize, numBlocks);
+    assertTrue("calculateBlockSize appears broken.",
+      blockSize * numBlocks <= maxSize);
+
+    final LruAdaptiveBlockCache cache =
+      new LruAdaptiveBlockCache(maxSize, blockSize, true,
+        (int) Math.ceil(1.2 * maxSize / blockSize),
+        LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+        0.5f, // min
+        0.99f, // acceptable
+        0.33f, // single
+        0.33f, // multi
+        0.34f, // memory
+        1.2f, // limit
+        false, // forceInMemory
+        maxSize, // max block size
+        heavyEvictionCountLimit,
+        200,   // heavy eviction MB size limit
+        0.01f); // heavy eviction overhead coefficient
+
+    EvictionThread evictionThread = cache.getEvictionThread();
+    assertNotNull(evictionThread);
+    while (!evictionThread.isEnteringRun()) {
+      Thread.sleep(1);
+    }
+
+    final String hfileName = "hfile";
+    for (int blockIndex = 0; blockIndex <= numBlocks * 3000; ++blockIndex) {
+      CachedItem block = new CachedItem(hfileName, (int) blockSize, blockIndex);
+      cache.cacheBlock(block.cacheKey, block, false);
+      if (cache.getCacheDataBlockPercent() < 70) {
+        // enough for test
+        break;
+      }
+    }
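+
+    // With heavyEvictionCountLimit == 0 the adaptive logic may start skipping
+    // data blocks almost immediately; getCacheDataBlockPercent() reports the
+    // share (in percent) of data blocks the cache is currently willing to
+    // admit, and the loop above exits early once it drops below 70.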
+
+    evictionThread.evict();
+    Thread.sleep(100);
+
+    if (heavyEvictionCountLimit == 0) {
+      // Check that the offsets (last two digits) of all cached blocks are below
+      // the percent threshold, i.e. some blocks were never put into the
+      // BlockCache at all.
+      assertTrue(cache.getCacheDataBlockPercent() < 90);
+      for (BlockCacheKey key : cache.getMapForTests().keySet()) {
+        assertTrue(key.getOffset() % 100 <= 90);
+      }
+    } else {
+      // Check that auto-scaling never kicked in (all blocks went into the BlockCache)
+      assertTrue(cache.getCacheDataBlockPercent() == 100);
+      int counter = 0;
+      for (BlockCacheKey key : cache.getMapForTests().keySet()) {
+        if (key.getOffset() % 100 > 90) {
+          counter++;
+        }
+      }
+      assertTrue(counter > 1000);
+    }
+    evictionThread.shutdown();
+  }
+
+  @Test
+  public void testSkipCacheDataBlocks() throws Exception {
+    // Check that auto-scaling kicks in right after start
+    testSkipCacheDataBlocksInternal(0);
+    // Check that auto-scaling does not kick in right after start
+    // (the test finishes before the heavy-eviction count limit is reached)
+    testSkipCacheDataBlocksInternal(100);
+  }
+}