blob: 72c201e1b4fa2cb14784e5161e6d135fdd6073ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
import org.apache.carbondata.core.datastore.columnar.ObjectArrayBlockIndexerStorage;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.SortState;
/**
 * Base class for adaptive codecs. A subclass uses the statistics of the column page to decide
 * how values are encoded and decoded; this class supplies the shared steps around that:
 * optionally building an inverted index over the page, converting the (possibly sorted) page
 * into the target data type and compressing the encoded page.
 */
public abstract class AdaptiveCodec implements ColumnPageCodec {

  // TODO: cache and reuse the same encoder since snappy is thread-safe

  // statistics of this page, can be used by subclass
  protected final SimpleStatsResult stats;

  // the data type used for storage
  protected final DataType targetDataType;

  // the data type specified in schema
  protected final DataType srcDataType;

  // whether an inverted index has to be built while encoding the page
  protected boolean isInvertedIndex;

  // index storage created by encodeAndCompressPage when isInvertedIndex is set;
  // this class never assigns it otherwise
  protected BlockIndexerStorage<Object[]> indexStorage;

  // page of targetDataType created by encodeAndCompressPage; it is the page that gets compressed
  // NOTE(review): presumably filled by the converter passed to encodeAndCompressPage - confirm
  protected ColumnPage encodedPage;

  /**
   * @param srcDataType     data type specified in the schema
   * @param targetDataType  data type used for storage
   * @param stats           statistics of the page
   * @param isInvertedIndex true if an inverted index has to be generated for the page
   */
  protected AdaptiveCodec(DataType srcDataType, DataType targetDataType,
      SimpleStatsResult stats, boolean isInvertedIndex) {
    this.stats = stats;
    this.srcDataType = srcDataType;
    this.targetDataType = targetDataType;
    this.isInvertedIndex = isInvertedIndex;
  }

  /**
   * @return the data type used for storage
   */
  public DataType getTargetDataType() {
    return targetDataType;
  }

  /**
   * Convert the data of the page based on the data type for each row.
   * While preparing the inverted index for the page, we need the data based on data type for
   * no dict measure column if adaptive encoding is applied.
   * This is similar to page.getByteArrayPage()
   *
   * @param input page to read the values from
   * @return one (boxed) value per actual row of the page, read according to the source data
   *         type; the elements stay null when the source data type is none of the handled ones
   */
  public Object[] getPageBasedOnDataType(ColumnPage input) {
    // the row count does not depend on the loop, so read it once
    final int rowCount = input.getActualRowCount();
    Object[] data = new Object[rowCount];
    if (srcDataType == DataTypes.BYTE || srcDataType == DataTypes.BOOLEAN) {
      for (int i = 0; i < rowCount; i++) {
        data[i] = input.getByte(i);
      }
    } else if (srcDataType == DataTypes.SHORT) {
      for (int i = 0; i < rowCount; i++) {
        data[i] = input.getShort(i);
      }
    } else if (srcDataType == DataTypes.SHORT_INT) {
      for (int i = 0; i < rowCount; i++) {
        data[i] = input.getShortInt(i);
      }
    } else if (srcDataType == DataTypes.INT) {
      for (int i = 0; i < rowCount; i++) {
        data[i] = input.getInt(i);
      }
    } else if (srcDataType == DataTypes.LONG) {
      for (int i = 0; i < rowCount; i++) {
        data[i] = input.getLong(i);
      }
    } else if (srcDataType == DataTypes.FLOAT) {
      for (int i = 0; i < rowCount; i++) {
        data[i] = input.getFloat(i);
      }
    } else if (srcDataType == DataTypes.DOUBLE) {
      for (int i = 0; i < rowCount; i++) {
        data[i] = input.getDouble(i);
      }
    }
    return data;
  }

  /**
   * Put the data to the page based on the data type for each row.
   * Nothing is written when the source data type is none of the handled ones.
   *
   * @param page     page that receives the values, row i gets dataPage[i]
   * @param dataPage boxed values matching the source data type
   */
  public void putDataToPage(ColumnPage page, Object[] dataPage) {
    if (srcDataType == DataTypes.BYTE || srcDataType == DataTypes.BOOLEAN) {
      for (int i = 0; i < dataPage.length; i++) {
        page.putByte(i, (byte) dataPage[i]);
      }
    } else if (srcDataType == DataTypes.SHORT) {
      for (int i = 0; i < dataPage.length; i++) {
        page.putShort(i, (short) dataPage[i]);
      }
    } else if (srcDataType == DataTypes.SHORT_INT) {
      for (int i = 0; i < dataPage.length; i++) {
        page.putShortInt(i, (int) dataPage[i]);
      }
    } else if (srcDataType == DataTypes.INT) {
      for (int i = 0; i < dataPage.length; i++) {
        page.putInt(i, (int) dataPage[i]);
      }
    } else if (srcDataType == DataTypes.LONG) {
      for (int i = 0; i < dataPage.length; i++) {
        page.putLong(i, (long) dataPage[i]);
      }
    } else if (srcDataType == DataTypes.DOUBLE) {
      for (int i = 0; i < dataPage.length; i++) {
        page.putDouble(i, (double) dataPage[i]);
      }
    } else if (srcDataType == DataTypes.FLOAT) {
      for (int i = 0; i < dataPage.length; i++) {
        page.putFloat(i, (float) dataPage[i]);
      }
    }
  }

  /**
   * Write the inverted index to the page if required.
   * When an index storage exists, the returned buffer holds the bytes of result between its
   * position and limit, followed (only if the row id page is not empty) by the row id page
   * length as an int, the row ids as shorts and, if not empty, the RLE row id page as shorts.
   * When no index storage exists, an empty buffer is returned and result is not copied.
   * The position and limit of result are not modified.
   *
   * @param result encoded (and compressed) page data
   * @return a new buffer as described above
   * @throws IOException if writing to the in-memory stream fails
   */
  public ByteBuffer writeInvertedIndexIfRequired(ByteBuffer result) throws IOException {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(stream);
    if (null != indexStorage) {
      if (result.hasArray()) {
        // the third argument of write() is a length, not an end index, and the backing array
        // may start before the buffer does, hence arrayOffset() and remaining()
        out.write(result.array(), result.arrayOffset() + result.position(), result.remaining());
      } else {
        // cannot apply result.array() on direct (or read-only) buffers
        for (int i = result.position(); i < result.limit(); i++) {
          out.writeByte(result.get(i));
        }
      }
      int rowIdPageLengthInBytes = indexStorage.getRowIdPageLengthInBytes();
      if (rowIdPageLengthInBytes > 0) {
        out.writeInt(rowIdPageLengthInBytes);
        short[] rowIdPage = (short[]) indexStorage.getRowIdPage();
        for (short rowId : rowIdPage) {
          out.writeShort(rowId);
        }
        if (indexStorage.getRowIdRlePageLengthInBytes() > 0) {
          short[] rowIdRlePage = (short[]) indexStorage.getRowIdRlePage();
          for (short rowIdRle : rowIdRlePage) {
            out.writeShort(rowIdRle);
          }
        }
      }
    }
    byte[] bytes = stream.toByteArray();
    stream.close();
    return ByteBuffer.wrap(bytes);
  }

  /**
   * Fill legacy fields if required.
   * With an index storage the sort state is set to SORT_EXPLICIT when a row id page exists and
   * to SORT_NATIVE otherwise, and the row id page length (including the int that stores the
   * length) is set only when a row id page exists. Without an index storage the row id page
   * length is set to 0 and the sort state is left untouched.
   *
   * @param dataChunk chunk metadata to update
   * @param result    encoded page data; when not null its remaining size is stored as the data
   *                  page length
   */
  public void fillLegacyFieldsIfRequired(DataChunk2 dataChunk, ByteBuffer result) {
    if (null != indexStorage) {
      int rowIdPageLengthInBytes = indexStorage.getRowIdPageLengthInBytes();
      boolean hasRowIdPage = rowIdPageLengthInBytes > 0;
      SortState sort = hasRowIdPage ? SortState.SORT_EXPLICIT : SortState.SORT_NATIVE;
      dataChunk.setSort_state(sort);
      if (hasRowIdPage) {
        // the extra int accounts for the length prefix written in front of the row id page
        int rowIdPageLength = CarbonCommonConstants.INT_SIZE_IN_BYTE + rowIdPageLengthInBytes
            + indexStorage.getRowIdRlePageLengthInBytes();
        dataChunk.setRowid_page_length(rowIdPageLength);
      }
    } else {
      dataChunk.setRowid_page_length(0);
    }
    if (null != result) {
      dataChunk.setData_page_length(result.limit() - result.position());
    }
  }

  /**
   * Get the new column page based on the sorted data.
   *
   * @param input original page
   * @return a new page with the column spec, data type, compressor name and page size of the
   *         input, filled with the data page of the index storage; the input itself when there
   *         is no index storage
   */
  public ColumnPage getSortedColumnPageIfRequired(ColumnPage input) {
    if (null != indexStorage) {
      Object[] dataPage = indexStorage.getDataPage();
      ColumnPageEncoderMeta columnPageEncoderMeta =
          new ColumnPageEncoderMeta(input.getColumnSpec(), input.getDataType(),
              input.getColumnPageEncoderMeta().getCompressorName());
      ColumnPage columnPage = ColumnPage.newPage(columnPageEncoderMeta, input.getPageSize());
      putDataToPage(columnPage, dataPage);
      return columnPage;
    } else {
      return input;
    }
  }

  /**
   * Encode the page and compress the encoded page.
   * A fresh page of the target data type is created as encodedPage, the index storage is built
   * from the input when an inverted index is requested, the (possibly sorted) page is run
   * through the converter and finally encodedPage is compressed.
   *
   * @param input      page to encode
   * @param converter  converter applied to every value of the (possibly sorted) page
   * @param compressor compressor applied to the encoded page
   * @return the compressed bytes of the encoded page
   * @throws IOException declared by this method; NOTE(review): presumably raised by page
   *                     compression - confirm
   */
  public ByteBuffer encodeAndCompressPage(ColumnPage input, ColumnPageValueConverter converter,
      Compressor compressor) throws IOException {
    encodedPage = ColumnPage.newPage(
        new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(), targetDataType,
            input.getColumnPageEncoderMeta().getCompressorName()), input.getPageSize());
    if (isInvertedIndex) {
      indexStorage =
          new ObjectArrayBlockIndexerStorage(getPageBasedOnDataType(input), input.getDataType(),
              isInvertedIndex);
    }
    ColumnPage columnPage = getSortedColumnPageIfRequired(input);
    columnPage.convertValue(converter);
    return encodedPage.compress(compressor);
  }

  /**
   * @return the class name followed by the source type, target type and page statistics
   */
  @Override
  public String toString() {
    return String.format("%s[src type: %s, target type: %s, stats(%s)]",
        getClass().getName(), srcDataType, targetDataType, stats);
  }

  /**
   * @return the same text as {@link #toString()}
   */
  protected String debugInfo() {
    return this.toString();
  }
}