| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.core.statusmanager; |
| |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.Closeable; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.OutputStreamWriter; |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.datastore.filesystem.CarbonFile; |
| import org.apache.carbondata.core.datastore.impl.FileFactory; |
| import org.apache.carbondata.core.exception.ConcurrentOperationException; |
| import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory; |
| import org.apache.carbondata.core.fileoperations.AtomicFileOperations; |
| import org.apache.carbondata.core.fileoperations.FileWriteOperation; |
| import org.apache.carbondata.core.index.Segment; |
| import org.apache.carbondata.core.indexstore.PartitionSpec; |
| import org.apache.carbondata.core.locks.CarbonLockFactory; |
| import org.apache.carbondata.core.locks.CarbonLockUtil; |
| import org.apache.carbondata.core.locks.ICarbonLock; |
| import org.apache.carbondata.core.locks.LockUsage; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.CarbonTableIdentifier; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; |
| import org.apache.carbondata.core.mutate.CarbonUpdateUtil; |
| import org.apache.carbondata.core.readcommitter.ReadCommittedScope; |
| import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.core.util.DeleteLoadFolders; |
| import org.apache.carbondata.core.util.path.CarbonTablePath; |
| |
| import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET; |
| |
| import com.google.gson.Gson; |
| import com.google.gson.JsonSyntaxException; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * Manages Load/Segment status |
| */ |
| public class SegmentStatusManager { |
| |
| private static final Logger LOG = |
| LogServiceFactory.getLogService(SegmentStatusManager.class.getName()); |
| |
| private AbsoluteTableIdentifier identifier; |
| |
| private Configuration configuration; |
| |
| private static final int READ_TABLE_STATUS_RETRY_COUNT = CarbonLockUtil |
| .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK, |
| CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT); |
| |
| private static final int READ_TABLE_STATUS_RETRY_TIMEOUT = CarbonLockUtil |
| .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK, |
| CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT); |
| |
/**
 * Creates a manager for the given table, using the configuration returned by
 * {@link FileFactory#getConfiguration()}.
 *
 * @param identifier table whose segments are managed
 */
public SegmentStatusManager(AbsoluteTableIdentifier identifier) {
  this(identifier, FileFactory.getConfiguration());
}

/**
 * Creates a manager for the given table with an explicit configuration.
 *
 * @param identifier    table whose segments are managed
 * @param configuration configuration to use for this manager
 */
public SegmentStatusManager(AbsoluteTableIdentifier identifier, Configuration configuration) {
  this.configuration = configuration;
  this.identifier = identifier;
}
| |
| /** |
| * This will return the lock object used to lock the table status file before updation. |
| * |
| * @return |
| */ |
| public ICarbonLock getTableStatusLock() { |
| return CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK); |
| } |
| |
| /** |
| * This method will return last modified time of tablestatus file |
| */ |
| public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identifier) |
| throws IOException { |
| String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); |
| if (!FileFactory.isFileExist(tableStatusPath)) { |
| return 0L; |
| } else { |
| return FileFactory.getCarbonFile(tableStatusPath) |
| .getLastModifiedTime(); |
| } |
| } |
| |
| public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException { |
| return getValidAndInvalidSegments(false, null, null); |
| } |
| |
| public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable) |
| throws IOException { |
| return getValidAndInvalidSegments(isChildTable, null, null); |
| } |
| |
| /** |
| * get valid segment for given load status details. |
| */ |
| public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable, |
| LoadMetadataDetails[] loadMetadataDetails, ReadCommittedScope readCommittedScope) |
| throws IOException { |
| |
| // @TODO: move reading LoadStatus file to separate class |
| List<Segment> listOfValidSegments = new ArrayList<>(10); |
| List<Segment> listOfValidUpdatedSegments = new ArrayList<>(10); |
| List<Segment> listOfInvalidSegments = new ArrayList<>(10); |
| List<Segment> listOfStreamSegments = new ArrayList<>(10); |
| List<Segment> listOfInProgressSegments = new ArrayList<>(10); |
| |
| try { |
| if (loadMetadataDetails == null) { |
| loadMetadataDetails = readTableStatusFile( |
| CarbonTablePath.getTableStatusFilePath(identifier.getTablePath())); |
| } |
| |
| if (readCommittedScope == null) { |
| readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails, |
| configuration); |
| } |
| //just directly iterate Array |
| for (LoadMetadataDetails segment : loadMetadataDetails) { |
| if (SegmentStatus.SUCCESS == segment.getSegmentStatus() |
| || SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus() |
| || SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus() |
| || SegmentStatus.STREAMING == segment.getSegmentStatus() |
| || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) { |
| // check for merged loads. |
| if (null != segment.getMergedLoadName()) { |
| Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile(), |
| readCommittedScope, segment); |
| if (!listOfValidSegments.contains(seg)) { |
| listOfValidSegments.add(seg); |
| } |
| // if merged load is updated then put it in updated list |
| if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) { |
| listOfValidUpdatedSegments.add(seg); |
| } |
| continue; |
| } |
| |
| if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) { |
| |
| listOfValidUpdatedSegments.add( |
| new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope)); |
| } |
| if (SegmentStatus.STREAMING == segment.getSegmentStatus() |
| || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) { |
| listOfStreamSegments.add( |
| new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope)); |
| continue; |
| } |
| // In case of child table, during loading, if no record is loaded to the segment, then |
| // segmentStatus will be marked as 'Success'. During query, don't need to add that segment |
| // to validSegment list, as segment does not exists |
| if (isChildTable) { |
| if (!segment.getDataSize().equalsIgnoreCase("0") && !segment.getIndexSize() |
| .equalsIgnoreCase("0")) { |
| listOfValidSegments.add( |
| new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope, |
| segment)); |
| } |
| } else { |
| listOfValidSegments.add( |
| new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope, |
| segment)); |
| } |
| } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() |
| || SegmentStatus.COMPACTED == segment.getSegmentStatus() |
| || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) { |
| listOfInvalidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile())); |
| } else if (SegmentStatus.INSERT_IN_PROGRESS == segment.getSegmentStatus() || |
| SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segment.getSegmentStatus()) { |
| listOfInProgressSegments.add( |
| new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope)); |
| } |
| } |
| } catch (IOException e) { |
| LOG.error(e.getMessage(), e); |
| throw e; |
| } |
| return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments, |
| listOfInvalidSegments, listOfStreamSegments, listOfInProgressSegments); |
| } |
| |
| /** |
| * This method reads the load metadata file |
| * |
| * @param metadataFolderPath |
| * @return |
| */ |
| public static LoadMetadataDetails[] readLoadMetadata(String metadataFolderPath) { |
| String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR |
| + CarbonTablePath.TABLE_STATUS_FILE; |
| try { |
| return readTableStatusFile(metadataFileName); |
| } catch (IOException e) { |
| return new LoadMetadataDetails[0]; |
| } |
| } |
| |
| /** |
| * Returns valid segment list for a given RelationIdentifier |
| * |
| * @param relationIdentifier get list of segments for relation identifier |
| * @return list of valid segment id's |
| */ |
| public static List<String> getValidSegmentList(RelationIdentifier relationIdentifier) |
| throws IOException { |
| List<String> segmentList = new ArrayList<>(); |
| List<Segment> validSegments = new SegmentStatusManager(AbsoluteTableIdentifier |
| .from(relationIdentifier.getTablePath(), relationIdentifier.getDatabaseName(), |
| relationIdentifier.getTableName())).getValidAndInvalidSegments().getValidSegments(); |
| for (Segment segment : validSegments) { |
| segmentList.add(segment.getSegmentNo()); |
| } |
| return segmentList; |
| } |
| |
| /** |
| * Reads the table status file with the specified UUID if non empty. |
| */ |
| public static LoadMetadataDetails[] readLoadMetadata(String metaDataFolderPath, String uuid) |
| throws IOException { |
| String tableStatusFileName; |
| if (uuid.isEmpty()) { |
| tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR |
| + CarbonTablePath.TABLE_STATUS_FILE; |
| } else { |
| tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR |
| + CarbonTablePath.TABLE_STATUS_FILE + CarbonCommonConstants.UNDERSCORE + uuid; |
| } |
| return readTableStatusFile(tableStatusFileName); |
| } |
| |
| /** |
| * This method reads the load history metadata file |
| * |
| * @param metadataFolderPath |
| * @return |
| */ |
| public static LoadMetadataDetails[] readLoadHistoryMetadata(String metadataFolderPath) { |
| String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR |
| + CarbonTablePath.TABLE_STATUS_HISTORY_FILE; |
| try { |
| return readTableStatusFile(metadataFileName); |
| } catch (IOException e) { |
| return new LoadMetadataDetails[0]; |
| } |
| } |
| |
| /** |
| * Read file and return its content as string |
| * |
| * @param tableStatusPath path of the table status to read |
| * @return file content, null is file does not exist |
| * @throws IOException if IO errors |
| */ |
| private static String readFileAsString(String tableStatusPath) throws IOException { |
| DataInputStream dataInputStream = null; |
| BufferedReader buffReader = null; |
| InputStreamReader inStream = null; |
| |
| AtomicFileOperations fileOperation = |
| AtomicFileOperationFactory.getAtomicFileOperations(tableStatusPath); |
| |
| if (!FileFactory.isFileExist(tableStatusPath)) { |
| return null; |
| } |
| |
| try { |
| dataInputStream = fileOperation.openForRead(); |
| inStream = new InputStreamReader(dataInputStream, Charset.forName(DEFAULT_CHARSET)); |
| buffReader = new BufferedReader(inStream); |
| return buffReader.readLine(); |
| } catch (EOFException ex) { |
| throw ex; |
| } catch (IOException e) { |
| LOG.error("Failed to read table status file", e); |
| throw e; |
| } finally { |
| closeStreams(buffReader, inStream, dataInputStream); |
| } |
| } |
| |
| /** |
| * Read table status file and decoded to segment meta arrays |
| * |
| * @param tableStatusPath table status file path |
| * @return segment metadata |
| * @throws IOException if IO errors |
| */ |
| public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath) |
| throws IOException { |
| int retry = READ_TABLE_STATUS_RETRY_COUNT; |
| |
| // When storing table status file in object store, reading of table status file may |
| // fail (receive IOException or JsonSyntaxException) |
| // when table status file is being modifying |
| // so here we retry multiple times before |
| // throwing IOException or JsonSyntaxException |
| while (retry > 0) { |
| try { |
| String content = readFileAsString(tableStatusPath); |
| if (content == null) { |
| return new LoadMetadataDetails[0]; |
| } |
| return new Gson().fromJson(content, LoadMetadataDetails[].class); |
| } catch (JsonSyntaxException | IOException ex) { |
| retry--; |
| if (retry == 0) { |
| // we have retried several times, throw this exception to make the execution failed |
| LOG.error("Failed to read table status file:" + tableStatusPath); |
| throw ex; |
| } |
| try { |
| LOG.warn("Failed to read table status file, retrycount:" + retry); |
| // sleep for some time before retry |
| TimeUnit.SECONDS.sleep(READ_TABLE_STATUS_RETRY_TIMEOUT); |
| } catch (InterruptedException e) { |
| // ignored |
| } |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * This method will get the max segment id |
| * |
| * @param loadMetadataDetails |
| * @return |
| */ |
| private static int getMaxSegmentId(LoadMetadataDetails[] loadMetadataDetails) { |
| int newSegmentId = -1; |
| for (int i = 0; i < loadMetadataDetails.length; i++) { |
| try { |
| int loadCount = Integer.parseInt(loadMetadataDetails[i].getLoadName()); |
| if (newSegmentId < loadCount) { |
| newSegmentId = loadCount; |
| } |
| } catch (NumberFormatException ne) { |
| // this case is for compacted folders. For compacted folders Id will be like 0.1, 2.1 |
| // consider a case when 12 loads are completed and after that major compaction is triggered. |
| // In this case new compacted folder will be created with name 12.1 and after query time |
| // out all the compacted folders will be deleted and entry will also be removed from the |
| // table status file. In that case also if a new load comes the new segment Id assigned |
| // should be 13 and not 0 |
| String loadName = loadMetadataDetails[i].getLoadName(); |
| if (loadName.contains(".")) { |
| int loadCount = Integer.parseInt(loadName.split("\\.")[0]); |
| if (newSegmentId < loadCount) { |
| newSegmentId = loadCount; |
| } |
| } |
| } |
| } |
| return newSegmentId; |
| } |
| |
| /** |
| * This method will create new segment id |
| * |
| * @param loadMetadataDetails |
| * @return |
| */ |
| public static int createNewSegmentId(LoadMetadataDetails[] loadMetadataDetails) { |
| int newSegmentId = getMaxSegmentId(loadMetadataDetails); |
| newSegmentId++; |
| return newSegmentId; |
| } |
| |
| /** |
| * compares two given date strings |
| * |
| * @param loadValue |
| * @param userValue |
| * @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg, |
| * 0 otherwise |
| */ |
| private static Integer compareDateValues(Long loadValue, Long userValue) { |
| return loadValue.compareTo(userValue); |
| } |
| |
| /** |
| * updates deletion status |
| * |
| * @param loadIds |
| * @param tableFolderPath |
| * @return |
| */ |
| public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier, |
| List<String> loadIds, String tableFolderPath) throws Exception { |
| CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier(); |
| ICarbonLock carbonDeleteSegmentLock = |
| CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.DELETE_SEGMENT_LOCK); |
| ICarbonLock carbonTableStatusLock = |
| CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK); |
| String tableDetails = |
| carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName(); |
| List<String> invalidLoadIds = new ArrayList<String>(0); |
| try { |
| if (carbonDeleteSegmentLock.lockWithRetries()) { |
| LOG.info("Delete segment lock has been successfully acquired"); |
| |
| String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); |
| LoadMetadataDetails[] listOfLoadFolderDetailsArray = null; |
| if (!FileFactory.isFileExist(dataLoadLocation)) { |
| // log error. |
| LOG.error("Load metadata file is not present."); |
| return loadIds; |
| } |
| // read existing metadata details in load metadata. |
| listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath); |
| if (listOfLoadFolderDetailsArray.length != 0) { |
| updateDeletionStatus(identifier, loadIds, listOfLoadFolderDetailsArray, invalidLoadIds); |
| if (invalidLoadIds.isEmpty()) { |
| // All or None , if anything fails then don't write |
| if (carbonTableStatusLock.lockWithRetries()) { |
| LOG.info("Table status lock has been successfully acquired"); |
| // To handle concurrency scenarios, always take latest metadata before writing |
| // into status file. |
| LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath); |
| updateLatestTableStatusDetails(listOfLoadFolderDetailsArray, |
| latestLoadMetadataDetails); |
| writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); |
| } |
| else { |
| String errorMsg = "Delete segment by id is failed for " + tableDetails |
| + ". Not able to acquire the table status lock due to other operation running " |
| + "in the background."; |
| LOG.error(errorMsg); |
| throw new Exception(errorMsg + " Please try after some time."); |
| } |
| |
| } else { |
| return invalidLoadIds; |
| } |
| |
| } else { |
| LOG.error("Delete segment by Id is failed. No matching segment id found."); |
| return loadIds; |
| } |
| |
| } else { |
| String errorMsg = "Delete segment by id is failed for " + tableDetails |
| + ". Not able to acquire the delete segment lock due to another delete " |
| + "operation is running in the background."; |
| LOG.error(errorMsg); |
| throw new Exception(errorMsg + " Please try after some time."); |
| } |
| } catch (IOException e) { |
| LOG.error("IOException" + e.getMessage(), e); |
| throw e; |
| } finally { |
| CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); |
| CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK); |
| } |
| |
| return invalidLoadIds; |
| } |
| |
| /** |
| * updates deletion status |
| * |
| * @param loadDate |
| * @param tableFolderPath |
| * @return |
| */ |
| public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier, |
| String loadDate, String tableFolderPath, Long loadStartTime) throws Exception { |
| CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier(); |
| ICarbonLock carbonDeleteSegmentLock = |
| CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.DELETE_SEGMENT_LOCK); |
| ICarbonLock carbonTableStatusLock = |
| CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK); |
| String tableDetails = |
| carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName(); |
| List<String> invalidLoadTimestamps = new ArrayList<String>(0); |
| try { |
| if (carbonDeleteSegmentLock.lockWithRetries()) { |
| LOG.info("Delete segment lock has been successfully acquired"); |
| |
| String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); |
| LoadMetadataDetails[] listOfLoadFolderDetailsArray = null; |
| |
| if (!FileFactory.isFileExist(dataLoadLocation)) { |
| // Table status file is not present, maybe table is empty, ignore this operation |
| LOG.warn("Trying to update table metadata file which is not present."); |
| return invalidLoadTimestamps; |
| } |
| // read existing metadata details in load metadata. |
| listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath); |
| if (listOfLoadFolderDetailsArray.length != 0) { |
| updateDeletionStatus(identifier, loadDate, listOfLoadFolderDetailsArray, |
| invalidLoadTimestamps, loadStartTime); |
| if (invalidLoadTimestamps.isEmpty()) { |
| if (carbonTableStatusLock.lockWithRetries()) { |
| LOG.info("Table status lock has been successfully acquired."); |
| // To handle concurrency scenarios, always take latest metadata before writing |
| // into status file. |
| LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath); |
| updateLatestTableStatusDetails(listOfLoadFolderDetailsArray, |
| latestLoadMetadataDetails); |
| writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); |
| } |
| else { |
| |
| String errorMsg = "Delete segment by date is failed for " + tableDetails |
| + ". Not able to acquire the table status lock due to other operation running " |
| + "in the background."; |
| LOG.error(errorMsg); |
| throw new Exception(errorMsg + " Please try after some time."); |
| |
| } |
| } else { |
| return invalidLoadTimestamps; |
| } |
| |
| } else { |
| LOG.error("Delete segment by date is failed. No matching segment found."); |
| invalidLoadTimestamps.add(loadDate); |
| return invalidLoadTimestamps; |
| } |
| |
| } else { |
| String errorMsg = "Delete segment by date is failed for " + tableDetails |
| + ". Not able to acquire the delete segment lock due to another delete " |
| + "operation is running in the background."; |
| LOG.error(errorMsg); |
| throw new Exception(errorMsg + " Please try after some time."); |
| } |
| } catch (IOException e) { |
| LOG.error("Error message: " + "IOException" + e.getMessage(), e); |
| throw e; |
| } finally { |
| CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); |
| CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK); |
| } |
| |
| return invalidLoadTimestamps; |
| } |
| |
| /** |
| * Backup the table status file as 'tablestatus.backup' in the same path |
| * |
| * @param tableStatusPath table status file path |
| */ |
| private static void backupTableStatus(String tableStatusPath) throws IOException { |
| CarbonFile file = FileFactory.getCarbonFile(tableStatusPath); |
| if (file.exists()) { |
| String backupPath = tableStatusPath + ".backup"; |
| String currentContent = readFileAsString(tableStatusPath); |
| if (currentContent != null) { |
| writeStringIntoFile(backupPath, currentContent); |
| } |
| } |
| } |
| |
| /** |
| * writes load details to specified path |
| * |
| * @param tableStatusPath path of the table status file |
| * @param listOfLoadFolderDetailsArray segment metadata |
| * @throws IOException if IO errors |
| */ |
| public static void writeLoadDetailsIntoFile( |
| String tableStatusPath, |
| LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException { |
| // When overwriting table status file, if process crashed, table status file |
| // will be in corrupted state. This can happen in an unstable environment, |
| // like in the cloud. To prevent the table corruption, user can enable following |
| // property to enable backup of the table status before overwriting it. |
| if (tableStatusPath.endsWith(CarbonTablePath.TABLE_STATUS_FILE) && |
| CarbonProperties.isEnableTableStatusBackup()) { |
| backupTableStatus(tableStatusPath); |
| } |
| String content = new Gson().toJson(listOfLoadFolderDetailsArray); |
| mockForTest(); |
| // make the table status file smaller by removing fields that are default value |
| for (LoadMetadataDetails loadMetadataDetails : listOfLoadFolderDetailsArray) { |
| loadMetadataDetails.removeUnnecessaryField(); |
| } |
| // If process crashed during following write, table status file need to be |
| // manually recovered. |
| writeStringIntoFile(FileFactory.getUpdatedFilePath(tableStatusPath), content); |
| } |
| |
| // a dummy func for mocking in testcase, which simulates IOException |
| private static void mockForTest() { |
| } |
| |
| /** |
| * writes string content to specified path |
| * |
| * @param filePath path of the file to write |
| * @param content content to write |
| * @throws IOException if IO errors |
| */ |
| private static void writeStringIntoFile(String filePath, String content) throws IOException { |
| AtomicFileOperations fileWrite = AtomicFileOperationFactory.getAtomicFileOperations(filePath); |
| BufferedWriter brWriter = null; |
| DataOutputStream dataOutputStream = null; |
| try { |
| dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); |
| brWriter = new BufferedWriter(new OutputStreamWriter( |
| dataOutputStream, Charset.forName(DEFAULT_CHARSET))); |
| brWriter.write(content); |
| } catch (IOException ioe) { |
| LOG.error("Write file failed: " + ioe.getLocalizedMessage()); |
| fileWrite.setFailed(); |
| throw ioe; |
| } finally { |
| CarbonUtil.closeStreams(brWriter); |
| fileWrite.close(); |
| } |
| } |
| |
| /** |
| * updates deletion status details for each load and returns invalidLoadIds |
| * |
| * @param loadIds |
| * @param listOfLoadFolderDetailsArray |
| * @param invalidLoadIds |
| * @return invalidLoadIds |
| */ |
| private static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier, |
| List<String> loadIds, LoadMetadataDetails[] listOfLoadFolderDetailsArray, |
| List<String> invalidLoadIds) { |
| SegmentStatus segmentStatus = null; |
| for (String loadId : loadIds) { |
| boolean loadFound = false; |
| // For each load id loop through data and if the |
| // load id is found then mark |
| // the metadata as deleted. |
| for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { |
| |
| if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) { |
| segmentStatus = loadMetadata.getSegmentStatus(); |
| if (SegmentStatus.COMPACTED == segmentStatus) { |
| // if the segment is compacted then no need to delete that. |
| LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId); |
| invalidLoadIds.add(loadId); |
| return invalidLoadIds; |
| } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus |
| && isLoadInProgress(absoluteTableIdentifier, loadId)) { |
| // if the segment status is in progress then no need to delete that. |
| LOG.error("Cannot delete the segment " + loadId + " which is load in progress"); |
| invalidLoadIds.add(loadId); |
| return invalidLoadIds; |
| } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus |
| && isLoadInProgress(absoluteTableIdentifier, loadId)) { |
| // if the segment status is overwrite in progress, then no need to delete that. |
| LOG.error("Cannot delete the segment " + loadId + " which is load overwrite " + |
| "in progress"); |
| invalidLoadIds.add(loadId); |
| return invalidLoadIds; |
| } else if (SegmentStatus.STREAMING == segmentStatus) { |
| // if the segment status is streaming, the segment can't be deleted directly. |
| LOG.error("Cannot delete the segment " + loadId + " which is streaming in progress"); |
| invalidLoadIds.add(loadId); |
| return invalidLoadIds; |
| } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) { |
| loadFound = true; |
| loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); |
| loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime()); |
| LOG.info("Segment ID " + loadId + " Marked for Delete"); |
| } |
| break; |
| } |
| } |
| |
| if (!loadFound) { |
| LOG.error("Delete segment by ID is failed. No matching segment id found :" + loadId); |
| invalidLoadIds.add(loadId); |
| return invalidLoadIds; |
| } |
| } |
| return invalidLoadIds; |
| } |
| |
| /** |
| * updates deletion status details for load and returns invalidLoadTimestamps |
| * |
| * @param loadDate |
| * @param listOfLoadFolderDetailsArray |
| * @param invalidLoadTimestamps |
| * @return invalidLoadTimestamps |
| */ |
| private static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier, |
| String loadDate, LoadMetadataDetails[] listOfLoadFolderDetailsArray, |
| List<String> invalidLoadTimestamps, Long loadStartTime) { |
| // For each load timestamp loop through data and if the |
| // required load timestamp is found then mark |
| // the metadata as deleted. |
| boolean loadFound = false; |
| String loadStartTimeString = "Load Start Time: "; |
| SegmentStatus segmentStatus = null; |
| for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { |
| Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime); |
| if (result < 0) { |
| segmentStatus = loadMetadata.getSegmentStatus(); |
| if (SegmentStatus.COMPACTED == segmentStatus) { |
| LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() |
| + "as the segment has been compacted."); |
| } else if (SegmentStatus.STREAMING == segmentStatus) { |
| LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() |
| + "as the segment is streaming in progress."); |
| } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus && isLoadInProgress( |
| absoluteTableIdentifier, loadMetadata.getLoadName())) { |
| LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() |
| + "as the segment is insert in progress."); |
| } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus |
| && isLoadInProgress(absoluteTableIdentifier, loadMetadata.getLoadName())) { |
| LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() |
| + "as the segment is insert overwrite in progress."); |
| } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) { |
| loadFound = true; |
| updateSegmentMetadataDetails(loadMetadata); |
| LOG.info("Info: " + loadStartTimeString + loadMetadata.getLoadStartTime() |
| + " Marked for Delete"); |
| } |
| } |
| } |
| |
| if (!loadFound) { |
| invalidLoadTimestamps.add(loadDate); |
| LOG.error("Delete segment by date is failed. No matching segment found."); |
| return invalidLoadTimestamps; |
| } |
| return invalidLoadTimestamps; |
| } |
| |
| /** |
| * This method closes the streams |
| * |
| * @param streams - streams to close. |
| */ |
| private static void closeStreams(Closeable... streams) { |
| // Added if to avoid NullPointerException in case one stream is being passed as null |
| if (null != streams) { |
| for (Closeable stream : streams) { |
| if (null != stream) { |
| try { |
| stream.close(); |
| } catch (IOException e) { |
| LOG.error("Error while closing stream" + stream); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * updates table status details using latest metadata |
| * |
| * @param oldMetadata |
| * @param newMetadata |
| * @return |
| */ |
| private static List<LoadMetadataDetails> updateLatestTableStatusDetails( |
| LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) { |
| |
| List<LoadMetadataDetails> newListMetadata = |
| new ArrayList<LoadMetadataDetails>(Arrays.asList(newMetadata)); |
| for (LoadMetadataDetails oldSegment : oldMetadata) { |
| if (SegmentStatus.MARKED_FOR_DELETE == oldSegment.getSegmentStatus()) { |
| updateSegmentMetadataDetails(newListMetadata.get(newListMetadata.indexOf(oldSegment))); |
| } |
| } |
| return newListMetadata; |
| } |
| |
| /** |
| * updates segment status and modificaton time details |
| * |
| * @param loadMetadata |
| */ |
| private static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) { |
| // update status only if the segment is not marked for delete |
| if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) { |
| loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); |
| loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime()); |
| } |
| } |
| |
| /** |
| * This API will return the update status file name. |
| * @param segmentList |
| * @return |
| */ |
| public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) { |
| if (segmentList.length == 0) { |
| return ""; |
| } |
| else { |
| for (LoadMetadataDetails eachSeg : segmentList) { |
| // file name stored in 0th segment. |
| if (eachSeg.getLoadName().equalsIgnoreCase("0")) { |
| return eachSeg.getUpdateStatusFileName(); |
| } |
| } |
| } |
| return ""; |
| } |
| |
| public static class ValidAndInvalidSegmentsInfo { |
| private final List<Segment> listOfValidSegments; |
| private final List<Segment> listOfValidUpdatedSegments; |
| private final List<Segment> listOfInvalidSegments; |
| private final List<Segment> listOfStreamSegments; |
| private final List<Segment> listOfInProgressSegments; |
| |
| private ValidAndInvalidSegmentsInfo(List<Segment> listOfValidSegments, |
| List<Segment> listOfValidUpdatedSegments, List<Segment> listOfInvalidUpdatedSegments, |
| List<Segment> listOfStreamSegments, List<Segment> listOfInProgressSegments) { |
| this.listOfValidSegments = listOfValidSegments; |
| this.listOfValidUpdatedSegments = listOfValidUpdatedSegments; |
| this.listOfInvalidSegments = listOfInvalidUpdatedSegments; |
| this.listOfStreamSegments = listOfStreamSegments; |
| this.listOfInProgressSegments = listOfInProgressSegments; |
| } |
| |
| public List<Segment> getInvalidSegments() { |
| return listOfInvalidSegments; |
| } |
| |
| public List<Segment> getValidSegments() { |
| return listOfValidSegments; |
| } |
| |
| public List<Segment> getStreamSegments() { |
| return listOfStreamSegments; |
| } |
| |
| public List<Segment> getListOfInProgressSegments() { |
| return listOfInProgressSegments; |
| } |
| } |
| |
| /** |
| * Return true if any load or insert overwrite is in progress for specified table |
| */ |
| public static Boolean isLoadInProgressInTable(CarbonTable carbonTable) { |
| if (carbonTable == null) { |
| return false; |
| } |
| boolean loadInProgress = false; |
| String metaPath = carbonTable.getMetadataPath(); |
| LoadMetadataDetails[] listOfLoadFolderDetailsArray = |
| SegmentStatusManager.readLoadMetadata(metaPath); |
| if (listOfLoadFolderDetailsArray.length != 0) { |
| for (LoadMetadataDetails loaddetail :listOfLoadFolderDetailsArray) { |
| SegmentStatus segmentStatus = loaddetail.getSegmentStatus(); |
| if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS |
| || segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) { |
| loadInProgress = |
| isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(), |
| loaddetail.getLoadName()); |
| } |
| } |
| } |
| return loadInProgress; |
| } |
| |
| /** |
| * Return true if the compaction is in progress for the table |
| * @param carbonTable |
| * @return |
| */ |
| public static Boolean isCompactionInProgress(CarbonTable carbonTable) { |
| if (carbonTable == null) { |
| return false; |
| } |
| boolean compactionInProgress; |
| ICarbonLock lock = CarbonLockFactory |
| .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.COMPACTION_LOCK); |
| try { |
| compactionInProgress = !lock.lockWithRetries(1, 0); |
| } finally { |
| lock.unlock(); |
| } |
| return compactionInProgress; |
| } |
| |
| /** |
| * Return true if insert overwrite is in progress for specified table |
| */ |
| public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) { |
| if (carbonTable == null) { |
| return false; |
| } |
| boolean loadInProgress = false; |
| String metaPath = carbonTable.getMetadataPath(); |
| LoadMetadataDetails[] listOfLoadFolderDetailsArray = |
| SegmentStatusManager.readLoadMetadata(metaPath); |
| if (listOfLoadFolderDetailsArray.length != 0) { |
| for (LoadMetadataDetails loaddetail :listOfLoadFolderDetailsArray) { |
| SegmentStatus segmentStatus = loaddetail.getSegmentStatus(); |
| if (segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) { |
| loadInProgress = |
| isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(), |
| loaddetail.getLoadName()); |
| } |
| } |
| } |
| return loadInProgress; |
| } |
| |
| /** |
| * Return true if the specified `loadName` is in progress, by checking the load lock. |
| */ |
| public static Boolean isLoadInProgress(AbsoluteTableIdentifier absoluteTableIdentifier, |
| String loadName) { |
| ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, |
| CarbonTablePath.addSegmentPrefix(loadName) + LockUsage.LOCK); |
| try { |
| return !segmentLock.lockWithRetries(1, 0); |
| } finally { |
| segmentLock.unlock(); |
| } |
| } |
| |
| private static boolean isLoadDeletionRequired(LoadMetadataDetails[] details) { |
| if (details != null && details.length > 0) { |
| for (LoadMetadataDetails oneRow : details) { |
| if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus() |
| || SegmentStatus.COMPACTED == oneRow.getSegmentStatus() |
| || SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus() |
| || SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus()) |
| && oneRow.getVisibility().equalsIgnoreCase("true")) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * This will update the old table status details before clean files to the latest table status. |
| * @param oldList |
| * @param newList |
| * @return |
| */ |
| private static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew( |
| LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) { |
| |
| List<LoadMetadataDetails> newListMetadata = |
| new ArrayList<LoadMetadataDetails>(Arrays.asList(newList)); |
| for (LoadMetadataDetails oldSegment : oldList) { |
| if ("false".equalsIgnoreCase(oldSegment.getVisibility())) { |
| newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false"); |
| } |
| } |
| return newListMetadata; |
| } |
| |
| private static void writeLoadMetadata(AbsoluteTableIdentifier identifier, |
| List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException { |
| String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); |
| |
| DataOutputStream dataOutputStream; |
| Gson gsonObjectToWrite = new Gson(); |
| BufferedWriter brWriter = null; |
| |
| AtomicFileOperations writeOperation = |
| AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation); |
| |
| try { |
| |
| dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE); |
| brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, |
| Charset.forName(DEFAULT_CHARSET))); |
| |
| // make the table status file smaller by removing fields that are default value |
| listOfLoadFolderDetails.forEach(LoadMetadataDetails::removeUnnecessaryField); |
| |
| String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); |
| brWriter.write(metadataInstance); |
| } catch (IOException ie) { |
| LOG.error("Error message: " + ie.getLocalizedMessage()); |
| writeOperation.setFailed(); |
| throw ie; |
| } finally { |
| try { |
| if (null != brWriter) { |
| brWriter.flush(); |
| } |
| } catch (Exception e) { |
| LOG.error("error in flushing "); |
| |
| } |
| CarbonUtil.closeStreams(brWriter); |
| writeOperation.close(); |
| } |
| } |
| |
| private static class ReturnTuple { |
| LoadMetadataDetails[] details; |
| boolean isUpdateRequired; |
| ReturnTuple(LoadMetadataDetails[] details, boolean isUpdateRequired) { |
| this.details = details; |
| this.isUpdateRequired = isUpdateRequired; |
| } |
| } |
| |
| private static ReturnTuple isUpdationRequired(boolean isForceDeletion, CarbonTable carbonTable, |
| AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] details) { |
| // Delete marked loads |
| boolean isUpdationRequired = DeleteLoadFolders |
| .deleteLoadFoldersFromFileSystem(absoluteTableIdentifier, isForceDeletion, details, |
| carbonTable.getMetadataPath()); |
| return new ReturnTuple(details, isUpdationRequired); |
| } |
| |
| public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean isForceDeletion, |
| List<PartitionSpec> partitionSpecs) throws IOException { |
| LoadMetadataDetails[] metadataDetails = |
| SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); |
| // delete the expired segment lock files |
| CarbonLockUtil.deleteExpiredSegmentLockFiles(carbonTable); |
| if (isLoadDeletionRequired(metadataDetails)) { |
| AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); |
| boolean updationCompletionStatus = false; |
| LoadMetadataDetails[] newAddedLoadHistoryList = null; |
| ReturnTuple tuple = |
| isUpdationRequired(isForceDeletion, carbonTable, identifier, metadataDetails); |
| if (tuple.isUpdateRequired) { |
| ICarbonLock carbonTableStatusLock = |
| CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK); |
| boolean locked = false; |
| try { |
| // Update load metadate file after cleaning deleted nodes |
| locked = carbonTableStatusLock.lockWithRetries(); |
| if (locked) { |
| LOG.info("Table status lock has been successfully acquired."); |
| // Again read status and check to verify updation required or not. |
| LoadMetadataDetails[] details = |
| SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); |
| ReturnTuple tuple2 = |
| isUpdationRequired(isForceDeletion, carbonTable, identifier, details); |
| if (!tuple2.isUpdateRequired) { |
| return; |
| } |
| // read latest table status again. |
| LoadMetadataDetails[] latestMetadata = |
| SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); |
| |
| int invisibleSegmentPreserveCnt = |
| CarbonProperties.getInstance().getInvisibleSegmentPreserveCount(); |
| int maxSegmentId = SegmentStatusManager.getMaxSegmentId(tuple2.details); |
| int invisibleSegmentCnt = SegmentStatusManager.countInvisibleSegments( |
| tuple2.details, maxSegmentId); |
| // if execute command 'clean files' or the number of invisible segment info |
| // exceeds the value of 'carbon.invisible.segments.preserve.count', |
| // it need to append the invisible segment list to 'tablestatus.history' file. |
| if (isForceDeletion || (invisibleSegmentCnt > invisibleSegmentPreserveCnt)) { |
| TableStatusReturnTuple tableStatusReturn = separateVisibleAndInvisibleSegments( |
| tuple2.details, latestMetadata, invisibleSegmentCnt, maxSegmentId); |
| LoadMetadataDetails[] oldLoadHistoryList = readLoadHistoryMetadata( |
| carbonTable.getMetadataPath()); |
| LoadMetadataDetails[] newLoadHistoryList = appendLoadHistoryList( |
| oldLoadHistoryList, tableStatusReturn.arrayOfLoadHistoryDetails); |
| writeLoadDetailsIntoFile( |
| CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()), |
| tableStatusReturn.arrayOfLoadDetails); |
| writeLoadDetailsIntoFile( |
| CarbonTablePath.getTableStatusHistoryFilePath(carbonTable.getTablePath()), |
| newLoadHistoryList); |
| // the segments which will be moved to history file need to be deleted |
| newAddedLoadHistoryList = tableStatusReturn.arrayOfLoadHistoryDetails; |
| } else { |
| // update the metadata details from old to new status. |
| List<LoadMetadataDetails> latestStatus = |
| updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata); |
| writeLoadDetailsIntoFile( |
| CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), |
| latestStatus.toArray(new LoadMetadataDetails[0])); |
| } |
| updationCompletionStatus = true; |
| } else { |
| String dbName = identifier.getCarbonTableIdentifier().getDatabaseName(); |
| String tableName = identifier.getCarbonTableIdentifier().getTableName(); |
| String errorMsg = "Clean files request is failed for " + |
| dbName + "." + tableName + |
| ". Not able to acquire the table status lock due to other operation " + |
| "running in the background."; |
| LOG.error(errorMsg); |
| throw new IOException(errorMsg + " Please try after some time."); |
| } |
| } finally { |
| if (locked) { |
| CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); |
| } |
| if (updationCompletionStatus) { |
| DeleteLoadFolders |
| .physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList, |
| isForceDeletion, partitionSpecs); |
| } |
| } |
| } |
| } |
| } |
| |
| public static void truncateTable(CarbonTable carbonTable) |
| throws ConcurrentOperationException, IOException { |
| ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj( |
| carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); |
| boolean locked = false; |
| try { |
| // Update load metadate file after cleaning deleted nodes |
| locked = carbonTableStatusLock.lockWithRetries(); |
| if (locked) { |
| LOG.info("Table status lock has been successfully acquired."); |
| LoadMetadataDetails[] listOfLoadFolderDetailsArray = |
| SegmentStatusManager.readLoadMetadata( |
| CarbonTablePath.getMetadataPath(carbonTable.getTablePath())); |
| for (LoadMetadataDetails listOfLoadFolderDetails : listOfLoadFolderDetailsArray) { |
| boolean writing; |
| switch (listOfLoadFolderDetails.getSegmentStatus()) { |
| case INSERT_IN_PROGRESS: |
| case INSERT_OVERWRITE_IN_PROGRESS: |
| case STREAMING: |
| writing = true; |
| break; |
| default: |
| writing = false; |
| } |
| if (writing) { |
| throw new ConcurrentOperationException(carbonTable, "insert", "truncate"); |
| } |
| } |
| for (LoadMetadataDetails listOfLoadFolderDetails : listOfLoadFolderDetailsArray) { |
| listOfLoadFolderDetails.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); |
| } |
| SegmentStatusManager |
| .writeLoadDetailsIntoFile( |
| CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()), |
| listOfLoadFolderDetailsArray); |
| } else { |
| String dbName = carbonTable.getDatabaseName(); |
| String tableName = carbonTable.getTableName(); |
| String errorMsg = "truncate table request is failed for " + |
| dbName + "." + tableName + |
| ". Not able to acquire the table status lock due to other operation " + |
| "running in the background."; |
| LOG.error(errorMsg); |
| throw new IOException(errorMsg + " Please try after some time."); |
| } |
| } finally { |
| if (locked) { |
| CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); |
| } |
| } |
| } |
| |
| /** |
| * Get the number of invisible segment info from segment info list. |
| */ |
| public static int countInvisibleSegments( |
| LoadMetadataDetails[] segmentList, int maxSegmentId) { |
| int invisibleSegmentCnt = 0; |
| if (segmentList.length != 0) { |
| for (LoadMetadataDetails eachSeg : segmentList) { |
| // can not remove segment 0, there are some info will be used later |
| // for example: updateStatusFileName |
| // also can not remove the max segment id, |
| // otherwise will impact the generation of segment id |
| if (!eachSeg.getLoadName().equalsIgnoreCase("0") |
| && !eachSeg.getLoadName().equalsIgnoreCase(String.valueOf(maxSegmentId)) |
| && eachSeg.getVisibility().equalsIgnoreCase("false")) { |
| invisibleSegmentCnt += 1; |
| } |
| } |
| } |
| return invisibleSegmentCnt; |
| } |
| |
| private static class TableStatusReturnTuple { |
| LoadMetadataDetails[] arrayOfLoadDetails; |
| LoadMetadataDetails[] arrayOfLoadHistoryDetails; |
| TableStatusReturnTuple(LoadMetadataDetails[] arrayOfLoadDetails, |
| LoadMetadataDetails[] arrayOfLoadHistoryDetails) { |
| this.arrayOfLoadDetails = arrayOfLoadDetails; |
| this.arrayOfLoadHistoryDetails = arrayOfLoadHistoryDetails; |
| } |
| } |
| |
| /** |
| * Separate visible and invisible segments into two array. |
| */ |
| public static TableStatusReturnTuple separateVisibleAndInvisibleSegments( |
| LoadMetadataDetails[] oldList, |
| LoadMetadataDetails[] newList, |
| int invisibleSegmentCnt, |
| int maxSegmentId) { |
| int newSegmentsLength = newList.length; |
| int visibleSegmentCnt = newSegmentsLength - invisibleSegmentCnt; |
| LoadMetadataDetails[] arrayOfVisibleSegments = new LoadMetadataDetails[visibleSegmentCnt]; |
| LoadMetadataDetails[] arrayOfInvisibleSegments = new LoadMetadataDetails[invisibleSegmentCnt]; |
| int oldSegmentsLength = oldList.length; |
| int visibleIdx = 0; |
| int invisibleIdx = 0; |
| for (int i = 0; i < newSegmentsLength; i++) { |
| LoadMetadataDetails newSegment = newList[i]; |
| if (i < oldSegmentsLength) { |
| LoadMetadataDetails oldSegment = oldList[i]; |
| if (newSegment.getLoadName().equalsIgnoreCase("0") |
| || newSegment.getLoadName().equalsIgnoreCase(String.valueOf(maxSegmentId))) { |
| newSegment.setVisibility(oldSegment.getVisibility()); |
| arrayOfVisibleSegments[visibleIdx] = newSegment; |
| visibleIdx++; |
| } else if ("false".equalsIgnoreCase(oldSegment.getVisibility())) { |
| newSegment.setVisibility("false"); |
| arrayOfInvisibleSegments[invisibleIdx] = newSegment; |
| invisibleIdx++; |
| } else { |
| arrayOfVisibleSegments[visibleIdx] = newSegment; |
| visibleIdx++; |
| } |
| } else { |
| arrayOfVisibleSegments[visibleIdx] = newSegment; |
| visibleIdx++; |
| } |
| } |
| return new TableStatusReturnTuple(arrayOfVisibleSegments, arrayOfInvisibleSegments); |
| } |
| |
| /** |
| * Return an array containing all invisible segment entries in appendList and historyList. |
| */ |
| public static LoadMetadataDetails[] appendLoadHistoryList( |
| LoadMetadataDetails[] historyList, |
| LoadMetadataDetails[] appendList) { |
| int historyListLen = historyList.length; |
| int appendListLen = appendList.length; |
| int newListLen = historyListLen + appendListLen; |
| LoadMetadataDetails[] newList = new LoadMetadataDetails[newListLen]; |
| int newListIdx = 0; |
| for (int i = 0; i < historyListLen; i++) { |
| newList[newListIdx] = historyList[i]; |
| newListIdx++; |
| } |
| for (int i = 0; i < appendListLen; i++) { |
| newList[newListIdx] = appendList[i]; |
| newListIdx++; |
| } |
| return newList; |
| } |
| |
| /* |
| * This method reads the load metadata file and returns Carbon segments only |
| */ |
| public static LoadMetadataDetails[] readCarbonMetaData(String metadataFolderPath) { |
| String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR |
| + CarbonTablePath.TABLE_STATUS_FILE; |
| try { |
| LoadMetadataDetails[] allSegments = readTableStatusFile(metadataFileName); |
| List<LoadMetadataDetails> carbonSegments = new ArrayList<>(); |
| for (LoadMetadataDetails currSegment : allSegments) { |
| if (currSegment.isCarbonFormat()) { |
| carbonSegments.add(currSegment); |
| } |
| } |
| return carbonSegments.toArray(new LoadMetadataDetails[carbonSegments.size()]); |
| } catch (IOException e) { |
| return new LoadMetadataDetails[0]; |
| } |
| } |
| } |