blob: 316781a7b4ba2c8ace42246f4f2b008e5e183df5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.util
import scala.collection.JavaConverters._
import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.util.CarbonLoaderUtil
/**
* Utility Class for the Secondary Index creation flow
*/
object FileInternalUtil {
// Logger obtained from LogServiceFactory, named after this object's canonical class name.
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
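/**
* Sets the last-modified time of the schema modified-time file to the current
* system time. The file itself is not created by this method.
*
* @return the timestamp (epoch milliseconds) that was applied to the file
*/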
def touchStoreTimeStamp(): Long = {
  // Resolve the path first, then take the clock reading that is both applied and returned.
  val mdtFilePath = getTimestampFileAndType()
  val now = System.currentTimeMillis()
  val mdtFile = FileFactory.getCarbonFile(mdtFilePath)
  // The result of setLastModifiedTime is intentionally not inspected (same as before).
  mdtFile.setLastModifiedTime(now)
  now
}
/**
 * Builds the location of the schema modified-time file.
 *
 * The folder is read from the `CARBON_UPDATE_SYNC_FOLDER` property (falling back to
 * `CARBON_UPDATE_SYNC_FOLDER_DEFAULT`), trimmed, and joined with
 * `SCHEMAS_MODIFIED_TIME_FILE`. Despite the method name, only the path is returned.
 *
 * @return the path after passing through `CarbonUtil.checkAndAppendFileSystemURIScheme`
 */
private def getTimestampFileAndType() = {
  val properties = CarbonProperties.getInstance()
  // use the configured sync folder when present, otherwise the default one
  val syncFolder = properties.getProperty(
    CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
    CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
  CarbonUtil.checkAndAppendFileSystemURIScheme(
    s"$syncFolder/${CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE}")
}
/**
 * Creates one [[LoadMetadataDetails]] entry per id in `validSegments` and passes them,
 * together with the index tables of `carbonTable`, to
 * `CarbonInternalLoaderUtil.recordLoadMetadata`.
 *
 * Every entry gets the segment id as load name, `loadStatus` as status and a single
 * end time that is read once for the whole call. The entries are handed over in the
 * reverse order of `validSegments` (each new entry is prepended).
 *
 * @param validSegments ids of the segments to record
 * @param databaseName forwarded unchanged to `recordLoadMetadata`
 * @param tableName forwarded unchanged to `recordLoadMetadata`
 * @param loadStatus status set on every created entry
 * @param segmentIdToLoadStartTimeMapping load start time per segment id; used as the
 *                                        start time (unless overridden by `newStartTime`)
 *                                        and as fallback timestamp for the segment file name
 * @param segmentToSegmentTimestampMap timestamp per segment id used for the segment file
 *                                     name when a non-null value is present
 * @param carbonTable table used to compute data/index sizes and to look up index tables
 * @param sparkSession session used to look up the index tables
 * @param newStartTime start time applied to segments in `rebuiltSegments` when non-zero
 * @param rebuiltSegments ids of segments that should receive `newStartTime`
 * @return the result of `CarbonInternalLoaderUtil.recordLoadMetadata`
 * @throws NoSuchElementException if a start time has to be looked up for a segment id
 *                                that is absent from `segmentIdToLoadStartTimeMapping`
 *                                (unless that map defines a default)
 */
def updateTableStatus(
    validSegments: List[String],
    databaseName: String,
    tableName: String,
    loadStatus: SegmentStatus,
    segmentIdToLoadStartTimeMapping: scala.collection.mutable.Map[String, java.lang.Long],
    segmentToSegmentTimestampMap: java.util.Map[String, String],
    carbonTable: CarbonTable,
    sparkSession: SparkSession,
    newStartTime: Long = 0L,
    rebuiltSegments: Set[String] = Set.empty): Boolean = {
  // one end time shared by all entries created in this call
  val loadEndTime = CarbonUpdateUtil.readCurrentTime

  // Builds the metadata entry for a single segment.
  def buildDetail(segmentId: String): LoadMetadataDetails = {
    val detail = new LoadMetadataDetails
    detail.setLoadName(segmentId)
    detail.setSegmentStatus(loadStatus)
    // rebuilt segments take the new start time; the map is only consulted otherwise
    val loadStartTime: Long =
      if (rebuiltSegments.contains(segmentId) && newStartTime != 0L) {
        newStartTime
      } else {
        segmentIdToLoadStartTimeMapping(segmentId)
      }
    detail.setLoadStartTime(loadStartTime)
    detail.setLoadEndTime(loadEndTime)
    // prefer the explicit segment timestamp; fall back (lazily) to the load start time
    val segmentTimestamp = Option(segmentToSegmentTimestampMap.get(segmentId))
      .getOrElse(segmentIdToLoadStartTimeMapping(segmentId).toString)
    detail.setSegmentFile(
      SegmentFileStore.genSegmentFileName(segmentId, segmentTimestamp) +
      CarbonTablePath.SEGMENT_EXT)
    // must run after the segment file has been set on the entry
    CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(detail, segmentId, carbonTable)
    detail
  }

  // Prepending to an immutable List is O(1) per element and yields the same
  // (reversed) order the previous array-prepend implementation produced.
  val loadMetadataDetails = validSegments.foldLeft(List.empty[LoadMetadataDetails]) {
    (built, segmentId) => buildDetail(segmentId) :: built
  }

  val indexTables = CarbonInternalScalaUtil.getIndexCarbonTables(carbonTable, sparkSession)
  CarbonInternalLoaderUtil.recordLoadMetadata(
    loadMetadataDetails.asJava,
    validSegments.asJava,
    carbonTable,
    indexTables.toList.asJava,
    databaseName,
    tableName)
}
/**
 * Sets the last-modified time of the table's schema file, if that file exists.
 * Nothing happens when the schema file is missing.
 *
 * @param dbName not used by this method
 * @param tableName not used by this method
 * @param tablePath table path from which the schema file path is derived
 * @param schemaTimeStamp last-modified time to apply to the schema file
 */
def touchSchemaFileTimestamp(dbName: String,
    tableName: String,
    tablePath: String,
    schemaTimeStamp: Long): Unit = {
  val schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath)
  val schemaFileExists = FileFactory.isFileExist(schemaFilePath)
  if (schemaFileExists) {
    val schemaFile = FileFactory.getCarbonFile(schemaFilePath)
    schemaFile.setLastModifiedTime(schemaTimeStamp)
  }
}
}