/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.streaming.segment;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.comparator.Comparator;
import org.apache.carbondata.core.util.comparator.SerializableComparator;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletIndex;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.streaming.CarbonStreamRecordWriter;
import org.apache.carbondata.streaming.index.StreamFileIndex;

import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

/**
* streaming segment manager: maintains the status and index files of stream segments
*/
public class StreamSegment {
private static final Logger LOGGER =
LogServiceFactory.getLogService(StreamSegment.class.getName());

/**
* get the current stream segment, or create a new stream segment if none exists
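* <p>
* A minimal usage sketch ({@code table} is an illustrative {@code CarbonTable}
* instance supplied by the caller, not defined in this class):
* <pre>{@code
* String segmentId = StreamSegment.open(table);
* // ... append stream batches to the segment ...
* String nextSegmentId = StreamSegment.close(table, segmentId);
* }</pre>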
*/
public static String open(CarbonTable table) throws IOException {
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(table.getAbsoluteTableIdentifier());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
"Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for stream table get or create segment");
LoadMetadataDetails[] details =
SegmentStatusManager.readLoadMetadata(
CarbonTablePath.getMetadataPath(table.getTablePath()));
LoadMetadataDetails streamSegment = null;
for (LoadMetadataDetails detail : details) {
if (FileFormat.ROW_V1.equals(detail.getFileFormat())) {
if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
streamSegment = detail;
break;
}
}
}
if (null == streamSegment) {
return createNewSegment(table, details);
} else {
return streamSegment.getLoadName();
}
} else {
LOGGER.error(
"Unable to acquire the table status lock to get or create a stream segment for table "
+ table.getDatabaseName() + "." + table.getTableName());
throw new IOException("Failed to get stream segment");
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info("Table unlocked successfully after stream table get or create segment" + table
.getDatabaseName() + "." + table.getTableName());
} else {
LOGGER.error(
"Unable to unlock table lock for stream table" + table.getDatabaseName() + "." + table
.getTableName() + " during stream table get or create segment");
}
}
}

private static String createNewSegment(CarbonTable table, LoadMetadataDetails[] details)
throws IOException {
int segmentId = SegmentStatusManager.createNewSegmentId(details);
LoadMetadataDetails newDetail = new LoadMetadataDetails();
newDetail.setLoadName(String.valueOf(segmentId));
newDetail.setFileFormat(FileFormat.ROW_V1);
newDetail.setLoadStartTime(System.currentTimeMillis());
newDetail.setSegmentStatus(SegmentStatus.STREAMING);
LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
System.arraycopy(details, 0, newDetails, 0, details.length);
newDetails[details.length] = newDetail;
SegmentStatusManager
.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath()),
newDetails);
return newDetail.getLoadName();
}

/**
* mark the old stream segment as finished and create a new stream segment
*/
public static String close(CarbonTable table, String segmentId)
throws IOException {
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(table.getAbsoluteTableIdentifier());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
"Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for stream table finish segment");
LoadMetadataDetails[] details =
SegmentStatusManager.readLoadMetadata(
CarbonTablePath.getMetadataPath(table.getTablePath()));
for (LoadMetadataDetails detail : details) {
if (segmentId.equals(detail.getLoadName())) {
detail.setLoadEndTime(System.currentTimeMillis());
detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
break;
}
}
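// createNewSegment persists the whole updated details array, which also
// commits the STREAMING_FINISH status set above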
return createNewSegment(table, details);
} else {
LOGGER.error(
"Unable to acquire the table status lock for streaming table "
+ table.getDatabaseName() + "." + table.getTableName());
throw new IOException("Failed to get stream segment");
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status update" + table.getDatabaseName()
+ "." + table.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
.getTableName() + " during table status update");
}
}
}

/**
* change the status of the segment from "streaming" to "streaming finish"
*/
public static void finishStreaming(CarbonTable carbonTable) throws IOException {
ICarbonLock statusLock = CarbonLockFactory.getCarbonLockObj(
carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
LockUsage.TABLE_STATUS_LOCK);
try {
if (statusLock.lockWithRetries()) {
LoadMetadataDetails[] details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
boolean updated = false;
for (LoadMetadataDetails detail : details) {
if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
detail.setLoadEndTime(System.currentTimeMillis());
detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
updated = true;
}
}
if (updated) {
SegmentStatusManager.writeLoadDetailsIntoFile(
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
details);
}
} else {
String msg = "Failed to acquire table status lock of " + carbonTable.getDatabaseName()
+ "." + carbonTable.getTableName();
LOGGER.error(msg);
throw new IOException(msg);
}
} finally {
if (statusLock.unlock()) {
LOGGER.info("Table unlocked successfully after table status update"
+ carbonTable.getDatabaseName() + "." + carbonTable.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table " + carbonTable.getDatabaseName()
+ "." + carbonTable.getTableName() + " during table status update");
}
}
}

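/**
* collect the blocklet min/max index of a batch from the dimension and measure
* statistics. All dimension values come first in the min/max arrays, followed
* by all measure values, matching the column layout assumed by the merge
* methods below.
*/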
public static BlockletMinMaxIndex collectMinMaxIndex(SimpleStatsResult[] dimStats,
SimpleStatsResult[] msrStats) {
BlockletMinMaxIndex minMaxIndex = new BlockletMinMaxIndex();
byte[][] maxIndexes = new byte[dimStats.length + msrStats.length][];
for (int index = 0; index < dimStats.length; index++) {
maxIndexes[index] =
CarbonUtil.getValueAsBytes(dimStats[index].getDataType(), dimStats[index].getMax());
}
for (int index = 0; index < msrStats.length; index++) {
maxIndexes[dimStats.length + index] =
CarbonUtil.getValueAsBytes(msrStats[index].getDataType(), msrStats[index].getMax());
}
minMaxIndex.setMaxValues(maxIndexes);
byte[][] minIndexes = new byte[maxIndexes.length][];
for (int index = 0; index < dimStats.length; index++) {
minIndexes[index] =
CarbonUtil.getValueAsBytes(dimStats[index].getDataType(), dimStats[index].getMin());
}
for (int index = 0; index < msrStats.length; index++) {
minIndexes[dimStats.length + index] =
CarbonUtil.getValueAsBytes(msrStats[index].getDataType(), msrStats[index].getMin());
}
minMaxIndex.setMinValues(minIndexes);
// TODO: handle the min max writing for string type based on character limit for streaming
boolean[] isMinMaxSet = new boolean[dimStats.length + msrStats.length];
Arrays.fill(isMinMaxSet, true);
minMaxIndex.setIsMinMaxSet(isMinMaxSet);
return minMaxIndex;
}

/**
* create a StreamFileIndex for a stream file from its min/max index and row count
*/
private static StreamFileIndex createStreamBlockIndex(String fileName,
BlockletMinMaxIndex minMaxIndex, int blockletRowCount) {
return new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
}

/**
* invoke CarbonStreamOutputFormat to append batch data to existing carbondata file
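* <p>
* A hedged sketch of the call pattern; {@code rowIterator}, {@code taskContext}
* and {@code loadModel} are illustrative names supplied by the caller:
* <pre>{@code
* StreamFileIndex fileIndex =
*     StreamSegment.appendBatchData(rowIterator, taskContext, loadModel);
* }</pre>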
*/
public static StreamFileIndex appendBatchData(CarbonIterator<Object[]> inputIterators,
TaskAttemptContext job, CarbonLoadModel carbonLoadModel) throws Exception {
CarbonStreamRecordWriter writer = null;
try {
writer = new CarbonStreamRecordWriter(job, carbonLoadModel);
// at the beginning of each task, recover the data file if necessary;
// we can reuse some information from the record writer here
recoverFileIfRequired(
writer.getSegmentDir(),
writer.getFileName(),
CarbonTablePath.getCarbonStreamIndexFileName());
int blockletRowCount = 0;
while (inputIterators.hasNext()) {
writer.write(null, inputIterators.next());
blockletRowCount++;
}
inputIterators.close();
return createStreamBlockIndex(writer.getFileName(), writer.getBatchMinMaxIndex(),
blockletRowCount);
} catch (Throwable ex) {
if (writer != null) {
LOGGER.error("Failed to append batch data to stream segment: " +
writer.getSegmentDir(), ex);
writer.setHasException(true);
}
throw ex;
} finally {
if (writer != null) {
writer.close(job);
}
}
}

/**
* check the health of the stream segment and try to recover it from a job fault.
* this method will be invoked in the following scenarios:
* 1. at the beginning of streaming (StreamSinkFactory.getStreamSegmentId)
* 2. after a job failure (CarbonAppendableStreamSink.writeDataFileJob)
*/
public static void recoverSegmentIfRequired(String segmentDir) throws IOException {
if (FileFactory.isFileExist(segmentDir)) {
String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
String indexPath = segmentDir + File.separator + indexName;
CarbonFile index = FileFactory.getCarbonFile(indexPath);
CarbonFile[] files = listDataFiles(segmentDir);
// TODO: better to check the backup index first
// index file exists
if (index.exists()) {
// data file exists
if (files.length > 0) {
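// the index file records the committed size of each data file; any bytes
// beyond that size belong to an unfinished batch and must be discarded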
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
try {
// map block index
indexReader.openThriftReader(indexPath);
Map<String, Long> tableSizeMap = new HashMap<>();
while (indexReader.hasNext()) {
BlockIndex blockIndex = indexReader.readBlockIndexInfo();
tableSizeMap.put(blockIndex.getFile_name(), blockIndex.getFile_size());
}
// recover each file
for (CarbonFile file : files) {
Long size = tableSizeMap.get(file.getName());
if (null == size || size == 0) {
file.delete();
} else if (size < file.getSize()) {
FileFactory.truncateFile(file.getCanonicalPath(), size);
}
}
} finally {
indexReader.closeThriftReader();
}
}
} else {
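// no index file means no batch was committed, so any data files are
// leftovers from a failed job and can be deleted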
if (files.length > 0) {
for (CarbonFile file : files) {
file.delete();
}
}
}
}
}

/**
* check the health of a stream data file and try to recover it from a task fault.
* this method will be invoked in the following scenario:
* 1. at the beginning of a data file writing task
*/
public static void recoverFileIfRequired(
String segmentDir,
String fileName,
String indexName) throws IOException {
String filePath = segmentDir + File.separator + fileName;
CarbonFile file = FileFactory.getCarbonFile(filePath);
String indexPath = segmentDir + File.separator + indexName;
CarbonFile index = FileFactory.getCarbonFile(indexPath);
if (file.exists() && index.exists()) {
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
try {
indexReader.openThriftReader(indexPath);
while (indexReader.hasNext()) {
BlockIndex blockIndex = indexReader.readBlockIndexInfo();
if (blockIndex.getFile_name().equals(fileName)) {
if (blockIndex.getFile_size() == 0) {
file.delete();
} else if (blockIndex.getFile_size() < file.getSize()) {
FileFactory.truncateFile(filePath, blockIndex.getFile_size());
}
break;
}
}
} finally {
indexReader.closeThriftReader();
}
}
}

/**
* list all carbondata files of a segment
*/
public static CarbonFile[] listDataFiles(String segmentDir) {
CarbonFile carbonDir = FileFactory.getCarbonFile(segmentDir);
if (carbonDir.exists()) {
return carbonDir.listFiles(file -> CarbonTablePath.isCarbonDataFile(file.getName()));
} else {
return new CarbonFile[0];
}
}

/**
* read index file to list BlockIndex
*
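* <p>
* A hedged usage sketch, summing the committed data size recorded in an index
* file ({@code indexPath} is supplied by the caller):
* <pre>{@code
* long committedSize = 0;
* for (BlockIndex block : StreamSegment.readIndexFile(indexPath)) {
*   committedSize += block.getFile_size();
* }
* }</pre>
*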
* @param indexPath path of the index file
* @return the list of BlockIndex in the index file
* @throws IOException failed to read index file
*/
public static List<BlockIndex> readIndexFile(String indexPath)
throws IOException {
List<BlockIndex> blockIndexList = new ArrayList<>();
CarbonFile index = FileFactory.getCarbonFile(indexPath);
if (index.exists()) {
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
try {
indexReader.openThriftReader(indexPath);
while (indexReader.hasNext()) {
blockIndexList.add(indexReader.readBlockIndexInfo());
}
} finally {
indexReader.closeThriftReader();
}
}
return blockIndexList;
}

/**
* combine the index of the new blocklet with the BlockletMinMaxIndex of the stream file
* 1. if the file index is null, no min/max index is required
* 2. if the file index is not null,
* 2.1 if the blocklet index is null, use the BlockletMinMaxIndex of the stream file
* 2.2 if the blocklet index is not null, combine the two indexes
*/
private static void mergeBatchMinMax(StreamFileIndex blockletIndex,
BlockletMinMaxIndex fileIndex, DataType[] msrDataTypes) throws IOException {
if (fileIndex == null) {
// backward compatibility:
// do not create a min/max index for an old stream file (written without one)
blockletIndex.setMinMaxIndex(null);
return;
}
BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
// if min/max of new blocklet is null, use min/max of old file
if (minMaxIndex == null) {
blockletIndex.setMinMaxIndex(fileIndex);
return;
}
SerializableComparator[] comparators = getSerializableComparators(msrDataTypes);
// min value
byte[][] minValues = minMaxIndex.getMinValues();
byte[][] mergedMinValues = fileIndex.getMinValues();
if (minValues == null || minValues.length == 0) {
// use file index
minMaxIndex.setMinValues(mergedMinValues);
} else if (mergedMinValues != null && mergedMinValues.length != 0) {
if (minValues.length != mergedMinValues.length) {
throw new IOException("the lengths of the min values should be same.");
}
mergeMinValues(msrDataTypes, comparators, minValues, mergedMinValues);
}
// max value
byte[][] maxValues = minMaxIndex.getMaxValues();
byte[][] mergedMaxValues = fileIndex.getMaxValues();
if (maxValues == null || maxValues.length == 0) {
minMaxIndex.setMaxValues(mergedMaxValues);
} else if (mergedMaxValues != null && mergedMaxValues.length != 0) {
if (maxValues.length != mergedMaxValues.length) {
throw new IOException("the lengths of the max values should be same.");
}
mergeMaxValues(msrDataTypes, comparators, maxValues, mergedMaxValues);
}
}

private static SerializableComparator[] getSerializableComparators(DataType[] msrDataTypes) {
SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
for (int index = 0; index < comparators.length; index++) {
comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
}
return comparators;
}

private static void mergeMaxValues(DataType[] msrDataTypes, SerializableComparator[] comparators,
byte[][] maxValues, byte[][] mergedMaxValues) {
int dimCount = maxValues.length - msrDataTypes.length;
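// dimension values occupy the first dimCount slots and are compared as raw
// bytes; measure values follow and are compared on their decoded objects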
for (int index = 0; index < maxValues.length; index++) {
if (index < dimCount) {
if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
< 0) {
maxValues[index] = mergedMaxValues[index];
}
} else {
Object object = DataTypeUtil
.getMeasureObjectFromDataType(maxValues[index], msrDataTypes[index - dimCount]);
Object mergedObject = DataTypeUtil
.getMeasureObjectFromDataType(mergedMaxValues[index], msrDataTypes[index - dimCount]);
if (comparators[index - dimCount].compare(object, mergedObject) < 0) {
maxValues[index] = mergedMaxValues[index];
}
}
}
}

private static void mergeMinValues(DataType[] msrDataTypes, SerializableComparator[] comparators,
byte[][] minValues, byte[][] mergedMinValues) {
int dimCount = minValues.length - msrDataTypes.length;
for (int index = 0; index < minValues.length; index++) {
if (index < dimCount) {
if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
> 0) {
minValues[index] = mergedMinValues[index];
}
} else {
Object object = DataTypeUtil
.getMeasureObjectFromDataType(minValues[index], msrDataTypes[index - dimCount]);
Object mergedObject = DataTypeUtil
.getMeasureObjectFromDataType(mergedMinValues[index], msrDataTypes[index - dimCount]);
if (comparators[index - dimCount].compare(object, mergedObject) > 0) {
minValues[index] = mergedMinValues[index];
}
}
}
}

/**
* merge blocklet min/max to generate batch min/max
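* <p>
* A hedged sketch: fold the per-blocklet indexes of a batch into one index
* ({@code blockletIndexes} and {@code msrDataTypes} are illustrative inputs):
* <pre>{@code
* BlockletMinMaxIndex batchIndex = null;
* for (BlockletMinMaxIndex blockletIndex : blockletIndexes) {
*   batchIndex = StreamSegment.mergeBlockletMinMax(batchIndex, blockletIndex, msrDataTypes);
* }
* }</pre>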
*/
public static BlockletMinMaxIndex mergeBlockletMinMax(BlockletMinMaxIndex to,
BlockletMinMaxIndex from, DataType[] msrDataTypes) {
if (to == null) {
return from;
}
if (from == null) {
return to;
}
SerializableComparator[] comparators = getSerializableComparators(msrDataTypes);
// min value
byte[][] minValues = to.getMinValues();
byte[][] mergedMinValues = from.getMinValues();
mergeMinValues(msrDataTypes, comparators, minValues, mergedMinValues);
// max value
byte[][] maxValues = to.getMaxValues();
byte[][] mergedMaxValues = from.getMaxValues();
mergeMaxValues(msrDataTypes, comparators, maxValues, mergedMaxValues);
return to;
}

/**
* merge new blocklet index and old file index to create new file index
*/
private static void updateStreamFileIndex(Map<String, StreamFileIndex> indexMap,
String indexPath, DataType[] msrDataTypes) throws IOException {
List<BlockIndex> blockIndexList = readIndexFile(indexPath);
for (BlockIndex blockIndex : blockIndexList) {
BlockletMinMaxIndex fileIndex = CarbonMetadataUtil
.convertExternalMinMaxIndex(blockIndex.getBlock_index().getMin_max_index());
StreamFileIndex blockletIndex = indexMap.get(blockIndex.getFile_name());
if (blockletIndex == null) {
// a stream file that is absent from this batch must still be indexed
indexMap.put(blockIndex.getFile_name(),
new StreamFileIndex(blockIndex.getFile_name(), fileIndex, blockIndex.getNum_rows()));
} else {
// merge the old file min/max into the StreamFileIndex of this batch
blockletIndex.setRowCount(blockletIndex.getRowCount() + blockIndex.getNum_rows());
mergeBatchMinMax(blockletIndex, fileIndex, msrDataTypes);
}
}
}

/**
* update carbon index file after a stream batch.
*/
public static void updateIndexFile(String segmentDir,
StreamFileIndex[] blockIndexes, DataType[] msrDataTypes) throws IOException {
String filePath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
// update min/max index
Map<String, StreamFileIndex> indexMap = new HashMap<>();
for (StreamFileIndex fileIndex : blockIndexes) {
indexMap.put(fileIndex.getFileName(), fileIndex);
}
updateStreamFileIndex(indexMap, filePath, msrDataTypes);
String tempFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
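// write the new index to a temporary file first, then rename it over the old
// index at the end, so readers never observe a partially written index file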
CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
try {
writer.openThriftWriter(tempFilePath);
CarbonFile[] files = listDataFiles(segmentDir);
BlockIndex blockIndex;
for (CarbonFile file : files) {
blockIndex = new BlockIndex();
blockIndex.setFile_name(file.getName());
blockIndex.setFile_size(file.getSize());
blockIndex.setOffset(-1);
// set min/max index
BlockletIndex blockletIndex = new BlockletIndex();
blockIndex.setBlock_index(blockletIndex);
StreamFileIndex streamFileIndex = indexMap.get(blockIndex.getFile_name());
if (streamFileIndex != null) {
blockletIndex.setMin_max_index(
CarbonMetadataUtil.convertMinMaxIndex(streamFileIndex.getMinMaxIndex()));
blockIndex.setNum_rows(streamFileIndex.getRowCount());
} else {
blockIndex.setNum_rows(-1);
}
// write block index
writer.writeThrift(blockIndex);
}
writer.close();
CarbonFile tempFile = FileFactory.getCarbonFile(tempFilePath);
if (!tempFile.renameForce(filePath)) {
throw new IOException(
"temporary file renaming failed, src=" + tempFilePath + ", dest=" + filePath);
}
} catch (IOException ex) {
try {
writer.close();
} catch (IOException t) {
LOGGER.error(t);
}
throw ex;
}
}

/**
* calculate the size of the segment by accumulating the data file sizes recorded in the index file
*/
public static long size(String segmentDir) throws IOException {
long size = 0;
if (FileFactory.isFileExist(segmentDir)) {
String indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
CarbonFile index = FileFactory.getCarbonFile(indexPath);
if (index.exists()) {
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
try {
indexReader.openThriftReader(indexPath);
while (indexReader.hasNext()) {
BlockIndex blockIndex = indexReader.readBlockIndexInfo();
size += blockIndex.getFile_size();
}
} finally {
indexReader.closeThriftReader();
}
}
}
return size;
}
}