blob: 2b74ee3a7c47dd297e2b68ea585429bb68de1389 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.readcommitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.hadoop.conf.Configuration;
/**
 * A {@link ReadCommittedScope} for non transactional carbon tables.
 *
 * <p>The scope is built from the carbon index and merge index files it is given or finds under a
 * directory. Index files are grouped by a segment id derived from their path or file name, and
 * one {@link LoadMetadataDetails} entry with status {@link SegmentStatus#SUCCESS} is created per
 * group.
 */
@InterfaceAudience.Internal
@InterfaceStability.Stable
public class LatestFilesReadCommittedScope implements ReadCommittedScope {

  private static final long serialVersionUID = -839970494288861816L;

  /** Path fragment that marks an index file as living inside a segment folder. */
  private static final String SEGMENT_FOLDER_MARKER = "/Fact/Part0/Segment_";

  /** Directory scanned for index files; stays null when built from an index file array. */
  private String carbonFilePath;

  /** Segment to restrict the scan to; when null the whole path is scanned recursively. */
  private String segmentId;

  private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;

  private LoadMetadataDetails[] loadMetadataDetails;

  /** Marked transient, so it is excluded from default Java serialization. */
  private transient Configuration configuration;

  /**
   * Creates a scope for the given path and immediately takes the index file snapshot.
   *
   * @param path carbon file path, must not be null
   * @param segmentId segment id, or null to scan the path recursively
   * @param configuration hadoop configuration used to access the path
   * @throws IOException if the path is not a directory or contains no index files
   * @throws NullPointerException if {@code path} is null
   */
  public LatestFilesReadCommittedScope(String path, String segmentId, Configuration configuration)
      throws IOException {
    this.configuration = configuration;
    this.carbonFilePath = Objects.requireNonNull(path, "path");
    this.segmentId = segmentId;
    takeCarbonIndexFileSnapShot();
  }

  /**
   * Creates a scope for the given path without restricting it to a segment.
   *
   * @param path carbon file path, must not be null
   * @param configuration hadoop configuration used to access the path
   * @throws IOException if the path is not a directory or contains no index files
   */
  public LatestFilesReadCommittedScope(String path, Configuration configuration)
      throws IOException {
    this(path, null, configuration);
  }

  /**
   * Creates a scope directly from already listed carbon index files.
   *
   * @param indexFiles carbon index files, must not be null
   * @param configuration hadoop configuration kept for later use
   * @throws NullPointerException if {@code indexFiles} is null
   */
  public LatestFilesReadCommittedScope(CarbonFile[] indexFiles, Configuration configuration) {
    this.configuration = configuration;
    // Fail fast with a named argument instead of an anonymous NPE inside the snapshot loop.
    takeCarbonIndexFileSnapShot(Objects.requireNonNull(indexFiles, "indexFiles"));
  }

  /**
   * Rebuilds {@link #loadMetadataDetails} with one entry per segment of the current snapshot.
   */
  private void prepareLoadMetadata() {
    Map<String, List<String>> snapshotMap =
        this.readCommittedIndexFileSnapShot.getSegmentIndexFileMap();
    LoadMetadataDetails[] details = new LoadMetadataDetails[snapshotMap.size()];
    int position = 0;
    for (String segmentKey : snapshotMap.keySet()) {
      long loadTime;
      try {
        loadTime = Long.parseLong(segmentKey);
      } catch (NumberFormatException ignored) {
        // A segment id that is not a plain number gets 0 as its load start/end time.
        loadTime = 0;
      }
      LoadMetadataDetails detail = new LoadMetadataDetails();
      detail.setLoadEndTime(loadTime);
      detail.setLoadStartTime(loadTime);
      detail.setSegmentStatus(SegmentStatus.SUCCESS);
      detail.setLoadName(segmentKey);
      details[position++] = detail;
    }
    this.loadMetadataDetails = details;
  }

  /**
   * Returns the load metadata derived from the snapshot, taking a new snapshot first when no
   * metadata is available yet.
   *
   * @return one entry per segment found in the snapshot
   * @throws IOException if a snapshot has to be taken and that fails
   */
  @Override
  public LoadMetadataDetails[] getSegmentList() throws IOException {
    if (loadMetadataDetails == null) {
      try {
        takeCarbonIndexFileSnapShot();
      } catch (IOException ex) {
        throw new IOException(
            "Problem encountered while reading the carbon index files from: " + carbonFilePath,
            ex);
      }
    }
    return loadMetadataDetails;
  }

  /**
   * Returns the index files recorded in the snapshot for the given segment.
   *
   * @param segment segment to look up, by segment number or else by segment file name
   * @return map keyed by index file path; the value is the file name for merge index files and
   *         null for plain index files. Empty when the segment is not part of the snapshot.
   */
  @Override
  public Map<String, String> getCommittedIndexFile(Segment segment) {
    Map<String, String> indexFileStore = new HashMap<>();
    List<String> indexPaths =
        readCommittedIndexFileSnapShot.getSegmentIndexFileMap().get(resolveSegmentName(segment));
    if (indexPaths == null) {
      return indexFileStore;
    }
    for (String indexPath : indexPaths) {
      if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
        indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1));
      } else {
        indexFileStore.put(indexPath, null);
      }
    }
    return indexFileStore;
  }

  /**
   * Returns the refresh info recorded in the snapshot for the given segment.
   *
   * @param segment segment to look up, by segment number or else by segment file name
   * @param updateVo not used by this implementation
   * @return the recorded refresh info, or null when the segment is not part of the snapshot
   */
  @Override
  public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo) {
    Map<String, SegmentRefreshInfo> snapShot =
        readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap();
    return snapShot.get(resolveSegmentName(segment));
  }

  /**
   * Picks the key under which a segment is stored in the snapshot maps: the segment number when
   * it is set, otherwise the segment file name.
   */
  private String resolveSegmentName(Segment segment) {
    String segmentNo = segment.getSegmentNo();
    return segmentNo != null ? segmentNo : segment.getSegmentFileName();
  }

  /**
   * Derives the segment id an index file belongs to.
   *
   * @param carbonIndexFileName name of the index file
   * @param indexFilePath full path of the index file, using '/' as separator
   * @return the segment id taken from the parent folder or from the file name
   */
  private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
    if (indexFilePath.contains(SEGMENT_FOLDER_MARKER)) {
      // This is CarbonFile case where the Index files are present inside the Segment Folder
      // So the Segment has to be extracted from the path not from the CarbonIndex file.
      String parentDir = indexFilePath.substring(0, indexFilePath.lastIndexOf('/') + 1);
      return parentDir.substring(parentDir.lastIndexOf('_') + 1, parentDir.lastIndexOf('/'));
    }
    // Otherwise the id is the part of the file name between the last '-' and the last '.'.
    return carbonIndexFileName.substring(
        carbonIndexFileName.lastIndexOf('-') + 1, carbonIndexFileName.lastIndexOf('.'));
  }

  /**
   * Lists the index files below {@link #carbonFilePath} and rebuilds the snapshot from them.
   * When a segment id is set only that segment path is listed, otherwise the directory is
   * searched recursively.
   *
   * @throws IOException if the path is not a directory or no index files are found
   */
  @Override
  public void takeCarbonIndexFileSnapShot() throws IOException {
    // Read the current file Path get the list of indexes from the path.
    CarbonFile file = FileFactory.getCarbonFile(carbonFilePath, configuration);
    if (!file.isDirectory()) {
      throw new IOException("Path is not pointing to directory: " + carbonFilePath);
    }
    CarbonFile[] carbonIndexFiles;
    if (segmentId == null) {
      List<CarbonFile> indexFiles = new ArrayList<>();
      SegmentIndexFileStore.getCarbonIndexFilesRecursively(file, indexFiles);
      carbonIndexFiles = indexFiles.toArray(new CarbonFile[0]);
    } else {
      String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
      carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, configuration);
    }
    // Treat a missing listing the same way as an empty one.
    if (carbonIndexFiles == null || carbonIndexFiles.length == 0) {
      throw new IOException(
          "No Index files are present in the table location :" + carbonFilePath);
    }
    takeCarbonIndexFileSnapShot(carbonIndexFiles);
  }

  /**
   * Builds the snapshot from the given files and refreshes the load metadata. Files whose name
   * ends with neither the index nor the merge index extension are ignored.
   *
   * @param carbonIndexFiles candidate index files
   */
  private void takeCarbonIndexFileSnapShot(CarbonFile[] carbonIndexFiles) {
    Map<String, List<String>> indexFileStore = new HashMap<>();
    Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();
    for (CarbonFile indexFile : carbonIndexFiles) {
      // TODO. Nested File Paths.
      String fileName = indexFile.getName();
      if (!fileName.endsWith(CarbonTablePath.INDEX_FILE_EXT)
          && !fileName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
        continue;
      }
      // Get Segment Name from the IndexFile.
      String indexFilePath = FileFactory.getUpdatedFilePath(indexFile.getAbsolutePath());
      String segmentKey = getSegmentID(fileName, indexFilePath);
      // Read the modification time once per file and reuse it below.
      long lastModified = indexFile.getLastModifiedTime();
      // TODO. During Partition table handling, place Segment File Name.
      List<String> indexList = indexFileStore.get(segmentKey);
      SegmentRefreshInfo segmentRefreshInfo;
      if (indexList == null) {
        // First file of this segment: register its list and its refresh info.
        indexList = new ArrayList<>(1);
        indexFileStore.put(segmentKey, indexList);
        segmentRefreshInfo = new SegmentRefreshInfo(lastModified, 0, 0L);
        segmentTimestampUpdaterMap.put(segmentKey, segmentRefreshInfo);
      } else {
        // Entry is already present.
        segmentRefreshInfo = segmentTimestampUpdaterMap.get(segmentKey);
      }
      indexList.add(indexFilePath);
      // Keep the most recent modification time seen for the segment.
      if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < lastModified) {
        segmentRefreshInfo.setSegmentUpdatedTimestamp(lastModified);
      }
      segmentRefreshInfo.setCountOfFileInSegment(indexList.size());
    }
    this.readCommittedIndexFileSnapShot =
        new ReadCommittedIndexFileSnapShot(indexFileStore, segmentTimestampUpdaterMap);
    prepareLoadMetadata();
  }

  /**
   * Returns the configuration currently held by this scope.
   *
   * @return the configuration, possibly null
   */
  public Configuration getConfiguration() {
    return configuration;
  }

  /**
   * Replaces the configuration held by this scope.
   *
   * @param configuration the configuration to use from now on
   */
  @Override
  public void setConfiguration(Configuration configuration) {
    this.configuration = configuration;
  }

  /**
   * Returns the path this scope was created for.
   *
   * @return the carbon file path, or null when the scope was created from an index file array
   */
  @Override
  public String getFilePath() {
    return carbonFilePath;
  }
}