| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kylin.dict; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.PrintStream; |
| import java.util.Arrays; |
| import java.util.Locale; |
| import java.util.Objects; |
| import java.util.TreeMap; |
| import java.util.concurrent.ExecutionException; |
| |
| import org.apache.hadoop.fs.Path; |
| import org.apache.kylin.common.KylinConfig; |
| import org.apache.kylin.common.util.Dictionary; |
| import org.apache.kylin.dict.global.AppendDictSlice; |
| import org.apache.kylin.dict.global.AppendDictSliceKey; |
| import org.apache.kylin.dict.global.GlobalDictHDFSStore; |
| import org.apache.kylin.dict.global.GlobalDictMetadata; |
| import org.apache.kylin.dict.global.GlobalDictStore; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheStats; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.common.cache.RemovalListener; |
| import com.google.common.cache.RemovalNotification; |
| |
| /** |
| * A dictionary based on Trie data structure that maps enumerations of byte[] to |
| * int IDs, used for global dictionary. |
| * <p> |
| * Trie data is split into sub trees, called {@link AppendDictSlice}. |
| * <p> |
| * Compared with a HashMap of ID arrays, the Trie minimizes the memory footprint |
| * of the mapping at the cost of CPU. Performance tests show the Trie is roughly |
| * 10 times slower, so a cache layer is overlaid on top of the Trie and |
| * gracefully falls back to the Trie using a weak reference. |
| * <p> |
| * The implementation is NOT thread-safe for now. |
| * <p> |
| * TODO making it thread-safe |
| * |
| * @author sunyerui |
| */ |
| @SuppressWarnings({ "rawtypes", "unchecked", "serial" }) |
| public class AppendTrieDictionary<T> extends CacheDictionary<T> { |
| public static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, |
| 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict" |
| public static final int HEAD_SIZE_I = HEAD_MAGIC.length; |
| private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionary.class); |
| |
| transient private Boolean isSaveAbsolutePath = false; |
| transient private String baseDir; |
| transient private GlobalDictMetadata metadata; |
| transient private LoadingCache<AppendDictSliceKey, AppendDictSlice> dictCache; |
| |
| private int evictionThreshold = 0; |
| |
| public void init(String baseDir) throws IOException { |
| this.baseDir = convertToAbsolutePath(baseDir); |
| final GlobalDictStore globalDictStore = new GlobalDictHDFSStore(this.baseDir); |
| Long[] versions = globalDictStore.listAllVersions(); |
| |
| if (versions.length == 0) { |
| this.metadata = new GlobalDictMetadata(0, 0, 0, 0, null, new TreeMap<AppendDictSliceKey, String>()); |
| return; // for the removed SegmentAppendTrieDictBuilder |
| } |
| |
| final long latestVersion = versions[versions.length - 1]; |
| final Path latestVersionPath = globalDictStore.getVersionDir(latestVersion); |
| this.metadata = globalDictStore.getMetadata(latestVersion); |
| this.bytesConvert = metadata.bytesConverter; |
| |
| // see: https://github.com/google/guava/wiki/CachesExplained |
| CacheBuilder cacheBuilder = CacheBuilder.newBuilder().softValues().recordStats(); |
| int cacheMaximumSize = KylinConfig.getInstanceFromEnv().getCachedDictMaxSize(); |
| if (cacheMaximumSize > 0) { |
| cacheBuilder = cacheBuilder.maximumSize(cacheMaximumSize); |
| logger.info("Set dict cache maximum size to " + cacheMaximumSize); |
| } |
| this.dictCache = cacheBuilder |
| .removalListener(new RemovalListener<AppendDictSliceKey, AppendDictSlice>() { |
| @Override |
| public void onRemoval(RemovalNotification<AppendDictSliceKey, AppendDictSlice> notification) { |
| logger.info("Evict slice with key {} and value {} caused by {}, size {}/{}", |
| notification.getKey(), notification.getValue(), notification.getCause(), |
| dictCache.size(), metadata.sliceFileMap.size()); |
| } |
| }).build(new CacheLoader<AppendDictSliceKey, AppendDictSlice>() { |
| @Override |
| public AppendDictSlice load(AppendDictSliceKey key) throws Exception { |
| AppendDictSlice slice = globalDictStore.readSlice(latestVersionPath.toString(), |
| metadata.sliceFileMap.get(key)); |
| logger.trace("Load slice with key {} and value {}", key, slice); |
| return slice; |
| } |
| }); |
| this.evictionThreshold = KylinConfig.getInstanceFromEnv().getDictionarySliceEvicationThreshold(); |
| } |
| |
| @Override |
| public int getIdFromValueBytesWithoutCache(byte[] value, int offset, int len, int roundingFlag) { |
| byte[] val = Arrays.copyOfRange(value, offset, offset + len); |
| AppendDictSliceKey sliceKey = metadata.sliceFileMap.floorKey(AppendDictSliceKey.wrap(val)); |
| if (sliceKey == null) { |
| sliceKey = metadata.sliceFileMap.firstKey(); |
| } |
| AppendDictSlice slice; |
| try { |
| slice = dictCache.get(sliceKey); |
| } catch (ExecutionException e) { |
| throw new IllegalStateException("Failed to load slice with key " + sliceKey, e.getCause()); |
| } |
| CacheStats stats = dictCache.stats(); |
| if (evictionThreshold > 0 && stats.evictionCount() > evictionThreshold * metadata.sliceFileMap.size() |
| && stats.loadCount() > (evictionThreshold + 1) * metadata.sliceFileMap.size()) { |
| logger.warn( |
| "Too many dict slice evictions and reloads, maybe the memory is not enough to hold all the dictionary"); |
| throw new RuntimeException("Too many dict slice evictions: " + stats + " for " |
| + metadata.sliceFileMap.size() + " dict slices. " |
| + "Maybe the memory is not enough to hold all the dictionary, try to enlarge the mapreduce/spark executor memory."); |
| } |
| return slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag); |
| } |
| |
| public CacheStats getCacheStats() { |
| return dictCache.stats(); |
| } |
| |
| public GlobalDictMetadata getDictMetadata() { |
| return metadata; |
| } |
| |
| @Override |
| public int getMinId() { |
| return metadata.baseId; |
| } |
| |
| @Override |
| public int getMaxId() { |
| return metadata.maxId; |
| } |
| |
| @Override |
| public int getSizeOfId() { |
| return Integer.SIZE / Byte.SIZE; |
| } |
| |
| @Override |
| public int getSizeOfValue() { |
| return metadata.maxValueLength; |
| } |
| |
| @Override |
| protected byte[] getValueBytesFromIdWithoutCache(int id) { |
| throw new UnsupportedOperationException("AppendTrieDictionary can't retrieve value from id"); |
| } |
| |
| @Override |
| public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException { |
| GlobalDictStore store = new GlobalDictHDFSStore(baseDir); |
| String dstBaseDir = store.copyToAnotherMeta(srcConfig, dstConfig); |
| AppendTrieDictionary newDict = new AppendTrieDictionary(); |
| newDict.init(dstBaseDir); |
| return newDict; |
| } |
| |
| @Override |
| public void write(DataOutput out) throws IOException { |
| out.writeUTF(convertToRelativePath(baseDir)); |
| } |
| |
| @Override |
| public void readFields(DataInput in) throws IOException { |
| init(in.readUTF()); |
| } |
| |
| @Override |
| public void dump(PrintStream out) { |
| out.println(String.format(Locale.ROOT, "Total %d values and %d slices", metadata.nValues, |
| metadata.sliceFileMap.size())); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(baseDir); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o instanceof AppendTrieDictionary) { |
| AppendTrieDictionary that = (AppendTrieDictionary) o; |
| return Objects.equals(this.baseDir, that.baseDir); |
| } |
| return false; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format(Locale.ROOT, "AppendTrieDictionary(%s)", baseDir); |
| } |
| |
| @Override |
| public boolean contains(Dictionary other) { |
| return false; |
| } |
| |
| /** |
| * JIRA: https://issues.apache.org/jira/browse/KYLIN-2945 |
| * if pass a absolute path, it may produce some problems like cannot find global dict after migration. |
| * so convert to relative path can avoid it and be better to maintain flexibility. |
| * |
| */ |
| private String convertToRelativePath(String path) { |
| KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); |
| String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory(); |
| if (!isSaveAbsolutePath && path.startsWith(hdfsWorkingDir)) { |
| return path.substring(hdfsWorkingDir.length()); |
| } |
| return path; |
| } |
| |
| private String convertToAbsolutePath(String path) { |
| KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); |
| Path basicPath = new Path(path); |
| if (basicPath.toUri().getScheme() == null) |
| return kylinConfig.getHdfsWorkingDirectory() + path; |
| |
| String[] paths = path.split("/resources/GlobalDict/"); |
| if (paths.length == 2) |
| return kylinConfig.getHdfsWorkingDirectory() + "/resources/GlobalDict/" + paths[1]; |
| |
| paths = path.split("/resources/SegmentDict/"); |
| if (paths.length == 2) { |
| return kylinConfig.getHdfsWorkingDirectory() + "/resources/SegmentDict/" + paths[1]; |
| } else { |
| throw new RuntimeException( |
| "the basic directory of global dictionary only support the format which contains '/resources/GlobalDict/' or '/resources/SegmentDict/'"); |
| } |
| } |
| |
| /** |
| * only for test |
| * |
| * @param flag |
| */ |
| void setSaveAbsolutePath(Boolean flag) { |
| this.isSaveAbsolutePath = flag; |
| } |
| } |