/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.writer;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.MergedBlockIndex;
import org.apache.carbondata.format.MergedBlockIndexHeader;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
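
/**
 * Merges the per-block carbon index (.carbonindex) files of a segment into a single merge
 * index (.carbonindexmerge) file and deletes the original index files once the merged file
 * has been written.
 *
 * <p>A minimal usage sketch (illustrative only: {@code carbonTable}, {@code segmentId} and
 * {@code uuid} are placeholders; {@code partitionPath} is null for non-partitioned tables):
 * <pre>{@code
 * CarbonIndexFileMergeWriter writer = new CarbonIndexFileMergeWriter(carbonTable);
 * // returns the uuid on success, null if the merge failed or there was nothing to merge
 * String result = writer.mergeCarbonIndexFilesOfSegment(
 *     segmentId, uuid, carbonTable.getTablePath(), null);
 * }</pre>
 */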
public class CarbonIndexFileMergeWriter {
/**
* table handle
*/
private CarbonTable table;
/**
* thrift writer object
*/
private ThriftWriter thriftWriter;
  private static final Logger LOGGER =
      LogServiceFactory.getLogService(CarbonIndexFileMergeWriter.class.getCanonicalName());
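  /**
   * @param table table whose segment index files are to be merged
   */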
public CarbonIndexFileMergeWriter(CarbonTable table) {
this.table = table;
}
  /**
   * Merges all the carbon index files of a segment into a single merge index file.
   *
   * @param segmentId id of the segment whose index files are to be merged
   * @param tablePath path of the table
   * @param indexFileNamesTobeAdded only these files are considered while merging;
   *                                if null, all index files of the segment are considered
   * @param readFileFooterFromCarbonDataFile flag to read the blocklet info from the footer of the
   *                                         carbondata file, used when upgrading from a version
   *                                         that did not store the blocklet info in the index file
   * @param uuid suffix used while naming the new segment file
   * @param partitionPath partition location, for partitioned tables
   * @return the uuid if the merge was done based on a segment file, null otherwise
   */
private String mergeCarbonIndexFilesOfSegment(String segmentId,
String tablePath, List<String> indexFileNamesTobeAdded,
boolean readFileFooterFromCarbonDataFile, String uuid, String partitionPath) {
try {
Segment segment = Segment.getSegment(segmentId, tablePath);
String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
CarbonFile[] indexFiles;
SegmentFileStore sfs = null;
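      // if the segment has a segment file, resolve the index files through it;
      // otherwise list the index files directly from the segment folder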
if (segment != null && segment.getSegmentFileName() != null) {
sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
if (table.isHivePartitionTable()) {
// in case of partition table, merge index files of a partition
List<CarbonFile> indexFilesInPartition = new ArrayList<>();
for (CarbonFile indexCarbonFile : indexCarbonFiles) {
if (FileFactory.getUpdatedFilePath(indexCarbonFile.getParentFile().getPath())
.equals(partitionPath)) {
indexFilesInPartition.add(indexCarbonFile);
}
}
indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
} else {
indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
}
} else {
indexFiles =
SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
}
if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
if (sfs == null) {
return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId);
} else {
return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
indexFiles, uuid, partitionPath);
}
}
} catch (Exception e) {
LOGGER.error(
"Failed to merge index files in path: " + tablePath, e);
}
return null;
}
  /**
   * Merges the index files of the given partition, written to a temporary folder during load,
   * and returns the folder details of the merged file.
   */
public SegmentFileStore.FolderDetails mergeCarbonIndexFilesOfSegment(String segmentId,
String tablePath, String partitionPath, List<String> partitionInfo, String uuid,
String tempFolderPath, String currPartitionSpec) throws IOException {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
    String partitionTempPath = "";
    if (partitionPath != null) {
      for (String partition : partitionInfo) {
        if (partitionPath.equalsIgnoreCase(partition)) {
          partitionTempPath = partition + "/" + tempFolderPath;
          break;
        }
      }
    }
    if (!partitionTempPath.isEmpty()) {
      fileStore.readAllIIndexOfSegment(partitionTempPath);
    }
Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
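    // group the index file contents by parent folder so that one merge index file
    // is written per folder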
for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
Path path = new Path(entry.getKey());
Map<String, byte[]> map = indexLocationMap.get(path.getParent().toString());
if (map == null) {
map = new HashMap<>();
indexLocationMap.put(path.getParent().toString(), map);
}
map.put(path.getName(), entry.getValue());
}
SegmentFileStore.FolderDetails folderDetails = null;
for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
String mergeIndexFile = writeMergeIndexFile(null, partitionPath, entry.getValue(), segmentId);
folderDetails = new SegmentFileStore.FolderDetails();
folderDetails.setMergeFileName(mergeIndexFile);
folderDetails.setStatus("Success");
List<String> partitions = new ArrayList<>();
if (partitionPath.startsWith(tablePath)) {
        partitionPath = partitionPath.substring(tablePath.length() + 1);
partitions.addAll(Arrays.asList(partitionPath.split("/")));
folderDetails.setPartitions(partitions);
folderDetails.setRelative(true);
      } else {
        if (currPartitionSpec == null) {
          throw new IOException("Unable to get PartitionSpec for: " + partitionPath);
        }
        List<PartitionSpec> partitionSpecs = (ArrayList<PartitionSpec>) ObjectSerializationUtil
            .convertStringToObject(currPartitionSpec);
        PartitionSpec writeSpec = new PartitionSpec(null, partitionPath);
        int index = partitionSpecs.indexOf(writeSpec);
        if (index < 0) {
          throw new IOException("Unable to get PartitionSpec for: " + partitionPath);
        }
        PartitionSpec spec = partitionSpecs.get(index);
        folderDetails.setPartitions(spec.getPartitions());
        folderDetails.setRelative(false);
      }
}
return folderDetails;
}
private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles,
String segmentId) throws IOException {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
    if (readFileFooterFromCarbonDataFile) {
      // used when upgrading from an old store whose index files do not contain the blocklet
      // info; the blocklet info then has to be read from the footer of the carbondata file
      fileStore.readAllIndexAndFillBlockletInfo(segmentPath);
} else {
fileStore.readAllIIndexOfSegment(segmentPath);
}
Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap, segmentId);
for (CarbonFile indexFile : indexFiles) {
indexFile.delete();
}
return null;
}
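  /**
   * Merges the index files covered by the given segment file. One merge index file is written
   * per index folder and the corresponding segment file entries are updated to reference it;
   * for non-partitioned tables a new segment file is also written and the table status file is
   * updated. The original index files are deleted at the end.
   *
   * @return the given uuid
   */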
public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
List<String> indexFileNamesTobeAdded, SegmentFileStore segmentFileStore,
CarbonFile[] indexFiles, String uuid, String partitionPath) throws IOException {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
    // in case of a partition table, a merge index file is created for each partition
if (null != partitionPath) {
for (CarbonFile indexFile : indexFiles) {
fileStore.readIndexFile(indexFile);
}
} else {
fileStore.readAllIIndexOfSegment(segmentFileStore.getSegmentFile(),
segmentFileStore.getTablePath(), SegmentStatus.SUCCESS, true);
}
Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
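    // group the index file contents by parent folder so that one merge index file
    // is written per folder (one per partition for a partitioned table)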
for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) {
Path path = new Path(entry.getKey());
Map<String, byte[]> map = indexLocationMap.get(path.getParent().toString());
if (map == null) {
map = new HashMap<>();
indexLocationMap.put(path.getParent().toString(), map);
}
map.put(path.getName(), entry.getValue());
}
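    // read the partition specs of this segment from the table status metadata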
List<PartitionSpec> partitionSpecs = SegmentFileStore
.getPartitionSpecs(segmentId, table.getTablePath(), SegmentStatusManager
.readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath())));
for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
String mergeIndexFile =
writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), entry.getValue(), segmentId);
for (Map.Entry<String, SegmentFileStore.FolderDetails> segment : segmentFileStore
.getLocationMap().entrySet()) {
String location = segment.getKey();
if (segment.getValue().isRelative()) {
location =
segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
}
if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
segment.getValue().setMergeFileName(mergeIndexFile);
segment.getValue().setFiles(new HashSet<String>());
break;
}
}
if (table.isHivePartitionTable()) {
for (PartitionSpec partitionSpec : partitionSpecs) {
if (partitionSpec.getLocation().toString().equals(partitionPath)) {
          SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
              segmentId + "_" + uuid, partitionSpec.getPartitions(), true);
}
}
}
}
String newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
+ CarbonTablePath.SEGMENT_EXT;
String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
+ CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
if (!table.isHivePartitionTable()) {
SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
}
for (CarbonFile file : indexFiles) {
file.delete();
}
return uuid;
}
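  /**
   * Writes a single merge index file containing the given index file contents. The merged file
   * is named {@code <segmentId>_<timestamp>} with the merge index file extension and holds a
   * {@link MergedBlockIndexHeader} listing the merged index file names, followed by a
   * {@link MergedBlockIndex} carrying the corresponding index file contents.
   *
   * @return the name of the merge index file, or null if there was nothing to merge
   */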
  private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath,
      Map<String, byte[]> indexMap, String segmentId) throws IOException {
MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
List<String> fileNames = new ArrayList<>(indexMap.size());
List<ByteBuffer> data = new ArrayList<>(indexMap.size());
for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
if (indexFileNamesTobeAdded == null ||
indexFileNamesTobeAdded.contains(entry.getKey())) {
fileNames.add(entry.getKey());
data.add(ByteBuffer.wrap(entry.getValue()));
}
}
if (fileNames.size() > 0) {
      String mergeIndexName =
          segmentId + '_' + System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT;
      openThriftWriter(segmentPath + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexName);
indexHeader.setFile_names(fileNames);
mergedBlockIndex.setFileData(data);
writeMergedBlockIndexHeader(indexHeader);
writeMergedBlockIndex(mergedBlockIndex);
close();
return mergeIndexName;
}
return null;
}
  /**
   * Merges all the carbon index files of a segment into a single merge index file.
   *
   * @param segmentId id of the segment whose index files are to be merged
   * @param uuid suffix used while naming the new segment file
   * @param tablePath path of the table
   * @param partitionPath partition location, for partitioned tables
   */
public String mergeCarbonIndexFilesOfSegment(String segmentId, String uuid, String tablePath,
String partitionPath) {
return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false, uuid, partitionPath);
}
  /**
   * Merges all the carbon index files of a segment into a single merge index file.
   *
   * @param segmentId id of the segment whose index files are to be merged
   * @param tablePath path of the table
   * @param readFileFooterFromCarbonDataFile flag to read the blocklet info from the footer of the
   *                                         carbondata file, used when upgrading from a version
   *                                         that did not store the blocklet info in the index file
   * @param uuid suffix used while naming the new segment file
   */
public String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath,
boolean readFileFooterFromCarbonDataFile, String uuid) {
return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
readFileFooterFromCarbonDataFile, uuid, null);
}
private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
for (CarbonFile file : indexFiles) {
if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
return true;
}
}
return false;
}
  /**
   * Writes the merged block index header thrift object to the file.
   *
   * @throws IOException if the write fails
   */
private void writeMergedBlockIndexHeader(MergedBlockIndexHeader indexObject) throws IOException {
thriftWriter.write(indexObject);
}
  /**
   * Writes the merged block index thrift object to the file.
   *
   * @throws IOException if the write fails
   */
private void writeMergedBlockIndex(MergedBlockIndex indexObject) throws IOException {
thriftWriter.write(indexObject);
}
  /**
   * Opens the thrift writer for the given file path.
   *
   * @param filePath file path where the data is to be written
   * @throws IOException in case of any failure while opening the file stream
   */
private void openThriftWriter(String filePath) throws IOException {
// create thrift writer instance
thriftWriter = new ThriftWriter(filePath, false);
// open the file stream
thriftWriter.open(FileWriteOperation.OVERWRITE);
}
  /**
   * Closes the thrift writer.
   */
private void close() throws IOException {
thriftWriter.close();
}
  public static class SegmentIndexFileMergeStatus implements Serializable {
    private SegmentFileStore.SegmentFile segmentFile;
    private List<String> filesTobeDeleted;
    public SegmentIndexFileMergeStatus(SegmentFileStore.SegmentFile segmentFile,
        List<String> filesTobeDeleted) {
this.segmentFile = segmentFile;
this.filesTobeDeleted = filesTobeDeleted;
}
public SegmentFileStore.SegmentFile getSegmentFile() {
return segmentFile;
}
public List<String> getFilesTobeDeleted() {
return filesTobeDeleted;
}
}
}