/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.index.bloom;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.index.dev.IndexWriter;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.CarbonUtil;

import org.apache.hadoop.util.bloom.CarbonBloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
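
/**
 * Base writer for the bloom index: it maintains one bloom filter per indexed
 * column, fills them row by row as pages are added, and flushes them to the
 * per-column bloom index files at the end of every blocklet.
 */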
@InterfaceAudience.Internal
public abstract class AbstractBloomIndexWriter extends IndexWriter {

  // expected number of entries per bloom filter (n) and acceptable false-positive rate (p)
  private int bloomFilterSize;
  private double bloomFilterFpp;
  // whether to compress the bloom filter bitmap when it is serialized
  private boolean compressBloom;
  protected int currentBlockletId;
  // one output stream per indexed column, kept open until finish() is called
  private List<DataOutputStream> currentDataOutStreams;
  // one bloom filter per indexed column, rebuilt for every blocklet
  protected List<CarbonBloomFilter> indexBloomFilters;

  AbstractBloomIndexWriter(String tablePath, String indexName, List<CarbonColumn> indexColumns,
      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp,
      boolean compressBloom) throws IOException {
    super(tablePath, indexName, indexColumns, segment, shardName);
    this.bloomFilterSize = bloomFilterSize;
    this.bloomFilterFpp = bloomFilterFpp;
    this.compressBloom = compressBloom;
    currentDataOutStreams = new ArrayList<>(indexColumns.size());
    indexBloomFilters = new ArrayList<>(indexColumns.size());
    initIndexFile();
    resetBloomFilters();
  }

  @Override
  public void onBlockStart(String blockId) {
    // no-op: the bloom index collects data per blocklet, not per block
  }

  @Override
  public void onBlockEnd(String blockId) {
  }

  @Override
  public void onBlockletStart(int blockletId) {
    // no-op: fresh bloom filters are already prepared by resetBloomFilters()
  }

  /**
   * Create a fresh bloom filter per indexed column for the next blocklet,
   * sized according to the configured entry count and false-positive rate.
   */
  protected void resetBloomFilters() {
    indexBloomFilters.clear();
    int[] stats = calculateBloomStats();
    for (int i = 0; i < indexColumns.size(); i++) {
      indexBloomFilters.add(
          new CarbonBloomFilter(stats[0], stats[1], Hash.MURMUR_HASH, compressBloom));
    }
  }

  /**
   * Calculate the bloom filter length in bits and the number of hash functions
   * from the expected entry count and the acceptable false-positive rate.
   */
  private int[] calculateBloomStats() {
    /*
     * n: how many items you expect to have in your filter
     * p: your acceptable false-positive rate
     * Number of bits   (m) = -n * ln(p) / (ln(2)^2)
     * Number of hashes (k) = m/n * ln(2)
     */
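    /*
     * Worked example, assuming the commonly used defaults n = 640000 and
     * p = 0.00001 (both are configurable per index):
     *   m = 640000 * ln(1 / 0.00001) / ln(2)^2 ≈ 15.34 million bits (about 1.8 MiB)
     *   k = (m / n) * ln(2) ≈ 16.6, which Math.ceil below rounds up to 17 hashes
     */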
    double sizeInBits = -bloomFilterSize * Math.log(bloomFilterFpp) / Math.pow(Math.log(2), 2);
    double numberOfHashes = sizeInBits / bloomFilterSize * Math.log(2);
    int[] stats = new int[2];
    stats[0] = (int) Math.ceil(sizeInBits);
    stats[1] = (int) Math.ceil(numberOfHashes);
    return stats;
  }

  @Override
  public void onBlockletEnd(int blockletId) {
    // flush this blocklet's bloom filters to disk and prepare fresh ones
    writeBloomIndexFile();
    currentBlockletId++;
  }

  @Override
  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    for (int rowId = 0; rowId < pageSize; rowId++) {
      // for each indexed column, add the cell value of this row to its bloom filter
      for (int i = 0; i < indexColumns.size(); i++) {
        Object data = pages[i].getData(rowId);
        addValue2BloomIndex(i, data);
      }
    }
  }

  protected void addValue2BloomIndex(int indexColIdx, Object value) {
    byte[] indexValue;
    // Measures are converted to bytes directly; non-dictionary dimensions are
    // converted to plain bytes without a length prefix; dictionary-encoded
    // dimensions are converted to plain bytes without any encoding.
    if (indexColumns.get(indexColIdx).isMeasure()) {
      // NULL values of all measures are already handled in `ColumnPage.getData`
      // or `RawBytesReadSupport.readRow` with the actual data type.
      // Carbon stores boolean as byte, so convert it here for `getValueAsBytes`.
      if (indexColumns.get(indexColIdx).getDataType().equals(DataTypes.BOOLEAN)) {
        value = BooleanConvert.boolean2Byte((Boolean) value);
      }
      indexValue = CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value);
    } else {
      if (indexColumns.get(indexColIdx).getDataType() == DataTypes.DATE) {
        indexValue = convertDictionaryValue(indexColIdx, value);
      } else {
        indexValue = convertNonDictionaryValue(indexColIdx, value);
      }
    }
    // map empty values to the default member value so that they are still indexed
    if (indexValue.length == 0) {
      indexValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
    }
    indexBloomFilters.get(indexColIdx).add(new Key(indexValue));
  }

  /**
   * Convert a dictionary-encoded dimension value (here, DATE) to the bytes
   * that are added to the bloom filter.
   */
  protected abstract byte[] convertDictionaryValue(int indexColIdx, Object value);

  /**
   * Convert a non-dictionary dimension value to the bytes that are added
   * to the bloom filter.
   */
  protected abstract byte[] convertNonDictionaryValue(int indexColIdx, Object value);
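
  /*
   * A minimal sketch of what a concrete subclass might plug in here, assuming
   * dimension values already arrive as raw byte arrays (hypothetical
   * illustration, not the actual subclass implementation):
   *
   *   protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) {
   *     return (byte[]) value;  // the raw bytes go into the bloom key as-is
   *   }
   */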

  private void initIndexFile() throws IOException {
    if (!FileFactory.isFileExist(indexPath)) {
      FileFactory.touchDirectory(FileFactory.getCarbonFile(indexPath));
      if (!FileFactory.isFileExist(indexPath)) {
        throw new IOException("Failed to create directory " + indexPath);
      }
    }
    // create and open one bloom index file per indexed column
    for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
      String indexFile = BloomIndexFileStore.getBloomIndexFile(indexPath,
          indexColumns.get(indexColId).getColName());
      FileFactory.createNewFile(indexFile);
      this.currentDataOutStreams.add(FileFactory.getDataOutputStream(indexFile));
    }
  }

  protected void writeBloomIndexFile() {
    try {
      for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
        CarbonBloomFilter bloomFilter = indexBloomFilters.get(indexColId);
        bloomFilter.setBlockletNo(currentBlockletId);
        // Only higher versions of the guava bloom filter provide the readFrom/writeTo
        // interface; with lower versions we rely on the default Java serializer
        // to write the bloom filter.
        bloomFilter.write(this.currentDataOutStreams.get(indexColId));
        this.currentDataOutStreams.get(indexColId).flush();
      }
    } catch (Exception e) {
      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
        CarbonUtil.closeStreams(dataOutputStream);
      }
      throw new RuntimeException(e);
    } finally {
      // always start the next blocklet with fresh bloom filters
      resetBloomFilters();
    }
  }

  @Override
  public void finish() {
    if (!isWritingFinished()) {
      releaseResource();
      setWritingFinished(true);
    }
  }

  protected void releaseResource() {
    // close the bloom index output stream of every indexed column
    for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
      CarbonUtil.closeStreams(currentDataOutStreams.get(indexColId));
    }
  }
}