KYLIN-4097: Throw exception when too many dict slice evictions in AppendTrieDictionary
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6f73024..1e61b19 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -500,6 +500,10 @@
         return Long.parseLong(getOptional("kylin.dictionary.max-cache-entry", "3000"));
     }
 
+    public int getCachedDictMaxSize() {
+        return Integer.parseInt(getOptional("kylin.dictionary.max-cache-size", "-1"));
+    }
+
     public boolean isGrowingDictEnabled() {
         return Boolean.parseBoolean(this.getOptional("kylin.dictionary.growing-enabled", FALSE));
     }
@@ -552,6 +556,11 @@
         return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", TRUE));
     }
 
+    public int getDictionarySliceEvicationThreshold() {
+        return Integer.parseInt(getOptional("kylin.dictionary.slice.eviction.threshold", "5"));
+    }
+
+
     // ============================================================================
     // mr-hive dict
     // ============================================================================
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 3a55961..8e89fd8 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -39,6 +39,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheStats;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
@@ -73,6 +74,8 @@
     transient private GlobalDictMetadata metadata;
     transient private LoadingCache<AppendDictSliceKey, AppendDictSlice> dictCache;
 
+    private int evictionThreshold = 0;
+
     public void init(String baseDir) throws IOException {
         this.baseDir = convertToAbsolutePath(baseDir);
         final GlobalDictStore globalDictStore = new GlobalDictHDFSStore(this.baseDir);
@@ -87,7 +90,15 @@
         final Path latestVersionPath = globalDictStore.getVersionDir(latestVersion);
         this.metadata = globalDictStore.getMetadata(latestVersion);
         this.bytesConvert = metadata.bytesConverter;
-        this.dictCache = CacheBuilder.newBuilder().softValues()
+
+        // see: https://github.com/google/guava/wiki/CachesExplained
+        CacheBuilder cacheBuilder = CacheBuilder.newBuilder().softValues().recordStats();
+        int cacheMaximumSize = KylinConfig.getInstanceFromEnv().getCachedDictMaxSize();
+        if (cacheMaximumSize > 0) {
+            cacheBuilder = cacheBuilder.maximumSize(cacheMaximumSize);
+            logger.info("Set dict cache maximum size to " + cacheMaximumSize);
+        }
+        this.dictCache = cacheBuilder
                 .removalListener(new RemovalListener<AppendDictSliceKey, AppendDictSlice>() {
                     @Override
                     public void onRemoval(RemovalNotification<AppendDictSliceKey, AppendDictSlice> notification) {
@@ -104,6 +115,7 @@
                         return slice;
                     }
                 });
+        this.evictionThreshold = KylinConfig.getInstanceFromEnv().getDictionarySliceEvicationThreshold();
     }
 
     @Override
@@ -119,9 +131,26 @@
         } catch (ExecutionException e) {
             throw new IllegalStateException("Failed to load slice with key " + sliceKey, e.getCause());
         }
+        CacheStats stats = dictCache.stats();
+        if (evictionThreshold > 0 && stats.evictionCount() > evictionThreshold * metadata.sliceFileMap.size()
+                && stats.loadCount() > (evictionThreshold + 1) * metadata.sliceFileMap.size()) {
+            logger.warn(
+                    "Too many dict slice evictions and reloads, maybe the memory is not enough to hold all the dictionary");
+            throw new RuntimeException("Too many dict slice evictions: " + stats + " for "
+                    + metadata.sliceFileMap.size() + " dict slices. "
+                    + "Maybe the memory is not enough to hold all the dictionary, try to enlarge the mapreduce/spark executor memory.");
+        }
         return slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
     }
 
+    public CacheStats getCacheStats() {
+        return dictCache.stats();
+    }
+
+    public GlobalDictMetadata getDictMetadata() {
+        return metadata;
+    }
+
     @Override
     public int getMinId() {
         return metadata.baseId;
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 7e5421a..7907da8 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -592,4 +592,45 @@
         }
     }
 
+    @Test
+    public void testTooManySliceEvictions() throws IOException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.max-cache-size", "3");
+        AppendTrieDictionaryBuilder builder = createBuilder();
+        for (int i = 0 ; i < 100000; i++) {
+            builder.addValue(Integer.toString(i));
+        }
+        AppendTrieDictionary dict = builder.build(0);
+
+        assertEquals(4, dict.getDictMetadata().sliceFileMap.size());
+        assertEquals(1, dict.getIdFromValue("0", 0));
+        assertEquals(0, dict.getCacheStats().evictionCount());
+        assertEquals(1, dict.getCacheStats().loadCount());
+
+
+        List<String> keys = new ArrayList<>(100000);
+        for (int i = 0 ; i < 100000; i++) {
+            keys.add(Integer.toString(i));
+        }
+        Collections.sort(keys);
+        for (String key : keys) {
+            assertEquals(Integer.parseInt(key) + 1, dict.getIdFromValue(key, 0));
+        }
+        assertEquals(1, dict.getCacheStats().evictionCount());
+        assertEquals(4, dict.getCacheStats().loadCount());
+
+        // out of order
+        Collections.shuffle(keys);
+        try {
+            for (String key : keys) {
+                assertEquals(Integer.parseInt(key) + 1, dict.getIdFromValue(key, 0));
+            }
+            assertFalse("Should throw RuntimeException for too many dict slice evictions", true);
+        } catch (RuntimeException e) {
+            assertEquals("Too many dict slice evictions", e.getMessage().substring(0, 29));
+        }
+        assertEquals(22, dict.getCacheStats().evictionCount());
+        assertEquals(25, dict.getCacheStats().loadCount());
+
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.max-cache-size", "-1");
+    }
 }