blob: aece1efb13ef951d35a0f047fb4f1af4744243e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.readsupport.impl;
import java.io.IOException;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
/**
 * Read support that decodes dictionary encoded column data back to its original value.
 *
 * <p>Usage order is {@link #initialize(CarbonColumn[], CarbonTable)}, then any number of
 * {@link #readRow(Object[])} calls, then {@link #close()}.
 *
 * @param <T> row type handed back to the caller; {@link #readRow(Object[])} returns the
 *            incoming {@code Object[]} cast to this type
 */
public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {

  /**
   * Forward dictionary per column, indexed like the columns given to
   * {@link #initialize(CarbonColumn[], CarbonTable)}. An entry is {@code null} when the column
   * is not decoded by this class (no dictionary encoding, direct dictionary, or complex type).
   */
  protected Dictionary[] dictionaries;

  /**
   * Data type per column, indexed like the columns given to
   * {@link #initialize(CarbonColumn[], CarbonTable)}.
   */
  protected DataType[] dataTypes;

  /**
   * This initialization is done inside executor task
   * for column dictionary involved in decoding.
   *
   * <p>Records the data type of every column and, for every column that is dictionary encoded
   * but neither direct-dictionary encoded nor complex, fetches its forward dictionary from the
   * cache.
   *
   * @param carbonColumns column list
   * @param carbonTable table whose absolute table identifier is used to build the
   *                    dictionary cache keys
   * @throws IOException if the dictionary cache lookup fails
   */
  @Override
  public void initialize(CarbonColumn[] carbonColumns,
      CarbonTable carbonTable) throws IOException {
    dictionaries = new Dictionary[carbonColumns.length];
    dataTypes = new DataType[carbonColumns.length];
    // The cache handle is the same for every column, so it is looked up at most once, and only
    // when the first column that actually needs a dictionary is encountered.
    Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = null;
    for (int i = 0; i < carbonColumns.length; i++) {
      CarbonColumn column = carbonColumns[i];
      dataTypes[i] = column.getDataType();
      if (!needsDictionaryLookup(column)) {
        // dictionaries[i] stays null, which readRow/close treat as "nothing to decode".
        continue;
      }
      if (forwardDictionaryCache == null) {
        forwardDictionaryCache =
            CacheProvider.getInstance().createCache(CacheType.FORWARD_DICTIONARY);
      }
      dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
          carbonTable.getAbsoluteTableIdentifier(),
          column.getColumnIdentifier(), dataTypes[i]));
    }
  }

  /**
   * Tells whether the column's values have to be decoded through a forward dictionary.
   */
  private static boolean needsDictionaryLookup(CarbonColumn column) {
    return column.hasEncoding(Encoding.DICTIONARY)
        && !column.hasEncoding(Encoding.DIRECT_DICTIONARY)
        && !column.isComplex();
  }

  /**
   * Decodes one row in place: every position that has a dictionary is replaced by the value the
   * dictionary returns for the key stored at that position; other positions are left untouched.
   *
   * @param data row values, one per column passed to
   *             {@link #initialize(CarbonColumn[], CarbonTable)}; positions that have a
   *             dictionary must hold a non-null {@code Integer} key
   * @return the same {@code data} array instance, cast to {@code T}
   */
  @Override
  public T readRow(Object[] data) {
    assert (data.length == dictionaries.length);
    for (int i = 0; i < dictionaries.length; i++) {
      if (dictionaries[i] != null) {
        data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
      }
    }
    // The row is returned as the raw Object[]; T is erased at runtime, so the cast itself
    // cannot fail here and callers are expected to bind T to a compatible type.
    @SuppressWarnings("unchecked")
    T row = (T) data;
    return row;
  }

  /**
   * to book keep the dictionary cache or update access count for each
   * column involved during decode, to facilitate LRU cache policy if memory
   * threshold is reached
   *
   * <p>Does nothing when {@link #initialize(CarbonColumn[], CarbonTable)} was never called.
   */
  @Override
  public void close() {
    if (dictionaries == null) {
      return;
    }
    for (Dictionary dictionary : dictionaries) {
      // Columns without a dictionary have a null slot; there is nothing to release for them.
      if (dictionary != null) {
        CarbonUtil.clearDictionaryCache(dictionary);
      }
    }
  }
}