// blob: 8e89fd826399f791f8e7e76d7897d9257fc86c87 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.dict;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.global.AppendDictSlice;
import org.apache.kylin.dict.global.AppendDictSliceKey;
import org.apache.kylin.dict.global.GlobalDictHDFSStore;
import org.apache.kylin.dict.global.GlobalDictMetadata;
import org.apache.kylin.dict.global.GlobalDictStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheStats;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
/**
 * A dictionary based on a Trie data structure that maps enumerations of byte[]
 * to int IDs, used for the global dictionary.
 * <p>
 * Trie data is split into sub trees, called {@link AppendDictSlice}. Slices are
 * loaded on demand through a Guava {@link LoadingCache} configured with soft
 * values, so they may be evicted and re-read later.
 * <p>
 * With a Trie the memory footprint of the mapping is kept small at the cost of
 * CPU, compared to a HashMap of ID arrays. Performance tests show the Trie is
 * roughly 10 times slower, so a cache layer sits on top of the Trie and
 * gracefully falls back to the Trie using a weak reference.
 * <p>
 * The implementation is NOT thread-safe for now.
 * <p>
 * TODO make it thread-safe
 *
 * @author sunyerui
 */
@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
public class AppendTrieDictionary<T> extends CacheDictionary<T> {

    /**
     * Magic header bytes.
     * <p>
     * NOTE: the fifth byte is {@code 0x63} ('c') rather than {@code 0x6e} ('n'),
     * so in ASCII these bytes spell "AppecdTrieDict", not "AppendTrieDict". The
     * value is intentionally left untouched here: it is presumably compared
     * against already-persisted data, so "correcting" it could break
     * compatibility -- confirm before changing.
     */
    public static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65,
            0x44, 0x69, 0x63, 0x74 };

    /** Length in bytes of {@link #HEAD_MAGIC}. */
    public static final int HEAD_SIZE_I = HEAD_MAGIC.length;

    private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionary.class);

    /** When true, {@link #write(DataOutput)} stores the base directory unchanged (test hook). */
    transient private Boolean isSaveAbsolutePath = false;

    /** Absolute base directory of this dictionary, as produced by {@link #convertToAbsolutePath(String)}. */
    transient private String baseDir;

    /** Metadata of the version loaded by {@link #init(String)} (empty metadata when no version exists). */
    transient private GlobalDictMetadata metadata;

    /** Slice cache; stays unset when {@link #init(String)} finds no version. */
    transient private LoadingCache<AppendDictSliceKey, AppendDictSlice> dictCache;

    /** Eviction threshold read from configuration; a value of 0 or less disables the eviction check. */
    private int evictionThreshold = 0;

    /**
     * Initializes this dictionary from the given base directory.
     * <p>
     * The last version returned by the store is used. When the store reports no
     * version at all, only an empty metadata object is installed and no slice
     * cache is created.
     *
     * @param baseDir base directory of the dictionary, relative or absolute
     * @throws IOException declared for failures raised by the underlying store
     */
    public void init(String baseDir) throws IOException {
        this.baseDir = convertToAbsolutePath(baseDir);
        final GlobalDictStore globalDictStore = new GlobalDictHDFSStore(this.baseDir);

        Long[] versions = globalDictStore.listAllVersions();
        if (versions.length == 0) {
            this.metadata = new GlobalDictMetadata(0, 0, 0, 0, null, new TreeMap<AppendDictSliceKey, String>());
            return; // for the removed SegmentAppendTrieDictBuilder
        }

        // The last listed version is treated as the latest one.
        final long latestVersion = versions[versions.length - 1];
        final Path latestVersionPath = globalDictStore.getVersionDir(latestVersion);
        this.metadata = globalDictStore.getMetadata(latestVersion);
        this.bytesConvert = metadata.bytesConverter;

        // see: https://github.com/google/guava/wiki/CachesExplained
        CacheBuilder cacheBuilder = CacheBuilder.newBuilder().softValues().recordStats();
        int cacheMaximumSize = KylinConfig.getInstanceFromEnv().getCachedDictMaxSize();
        if (cacheMaximumSize > 0) {
            // A non-positive configured size leaves the cache bounded by soft references only.
            cacheBuilder = cacheBuilder.maximumSize(cacheMaximumSize);
            logger.info("Set dict cache maximum size to {}", cacheMaximumSize);
        }

        this.dictCache = cacheBuilder
                .removalListener(new RemovalListener<AppendDictSliceKey, AppendDictSlice>() {
                    @Override
                    public void onRemoval(RemovalNotification<AppendDictSliceKey, AppendDictSlice> notification) {
                        logger.info("Evict slice with key {} and value {} caused by {}, size {}/{}",
                                notification.getKey(), notification.getValue(), notification.getCause(),
                                dictCache.size(), metadata.sliceFileMap.size());
                    }
                }).build(new CacheLoader<AppendDictSliceKey, AppendDictSlice>() {
                    @Override
                    public AppendDictSlice load(AppendDictSliceKey key) throws Exception {
                        // Resolve the slice file name from the metadata and read it from the latest version dir.
                        AppendDictSlice slice = globalDictStore.readSlice(latestVersionPath.toString(),
                                metadata.sliceFileMap.get(key));
                        logger.trace("Load slice with key {} and value {}", key, slice);
                        return slice;
                    }
                });

        this.evictionThreshold = KylinConfig.getInstanceFromEnv().getDictionarySliceEvicationThreshold();
    }

    /**
     * Looks up the ID of a value by locating the slice responsible for it and
     * delegating the lookup to that slice.
     *
     * @param value        buffer holding the value bytes
     * @param offset       start of the value inside {@code value}
     * @param len          number of value bytes
     * @param roundingFlag passed through unchanged to the slice lookup
     * @return whatever the slice returns for the value
     * @throws IllegalStateException if the slice cannot be loaded
     * @throws RuntimeException      if the eviction check is enabled and the cache has evicted
     *                               and reloaded slices more often than the threshold allows
     */
    @Override
    public int getIdFromValueBytesWithoutCache(byte[] value, int offset, int len, int roundingFlag) {
        // The copy is only used to build the lookup key; the slice itself reads from the original buffer.
        byte[] val = Arrays.copyOfRange(value, offset, offset + len);
        AppendDictSliceKey sliceKey = metadata.sliceFileMap.floorKey(AppendDictSliceKey.wrap(val));
        if (sliceKey == null) {
            // No key at or below the value: fall back to the first slice.
            sliceKey = metadata.sliceFileMap.firstKey();
        }

        AppendDictSlice slice;
        try {
            slice = dictCache.get(sliceKey);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to load slice with key " + sliceKey, e.getCause());
        }

        if (evictionThreshold > 0) {
            final CacheStats stats = dictCache.stats();
            final long sliceCount = metadata.sliceFileMap.size();
            // Multiply in long: the int product of threshold and slice count could wrap around
            // and make the comparison against the long counters meaningless.
            final long evictionLimit = evictionThreshold * sliceCount;
            final long loadLimit = (evictionThreshold + 1L) * sliceCount;
            if (stats.evictionCount() > evictionLimit && stats.loadCount() > loadLimit) {
                logger.warn(
                        "Too many dict slice evictions and reloads, maybe the memory is not enough to hold all the dictionary");
                throw new RuntimeException("Too many dict slice evictions: " + stats + " for "
                        + sliceCount + " dict slices. "
                        + "Maybe the memory is not enough to hold all the dictionary, try to enlarge the mapreduce/spark executor memory.");
            }
        }

        return slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
    }

    /**
     * @return a snapshot of the slice cache statistics
     */
    public CacheStats getCacheStats() {
        return dictCache.stats();
    }

    /**
     * @return the metadata installed by {@link #init(String)}
     */
    public GlobalDictMetadata getDictMetadata() {
        return metadata;
    }

    /**
     * @return the {@code baseId} recorded in the metadata
     */
    @Override
    public int getMinId() {
        return metadata.baseId;
    }

    /**
     * @return the {@code maxId} recorded in the metadata
     */
    @Override
    public int getMaxId() {
        return metadata.maxId;
    }

    /**
     * @return the size of an {@code int} in bytes
     */
    @Override
    public int getSizeOfId() {
        return Integer.SIZE / Byte.SIZE;
    }

    /**
     * @return the {@code maxValueLength} recorded in the metadata
     */
    @Override
    public int getSizeOfValue() {
        return metadata.maxValueLength;
    }

    /**
     * Reverse lookup is not supported by this dictionary.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected byte[] getValueBytesFromIdWithoutCache(int id) {
        throw new UnsupportedOperationException("AppendTrieDictionary can't retrieve value from id");
    }

    /**
     * Copies the dictionary through the store to another metadata environment
     * and returns a new dictionary initialized from the copied location.
     *
     * @param srcConfig source configuration, passed to the store
     * @param dstConfig destination configuration, passed to the store
     * @return a new dictionary initialized on the destination base directory
     * @throws IOException if the copy or the initialization fails
     */
    @Override
    public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
        GlobalDictStore store = new GlobalDictHDFSStore(baseDir);
        String dstBaseDir = store.copyToAnotherMeta(srcConfig, dstConfig);
        AppendTrieDictionary newDict = new AppendTrieDictionary();
        newDict.init(dstBaseDir);
        return newDict;
    }

    /**
     * Serializes this dictionary as its base directory only, converted by
     * {@link #convertToRelativePath(String)}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(convertToRelativePath(baseDir));
    }

    /**
     * Reads the base directory written by {@link #write(DataOutput)} and
     * re-initializes this dictionary from it.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        init(in.readUTF());
    }

    /**
     * Prints a one-line summary (value count and slice count) to {@code out}.
     */
    @Override
    public void dump(PrintStream out) {
        out.println(String.format(Locale.ROOT, "Total %d values and %d slices", metadata.nValues,
                metadata.sliceFileMap.size()));
    }

    /** Hash code derived from the base directory only, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Objects.hashCode(baseDir);
    }

    /** Two dictionaries are equal when their base directories are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o instanceof AppendTrieDictionary) {
            AppendTrieDictionary that = (AppendTrieDictionary) o;
            return Objects.equals(this.baseDir, that.baseDir);
        }
        return false;
    }

    @Override
    public String toString() {
        return String.format(Locale.ROOT, "AppendTrieDictionary(%s)", baseDir);
    }

    /**
     * Containment checks are not implemented for this dictionary.
     *
     * @return always {@code false}
     */
    @Override
    public boolean contains(Dictionary other) {
        return false;
    }

    /**
     * JIRA: https://issues.apache.org/jira/browse/KYLIN-2945
     * Persisting an absolute path may cause problems such as the global dict not
     * being found after a migration, so the path is stored relative to the HDFS
     * working directory, which keeps it portable.
     *
     * @param path path to convert
     * @return {@code path} with the HDFS working directory prefix removed, or {@code path}
     *         unchanged when it does not start with that prefix or absolute paths were requested
     */
    private String convertToRelativePath(String path) {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
        if (!isSaveAbsolutePath && path.startsWith(hdfsWorkingDir)) {
            return path.substring(hdfsWorkingDir.length());
        }
        return path;
    }

    /**
     * Rebases a dictionary path onto the current HDFS working directory.
     * <p>
     * A path without a URI scheme is appended to the working directory as-is. A
     * path with a scheme must contain exactly one
     * {@code /resources/GlobalDict/} or {@code /resources/SegmentDict/} marker
     * followed by more text; everything before the marker is replaced by the
     * working directory.
     *
     * @param path relative or fully-qualified dictionary path
     * @return the path under the current HDFS working directory
     * @throws RuntimeException if a fully-qualified path matches neither marker
     */
    private String convertToAbsolutePath(String path) {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        Path basicPath = new Path(path);
        if (basicPath.toUri().getScheme() == null) {
            return kylinConfig.getHdfsWorkingDirectory() + path;
        }

        String[] paths = path.split("/resources/GlobalDict/");
        if (paths.length == 2) {
            return kylinConfig.getHdfsWorkingDirectory() + "/resources/GlobalDict/" + paths[1];
        }

        paths = path.split("/resources/SegmentDict/");
        if (paths.length == 2) {
            return kylinConfig.getHdfsWorkingDirectory() + "/resources/SegmentDict/" + paths[1];
        } else {
            throw new RuntimeException(
                    "the basic directory of global dictionary only support the format which contains '/resources/GlobalDict/' or '/resources/SegmentDict/'");
        }
    }

    /**
     * only for test
     *
     * @param flag when true, {@link #write(DataOutput)} keeps the base directory as an absolute path
     */
    void setSaveAbsolutePath(Boolean flag) {
        this.isSaveAbsolutePath = flag;
    }
}