/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.store.writer;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.OutputFilesInfoHolder;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import static org.apache.carbondata.core.constants.SortScopeOptions.SortScope.NO_SORT;
import org.apache.log4j.Logger;
public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
private static final Logger LOGGER =
LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
/**
* file channel to write
*/
protected WritableByteChannel fileChannel;
protected long currentOffsetInFile;
/**
* The path of CarbonData file to write in hdfs/s3
*/
private String carbonDataFileStorePath;
/**
* The temp path of carbonData file used on executor
*/
private String carbonDataFileTempPath;
/**
* The name of carbonData file (blockId)
*/
protected String carbonDataFileName;
/**
* The sequence number of blocklet inside one block
*/
protected int blockletId = 0;
/**
* The sequence number of page inside one blocklet
*/
protected int pageId = 0;
/**
* Local cardinality for the segment
*/
protected int[] localCardinality;
/**
* thrift column schema
*/
protected List<org.apache.carbondata.format.ColumnSchema> thriftColumnSchemaList;
protected CarbonFactDataHandlerModel model;
/**
* data file size
*/
protected long fileSizeInBytes;
/**
* file count will be used to give sequence number to the data file
*/
private int fileCount;
/**
* executor service for copying data files to the carbon store path
*/
private ExecutorService executorService;
/**
* futures of the copy tasks submitted to the executor service
*/
private List<Future<Void>> executorServiceSubmitList;
/**
* data block size for one carbon data file
*/
private long blockSizeThreshold;
/**
* file size at any given point
*/
private long currentFileSize;
protected DataOutputStream fileOutputStream;
protected List<BlockIndexInfo> blockIndexInfoList;
/**
* list of metadata for V3 format
*/
protected List<BlockletInfo3> blockletMetadata;
/**
* list of blocklet index
*/
protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
/**
* listener to write data map
*/
protected DataMapWriterListener listener;
/**
* Whether directly write fact data to store path
*/
private boolean enableDirectlyWriteDataToStorePath = false;
protected ExecutorService fallbackExecutorService;
private OutputFilesInfoHolder outputFilesInfoHolder;
public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
this.model = model;
blockIndexInfoList = new ArrayList<>();
// get max file size
CarbonProperties propInstance = CarbonProperties.getInstance();
// if blockSize = 2048 MB, then 2048*1024*1024 overflows int, hence the cast to long
this.fileSizeInBytes =
(long) this.model.getBlockSizeInMB() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
* CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
// space reserved in each file for writing block metadata, expressed as a percentage of the file size
int spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
.getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
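// e.g. if spaceReservedForBlockMetaSize is 10, the threshold is 90% of fileSizeInBytes
// (roughly 921.6 MB of data for a 1024 MB block); the rest is left for the footer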
this.blockSizeThreshold =
fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
LOGGER
.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + blockSizeThreshold);
// whether to directly write fact data to HDFS
String directlyWriteData2Hdfs = propInstance
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
this.enableDirectlyWriteDataToStorePath = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
if (enableDirectlyWriteDataToStorePath) {
LOGGER.info("Carbondata will directly write fact data to store path.");
} else {
LOGGER.info("Carbondata will write temporary fact data to local disk.");
}
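// single-threaded pool used to copy completed data files to the store path in the background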
this.executorService = Executors.newFixedThreadPool(1,
new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName(),
true));
executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// in case of compaction we will pass the cardinality.
this.localCardinality = this.model.getColCardinality();
List<Integer> cardinalityList = new ArrayList<Integer>();
thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
this.model.getWrapperColumnSchema());
blockletMetadata = new ArrayList<BlockletInfo3>();
blockletIndex = new ArrayList<>();
listener = this.model.getDataMapWriterlistener();
if (model.getColumnLocalDictGenMap().size() > 0) {
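// pool for local dictionary fallback (reverting dictionary-encoded pages to plain
// encoding when the dictionary grows past its threshold); sized to about half the cores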
int numberOfCores = 1;
if (model.getNumberOfCores() > 1) {
numberOfCores = model.getNumberOfCores() / 2;
}
fallbackExecutorService = Executors.newFixedThreadPool(numberOfCores, new CarbonThreadFactory(
"FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId(),
true));
}
this.outputFilesInfoHolder = this.model.getOutputFilesInfoHolder();
}
/**
* This method will roll the file channel over to a new file if the block size
* threshold is exceeded. It first checks whether the existing file size has
* crossed the limit; if so, it writes the leaf metadata (footer) to the file,
* resets the current file size to 0, closes the existing file channel, and
* gets a new file name and a channel for the new file.
*
* @param blockletSizeToBeAdded data size of one blocklet
* @throws CarbonDataWriterException if any problem occurs
*/
protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
throws CarbonDataWriterException {
if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
// roll over: the file already holds data and adding this blocklet would cross the threshold
String activeFile =
enableDirectlyWriteDataToStorePath ? carbonDataFileStorePath : carbonDataFileTempPath;
LOGGER.info("Writing data to file as max file size reached for file: "
+ activeFile + ". Data block size: " + currentFileSize);
// write meta data to end of the existing file
writeFooterToFile();
this.blockletMetadata = new ArrayList<>();
this.blockletIndex = new ArrayList<>();
commitCurrentFile(false);
// set the current file size to zero and initialize the channel for the new file
this.currentFileSize = 0;
initializeWriter();
}
currentFileSize += blockletSizeToBeAdded;
}
private void notifyDataMapBlockStart() {
if (listener != null) {
try {
listener.onBlockStart(carbonDataFileName);
} catch (IOException e) {
throw new CarbonDataWriterException("Problem while writing datamap", e);
}
}
}
private void notifyDataMapBlockEnd() {
if (listener != null) {
try {
listener.onBlockEnd(carbonDataFileName);
} catch (IOException e) {
throw new CarbonDataWriterException("Problem while writing datamap", e);
}
}
}
/**
* Finish writing the current file. It will flush the stream, then copy and rename the
* temp file to the final file.
* @param copyInCurrentThread set to false to perform the data copy in a separate thread
*/
protected void commitCurrentFile(boolean copyInCurrentThread) {
notifyDataMapBlockEnd();
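// close (and implicitly flush) the output stream and channel so the file is complete on disk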
CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
if (!enableDirectlyWriteDataToStorePath) {
try {
if (currentFileSize == 0) {
handleEmptyDataFile(carbonDataFileTempPath);
} else {
if (copyInCurrentThread) {
CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
model.getCarbonDataDirectoryPath(), fileSizeInBytes, outputFilesInfoHolder);
FileFactory.deleteFile(carbonDataFileTempPath);
} else {
executorServiceSubmitList
.add(executorService.submit(new CompleteHdfsBackendThread(carbonDataFileTempPath)));
}
}
} catch (IOException e) {
LOGGER.error(e);
}
} else {
if (currentFileSize == 0) {
try {
handleEmptyDataFile(carbonDataFileStorePath);
} catch (IOException e) {
LOGGER.error(e);
}
}
}
}
private void handleEmptyDataFile(String filePath) throws IOException {
FileFactory.deleteFile(filePath);
if (blockIndexInfoList.size() > 0 && blockIndexInfoList.get(blockIndexInfoList.size() - 1)
.getFileName().equals(carbonDataFileName)) {
// no need to add this entry to the index file
blockIndexInfoList.remove(blockIndexInfoList.size() - 1);
// TODO: notifyDataMapBlockEnd() currently has no implementation, so there is no impact;
// once it is implemented, its effect will need to be undone for this case.
}
}
/**
* This method will be used to initialize the channel
*
* @throws CarbonDataWriterException if the channel for the data file cannot be opened
*/
public void initializeWriter() throws CarbonDataWriterException {
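// the data file name encodes the file sequence number, task id, bucket id,
// task extension, fact timestamp and segment id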
this.carbonDataFileName = CarbonTablePath
.getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
this.carbonDataFileStorePath = model.getCarbonDataDirectoryPath() + File.separator
+ carbonDataFileName;
try {
if (enableDirectlyWriteDataToStorePath) {
// the HDFS block size is set to twice the block size specified by the user to make
// sure that one carbondata file consists of exactly one HDFS block.
fileOutputStream = FileFactory
.getDataOutputStream(carbonDataFileStorePath, CarbonCommonConstants.BYTEBUFFER_SIZE,
fileSizeInBytes * 2);
} else {
// each time we initialize the writer, we choose a local temp location randomly
String[] tempFileLocations = model.getStoreLocation();
String chosenTempLocation =
tempFileLocations[new Random().nextInt(tempFileLocations.length)];
LOGGER.info("Randomly choose factdata temp location: " + chosenTempLocation);
carbonDataFileTempPath = chosenTempLocation + File.separator + carbonDataFileName;
fileOutputStream = FileFactory
.getDataOutputStream(carbonDataFileTempPath, CarbonCommonConstants.BYTEBUFFER_SIZE,
true);
}
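// increment the file sequence number so the next data file gets a distinct name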
this.fileCount++;
// open channel for new data file
this.fileChannel = Channels.newChannel(fileOutputStream);
this.currentOffsetInFile = 0;
} catch (IOException ex) {
throw new CarbonDataWriterException(
"Problem while getting the channel for fact data file", ex);
}
notifyDataMapBlockStart();
}
/**
* This method will write the footer metadata at the end of the file, in thrift format
*/
protected abstract void writeFooterToFile() throws CarbonDataWriterException;
/**
* Below method will be used to fill the block index info details
*
* @param numberOfRows number of rows in the file
* @param carbonDataFileName the name of the carbonData file
* @param footerOffset footer offset
* @param fileSize size of the file
*/
protected abstract void fillBlockIndexInfoDetails(long numberOfRows, String carbonDataFileName,
long footerOffset, long fileSize);
public static List<org.apache.carbondata.format.ColumnSchema> getColumnSchemaListAndCardinality(
List<Integer> cardinality, int[] dictionaryColumnCardinality,
List<ColumnSchema> wrapperColumnSchemaList) {
List<org.apache.carbondata.format.ColumnSchema> columnSchemaList =
new ArrayList<org.apache.carbondata.format.ColumnSchema>(
CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
int counter = 0;
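// dictionary-encoded columns consume the next entry of the input cardinality array;
// measure (non-dimension) columns are skipped; no-dictionary dimensions get cardinality -1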
for (int i = 0; i < wrapperColumnSchemaList.size(); i++) {
columnSchemaList
.add(schemaConverter.fromWrapperToExternalColumnSchema(wrapperColumnSchemaList.get(i)));
if (CarbonUtil.hasEncoding(wrapperColumnSchemaList.get(i).getEncodingList(),
org.apache.carbondata.core.metadata.encoder.Encoding.DICTIONARY)) {
cardinality.add(dictionaryColumnCardinality[counter]);
counter++;
} else if (!wrapperColumnSchemaList.get(i).isDimensionColumn()) {
continue;
} else {
cardinality.add(-1);
}
}
return columnSchemaList;
}
/**
* Below method will be used to write the index file
*
* @throws IOException throws io exception if any problem while writing
* @throws CarbonDataWriterException if any problem occurs while writing the data
*/
protected void writeIndexFile() throws IOException, CarbonDataWriterException {
if (blockIndexInfoList.size() == 0) {
// no need to write an index file if no data file was written
return;
}
// get the header
IndexHeader indexHeader = CarbonMetadataUtil
.getIndexHeader(localCardinality, thriftColumnSchemaList, model.getBucketId(),
model.getSchemaUpdatedTimeStamp());
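// record in the header whether the data was written in sorted order
// (any sort scope other than NO_SORT)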
indexHeader.setIs_sort(model.getSortScope() != null && model.getSortScope() != NO_SORT);
// get the block index info thrift
List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
String indexFileName;
if (enableDirectlyWriteDataToStorePath) {
String rawFileName = model.getCarbonDataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR
+ CarbonTablePath.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
indexFileName = FileFactory.getUpdatedFilePath(rawFileName);
} else {
// randomly choose a temp location for index file
String[] tempLocations = model.getStoreLocation();
String chosenTempLocation = tempLocations[new Random().nextInt(tempLocations.length)];
LOGGER.info("Randomly choose index file location: " + chosenTempLocation);
indexFileName = chosenTempLocation + File.separator + CarbonTablePath
.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
}
CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
// open file
writer.openThriftWriter(indexFileName);
// write the header first
writer.writeThrift(indexHeader);
// write the indexes
for (BlockIndex blockIndex : blockIndexThrift) {
writer.writeThrift(blockIndex);
}
writer.close();
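// the index file was written to a local temp location; copy it to the store path
// and delete the temp copy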
if (!enableDirectlyWriteDataToStorePath) {
CarbonUtil
.copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
fileSizeInBytes, outputFilesInfoHolder);
FileFactory.deleteFile(indexFileName);
}
}
/**
* This method will close the executor service which is used for copying carbon
* data files to the carbon store path
*
* @throws CarbonDataWriterException if the datamap listener or any copy task fails
*/
protected void closeExecutorService() throws CarbonDataWriterException {
CarbonDataWriterException exception = null;
try {
listener.finish();
listener = null;
} catch (IOException e) {
exception = new CarbonDataWriterException(e);
}
try {
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.HOURS);
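// wait for the async copy tasks; Future.get() rethrows any copy failure as an ExecutionException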
for (int i = 0; i < executorServiceSubmitList.size(); i++) {
executorServiceSubmitList.get(i).get();
}
} catch (InterruptedException | ExecutionException e) {
if (null == exception) {
exception = new CarbonDataWriterException(e);
}
}
if (null != fallbackExecutorService) {
fallbackExecutorService.shutdownNow();
}
if (exception != null) {
throw exception;
}
}
/**
* This task completes the HDFS backend storage for a file.
* It may copy the carbon data file from the local store location to the carbon store
* location, and it may also complete the remaining replications for the existing hdfs file.
*/
private final class CompleteHdfsBackendThread implements Callable<Void> {
/**
* complete path along with file name which needs to be copied to
* carbon store path
*/
private String fileName;
private CompleteHdfsBackendThread(String fileName) {
this.fileName = fileName;
}
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
@Override
public Void call() throws Exception {
CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName, model.getCarbonDataDirectoryPath(),
fileSizeInBytes, outputFilesInfoHolder);
FileFactory.deleteFile(fileName);
return null;
}
}
}