/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.store.writer.v3;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonVersionConstants;
import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage;
import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataFileFooterConverterV3;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.FileFooter3;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.TablePage;
import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.TABLE_BLOCKLET_SIZE;
import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB;
import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE;
import static org.apache.carbondata.core.constants.SortScopeOptions.SortScope.NO_SORT;
import org.apache.log4j.Logger;
/**
 * This class is used to write data in the V3 format
* <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
* <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
* <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
* <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
*/
public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonFactDataWriterImplV3.class.getName());
/**
   * holds the page data to be written to the file
*/
private BlockletDataHolder blockletDataHolder;
/**
   * Threshold of blocklet size, in bytes
*/
private long blockletSizeThreshold;
/**
* True if this file is sorted
*/
private boolean isSorted;
public CarbonFactDataWriterImplV3(CarbonFactDataHandlerModel model) {
super(model);
String blockletSize =
model.getTableSpec().getCarbonTable().getTableInfo().getFactTable().getTableProperties()
.get(TABLE_BLOCKLET_SIZE);
if (blockletSize == null) {
blockletSize = CarbonProperties.getInstance().getProperty(
BLOCKLET_SIZE_IN_MB, BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE);
}
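    // convert the configured blocklet size from MB to bytes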
blockletSizeThreshold = Long.parseLong(blockletSize) << 20;
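    // the blocklet size cannot exceed the carbondata file (block) size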
if (blockletSizeThreshold > fileSizeInBytes) {
blockletSizeThreshold = fileSizeInBytes;
LOGGER.info("Blocklet size configure for table is: " + blockletSizeThreshold);
}
blockletDataHolder = new BlockletDataHolder(fallbackExecutorService, model);
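    // mark the file as sorted unless the sort scope is NO_SORT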
if (model.getSortScope() != null) {
isSorted = model.getSortScope() != NO_SORT;
}
LOGGER.info("Sort Scope : " + model.getSortScope());
}
@Override
protected void writeFooterToFile() throws CarbonDataWriterException {
try {
// get the current file position
long footerOffset = currentOffsetInFile;
// get thrift file footer instance
FileFooter3 convertFileMeta = CarbonMetadataUtil
.convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
thriftColumnSchemaList.size());
convertFileMeta.setIs_sort(isSorted);
String appName = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
if (appName == null) {
throw new CarbonDataWriterException(
"DataLoading failed as CARBON_WRITTEN_BY_APPNAME is null");
}
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO, appName);
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_VERSION,
CarbonVersionConstants.CARBONDATA_VERSION);
// write the footer
byte[] byteArray = CarbonUtil.getByteArray(convertFileMeta);
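      // the serialized footer is followed by its start offset in the last 8 bytes of the file,
      // so a reader can locate the footer from the end of the file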
ByteBuffer buffer =
ByteBuffer.allocate(byteArray.length + CarbonCommonConstants.LONG_SIZE_IN_BYTE);
buffer.put(byteArray);
buffer.putLong(footerOffset);
buffer.flip();
currentOffsetInFile += fileChannel.write(buffer);
// fill the carbon index details
fillBlockIndexInfoDetails(
convertFileMeta.getNum_rows(), carbonDataFileName, footerOffset, currentOffsetInFile);
} catch (IOException e) {
LOGGER.error("Problem while writing the carbon file", e);
throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
}
}
/**
   * Write the data of one table page; invoked by the Consumer
* @param tablePage
*/
@Override
public void writeTablePage(TablePage tablePage)
throws CarbonDataWriterException, IOException {
    // handle all pages except the last one
if (!tablePage.isLastPage()) {
boolean isAdded = false;
      // if adding this page would reach the blocklet size threshold, flush the accumulated pages to file
if (blockletDataHolder.getSize() + tablePage.getEncodedTablePage().getEncodedSize()
>= blockletSizeThreshold) {
// if blocklet size exceeds threshold, write blocklet data
if (blockletDataHolder.getNumberOfPagesAdded() == 0) {
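          // a single page larger than the threshold still forms its own blocklet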
isAdded = true;
addPageData(tablePage);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Number of Pages for blocklet is: " +
blockletDataHolder.getNumberOfPagesAdded() +
" :Rows Added: " + blockletDataHolder.getTotalRows());
}
// write the data
writeBlockletToFile();
}
if (!isAdded) {
addPageData(tablePage);
}
} else {
      // last page: add it (if it has any rows) and flush all remaining pages as the final blocklet
if (tablePage.getPageSize() > 0) {
addPageData(tablePage);
}
if (blockletDataHolder.getNumberOfPagesAdded() > 0) {
LOGGER.info("Number of Pages for blocklet is: " + blockletDataHolder.getNumberOfPagesAdded()
+ " :Rows Added: " + blockletDataHolder.getTotalRows());
writeBlockletToFile();
}
}
}
private void addPageData(TablePage tablePage) throws IOException {
blockletDataHolder.addPage(tablePage);
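    // notify the listener (if registered for this table) about blocklet start and page addition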
if (listener != null &&
model.getDatabaseName().equalsIgnoreCase(listener.getTblIdentifier().getDatabaseName()) &&
model.getTableName().equalsIgnoreCase(listener.getTblIdentifier().getTableName())) {
if (pageId == 0) {
listener.onBlockletStart(blockletId);
}
listener.onPageAdded(blockletId, pageId++, tablePage);
}
}
/**
   * Write the collected blocklet data (blockletDataHolder) to file
*/
private void writeBlockletToFile() {
    // get the list of all encoded table pages
EncodedBlocklet encodedBlocklet = blockletDataHolder.getEncodedBlocklet();
int numDimensions = encodedBlocklet.getNumberOfDimension();
int numMeasures = encodedBlocklet.getNumberOfMeasure();
    // get data chunks for all the columns
byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][];
long metadataSize = fillDataChunk(encodedBlocklet, dataChunkBytes);
// calculate the total size of data to be written
long blockletSize = blockletDataHolder.getSize() + metadataSize;
    // if the total data size would exceed the block size, create a new file
createNewFileIfReachThreshold(blockletSize);
// write data to file
try {
if (currentOffsetInFile == 0) {
// write the header if file is empty
writeHeaderToFile();
}
writeBlockletToFile(dataChunkBytes);
if (listener != null &&
model.getDatabaseName().equalsIgnoreCase(listener.getTblIdentifier().getDatabaseName()) &&
model.getTableName().equalsIgnoreCase(listener.getTblIdentifier().getTableName())) {
listener.onBlockletEnd(blockletId++);
}
pageId = 0;
} catch (IOException e) {
LOGGER.error("Problem while writing file", e);
throw new CarbonDataWriterException("Problem while writing file", e);
} finally {
// clear the data holder
blockletDataHolder.clear();
}
}
/**
* Fill dataChunkBytes and return total size of page metadata
*/
private long fillDataChunk(EncodedBlocklet encodedBlocklet, byte[][] dataChunkBytes) {
int size = 0;
int numDimensions = encodedBlocklet.getNumberOfDimension();
int numMeasures = encodedBlocklet.getNumberOfMeasure();
int measureStartIndex = numDimensions;
    // serialize each column's data chunk and accumulate the total metadata size
for (int i = 0; i < numDimensions; i++) {
dataChunkBytes[i] =
CarbonUtil.getByteArray(CarbonMetadataUtil.getDimensionDataChunk3(encodedBlocklet, i));
size += dataChunkBytes[i].length;
}
for (int i = 0; i < numMeasures; i++) {
dataChunkBytes[measureStartIndex] =
CarbonUtil.getByteArray(CarbonMetadataUtil.getMeasureDataChunk3(encodedBlocklet, i));
size += dataChunkBytes[measureStartIndex].length;
measureStartIndex++;
}
return size;
}
/**
* write file header
*/
private void writeHeaderToFile() throws IOException {
byte[] fileHeader = CarbonUtil.getByteArray(
CarbonMetadataUtil.getFileHeader(
true, thriftColumnSchemaList, model.getSchemaUpdatedTimeStamp()));
ByteBuffer buffer = ByteBuffer.wrap(fileHeader);
currentOffsetInFile += fileChannel.write(buffer);
}
/**
* Write one blocklet data into file
* File format:
* <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
* <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
* <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
* <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
*/
private void writeBlockletToFile(byte[][] dataChunkBytes)
throws IOException {
long offset = currentOffsetInFile;
// to maintain the offset of each data chunk in blocklet
List<Long> currentDataChunksOffset = new ArrayList<>();
// to maintain the length of each data chunk in blocklet
List<Integer> currentDataChunksLength = new ArrayList<>();
EncodedBlocklet encodedBlocklet = blockletDataHolder.getEncodedBlocklet();
int numberOfDimension = encodedBlocklet.getNumberOfDimension();
int numberOfMeasures = encodedBlocklet.getNumberOfMeasure();
ByteBuffer buffer = null;
long dimensionOffset = 0;
long measureOffset = 0;
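    // for each dimension column, write its DataChunk3 metadata followed by all of its encoded pages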
for (int i = 0; i < numberOfDimension; i++) {
currentDataChunksOffset.add(offset);
currentDataChunksLength.add(dataChunkBytes[i].length);
buffer = ByteBuffer.wrap(dataChunkBytes[i]);
currentOffsetInFile += fileChannel.write(buffer);
offset += dataChunkBytes[i].length;
BlockletEncodedColumnPage blockletEncodedColumnPage =
encodedBlocklet.getEncodedDimensionColumnPages().get(i);
for (EncodedColumnPage dimensionPage : blockletEncodedColumnPage
.getEncodedColumnPageList()) {
buffer = dimensionPage.getEncodedData();
int bufferSize = buffer.limit();
currentOffsetInFile += fileChannel.write(buffer);
offset += bufferSize;
}
}
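    // offset where dimension data ends and measure data begins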
dimensionOffset = offset;
int dataChunkStartIndex = encodedBlocklet.getNumberOfDimension();
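    // for each measure column, write its DataChunk3 metadata followed by all of its encoded pages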
for (int i = 0; i < numberOfMeasures; i++) {
currentDataChunksOffset.add(offset);
currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
buffer = ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]);
currentOffsetInFile += fileChannel.write(buffer);
offset += dataChunkBytes[dataChunkStartIndex].length;
dataChunkStartIndex++;
BlockletEncodedColumnPage blockletEncodedColumnPage =
encodedBlocklet.getEncodedMeasureColumnPages().get(i);
for (EncodedColumnPage measurePage : blockletEncodedColumnPage
.getEncodedColumnPageList()) {
buffer = measurePage.getEncodedData();
int bufferSize = buffer.limit();
currentOffsetInFile += fileChannel.write(buffer);
offset += bufferSize;
}
}
measureOffset = offset;
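    // build the blocklet index (start/end keys and min/max values) and blocklet info for the footer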
blockletIndex.add(
CarbonMetadataUtil.getBlockletIndex(
encodedBlocklet, model.getSegmentProperties().getMeasures()));
BlockletInfo3 blockletInfo3 =
new BlockletInfo3(encodedBlocklet.getBlockletSize(), currentDataChunksOffset,
currentDataChunksLength, dimensionOffset, measureOffset,
encodedBlocklet.getNumberOfPages());
    // row counts are kept as primitive ints in EncodedBlocklet to avoid boxing,
    // but thrift stores them as int, leaving room to support a large number of rows in future
List<Integer> rowList = new ArrayList<>(encodedBlocklet.getRowCountInPage().size());
for (int rows : encodedBlocklet.getRowCountInPage()) {
rowList.add(rows);
}
blockletInfo3.setRow_count_in_page(rowList);
blockletMetadata.add(blockletInfo3);
}
/**
   * Fill the block index info details
*
* @param numberOfRows number of rows in file
* @param carbonDataFileName The name of carbonData file
* @param footerOffset footer offset
* @param fileSize file size
*/
@Override
protected void fillBlockIndexInfoDetails(long numberOfRows, String carbonDataFileName,
long footerOffset, long fileSize) {
int i = 0;
DataFileFooterConverterV3 converterV3 = new DataFileFooterConverterV3();
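    // convert each blocklet's thrift index and info into the internal model used for the carbon index file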
for (org.apache.carbondata.format.BlockletIndex index : blockletIndex) {
BlockletInfo3 blockletInfo3 = blockletMetadata.get(i);
BlockletInfo blockletInfo = converterV3.getBlockletInfo(blockletInfo3,
model.getSegmentProperties().getDimensions().size());
BlockletBTreeIndex bTreeIndex = new BlockletBTreeIndex(index.b_tree_index.getStart_key(),
index.b_tree_index.getEnd_key());
BlockletMinMaxIndex minMaxIndex =
new BlockletMinMaxIndex(index.getMin_max_index().getMin_values(),
index.getMin_max_index().getMax_values(),
index.getMin_max_index().getMin_max_presence());
org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex bIndex =
new org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex(bTreeIndex,
minMaxIndex);
BlockIndexInfo biInfo =
new BlockIndexInfo(numberOfRows, carbonDataFileName, footerOffset, bIndex,
blockletInfo, fileSize);
blockIndexInfoList.add(biInfo);
i++;
}
}
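  /**
   * Convert each ByteBuffer in the list to its backing byte array
   */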
private byte[][] toByteArray(List<ByteBuffer> buffers) {
byte[][] arrays = new byte[buffers.size()][];
for (int i = 0; i < arrays.length; i++) {
arrays[i] = buffers.get(i).array();
}
return arrays;
}
/**
   * Close the writer: commit the current file, write the index file and close the executor service
*
* @throws CarbonDataWriterException
*/
public void closeWriter() throws CarbonDataWriterException {
CarbonDataWriterException exception = null;
try {
commitCurrentFile(true);
writeIndexFile();
} catch (Exception e) {
LOGGER.error("Problem while writing the index file", e);
exception = new CarbonDataWriterException("Problem while writing the index file", e);
} finally {
try {
closeExecutorService();
} catch (CarbonDataWriterException e) {
if (null == exception) {
exception = e;
}
}
}
if (null != exception) {
throw exception;
}
}
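  /**
   * Write the file footer, only if at least one blocklet has been written to the current file
   */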
@Override
public void writeFooter() throws CarbonDataWriterException {
if (this.blockletMetadata.size() > 0) {
writeFooterToFile();
}
}
}