/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import java.io.{File, IOException}
import java.text.SimpleDateFormat
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.execution.command.{AlterPartitionModel, DataMapField, Field, PartitionerField}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.util.CommonUtil

object PartitionUtils {
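
  /**
   * Parse a list-partition definition string into its value groups.
   * Values enclosed in parentheses form one group; a bare value forms a group of its own.
   *
   * Example (illustrative input): getListInfo("(a, b), c, (d, e)")
   * returns List(List("a", "b"), List("c"), List("d", "e")).
   */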
def getListInfo(originListInfo: String): List[List[String]] = {
    val listInfo = ListBuffer[List[String]]()
    val templist = ListBuffer[String]()
val arr = originListInfo.split(",")
.map(_.trim())
var groupEnd = true
val iter = arr.iterator
while (iter.hasNext) {
val value = iter.next()
if (value.startsWith("(")) {
templist += value.replace("(", "").trim()
groupEnd = false
} else if (value.endsWith(")")) {
templist += value.replace(")", "").trim()
listInfo += templist.toList
templist.clear()
groupEnd = true
} else {
if (groupEnd) {
templist += value
listInfo += templist.toList
templist.clear()
} else {
templist += value
}
}
}
listInfo.toList
}

  /**
   * Verify the add/split information and update the partitionInfo:
   * 1. update rangeInfo/listInfo
   * 2. update partitionIds
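   *
   * Example (illustrative values): for a RANGE partition with rangeInfo
   * ["10", "20", "30"], splitting the partition whose index in partitionIdList is 2
   * with splitInfo ["15", "20"] yields rangeInfo ["10", "15", "20", "30"].
   */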
def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIdList: List[Int],
partitionId: Int, splitInfo: List[String], timestampFormatter: SimpleDateFormat,
dateFormatter: SimpleDateFormat): Unit = {
val columnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
val index = partitionIdList.indexOf(partitionId)
if (index < 0) {
throw new IllegalArgumentException("Invalid Partition Id " + partitionId +
"\n Use show partitions table_name to get the list of valid partitions")
}
if (partitionInfo.getPartitionType == PartitionType.RANGE) {
val rangeInfo = partitionInfo.getRangeInfo.asScala.toList
val newRangeInfo = partitionId match {
case 0 => rangeInfo ++ splitInfo
case _ => rangeInfo.take(index - 1) ++ splitInfo ++
rangeInfo.takeRight(rangeInfo.size - index)
}
CommonUtil.validateRangeInfo(newRangeInfo, columnDataType,
timestampFormatter, dateFormatter)
partitionInfo.setRangeInfo(newRangeInfo.asJava)
} else if (partitionInfo.getPartitionType == PartitionType.LIST) {
val originList = partitionInfo.getListInfo.asScala.map(_.asScala.toList).toList
if (partitionId != 0) {
val targetListInfo = partitionInfo.getListInfo.get(index - 1)
CommonUtil.validateSplitListInfo(targetListInfo.asScala.toList, splitInfo, originList)
} else {
CommonUtil.validateAddListInfo(splitInfo, originList)
}
val addListInfo = PartitionUtils.getListInfo(splitInfo.mkString(","))
val newListInfo = partitionId match {
case 0 => originList ++ addListInfo
case _ => originList.take(index - 1) ++ addListInfo ++
originList.takeRight(originList.size - index)
}
partitionInfo.setListInfo(newListInfo.map(_.asJava).asJava)
}
if (partitionId == 0) {
partitionInfo.addPartition(splitInfo.size)
} else {
partitionInfo.splitPartition(index, splitInfo.size)
}
}

  /**
   * Used by alter table partition commands to build SegmentProperties on the Spark side.
   * @param identifier absolute identifier of the table
   * @param segmentId id of the segment to read
   * @param partitionIds ids of the partitions to include
   * @param oldPartitionIdList task id group before the partition info is changed
   * @param partitionInfo partition info after the alter operation
   * @param carbonTable the carbon table
   * @return SegmentProperties built from the footer of the segment's first block
   */
def getSegmentProperties(identifier: AbsoluteTableIdentifier, segmentId: String,
partitionIds: List[String], oldPartitionIdList: List[Int],
partitionInfo: PartitionInfo,
carbonTable: CarbonTable): SegmentProperties = {
val tableBlockInfoList =
getPartitionBlockList(
identifier,
segmentId,
partitionIds,
oldPartitionIdList,
partitionInfo,
carbonTable)
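    // all blocks of a segment share the same column schema and cardinality, so the
    // footer of the first block is enough to build the SegmentProperties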
val footer = CarbonUtil.readMetadataFile(tableBlockInfoList.get(0))
val segmentProperties = new SegmentProperties(footer.getColumnInTable,
footer.getSegmentInfo.getColumnCardinality)
segmentProperties
}
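
  /**
   * Get the list of TableBlockInfo for one segment by fetching the input splits
   * of a CarbonTableInputFormat created for the given partitions.
   */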
def getPartitionBlockList(identifier: AbsoluteTableIdentifier, segmentId: String,
partitionIds: List[String], oldPartitionIdList: List[Int],
partitionInfo: PartitionInfo,
carbonTable: CarbonTable): java.util.List[TableBlockInfo] = {
val jobConf = new JobConf(FileFactory.getConfiguration)
val job = new Job(jobConf)
val format = CarbonInputFormatUtil
.createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
val splits = format.getSplitsOfOneSegment(job, segmentId,
oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
val tableBlockInfoList = CarbonInputSplit.createBlocks(blockList.asJava)
tableBlockInfoList
}
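
  /**
   * Delete the original data and index files of the segment after an alter-partition
   * operation: every file whose timestamp differs from the new fact timestamp is removed,
   * then the segment file and the table status are updated to the new state.
   */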
@throws(classOf[IOException])
def deleteOriginalCarbonFile(alterPartitionModel: AlterPartitionModel,
identifier: AbsoluteTableIdentifier,
partitionIds: List[String], dbName: String, tableName: String,
partitionInfo: PartitionInfo): Unit = {
val carbonLoadModel = alterPartitionModel.carbonLoadModel
val segmentId = alterPartitionModel.segmentId
val oldPartitionIds = alterPartitionModel.oldPartitionIds
val newTime = carbonLoadModel.getFactTimeStamp
val tablePath = carbonLoadModel.getTablePath
val tableBlockInfoList =
getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIds,
partitionInfo, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable).asScala
val pathList: util.List[String] = new util.ArrayList[String]()
    tableBlockInfoList.foreach { tableBlockInfo =>
val path = tableBlockInfo.getFilePath
val timestamp = CarbonTablePath.DataFileUtil.getTimeStampFromFileName(path)
if (timestamp.toLong != newTime) {
// add carbondata file
pathList.add(path)
// add index file
val version = tableBlockInfo.getVersion
val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path)
val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
val indexFilePath =
new Path(new Path(path).getParent,
CarbonTablePath.getCarbonIndexFileName(taskId,
bucketNumber.toInt,
batchNo,
timestamp,
segmentId)).toString
        // the same index file path can occur several times when multiple data files
        // relate to one index file, so skip duplicates
if (indexFilePath != null && !pathList.contains(indexFilePath)) {
pathList.add(indexFilePath)
}
}
}
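    // wrap the collected paths as java.io.File instances so they can be deleted in one call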
val files: util.List[File] = new util.ArrayList[File]()
for (path <- pathList.asScala) {
val file = new File(path)
files.add(file)
}
CarbonUtil.deleteFiles(files.asScala.toArray)
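    // if any file was deleted, rewrite the segment file without the removed entries and
    // record the new state in the table status file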
if (!files.isEmpty) {
val carbonTable = alterPartitionModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val updatedSegFile: String = mergeAndUpdateSegmentFile(alterPartitionModel,
identifier,
segmentId,
carbonTable,
files.asScala)
val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, updatedSegFile, null))
.asJava
if (!CarbonUpdateUtil.updateTableMetadataStatus(
new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId,
null, null)).asJava),
carbonTable,
alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString,
true,
new util.ArrayList[Segment](0),
new util.ArrayList[Segment](segmentFiles), "")) {
throw new IOException("Data update failed due to failure in table status updation.")
}
}
}

  /**
   * Used to extract PartitionerFields for aggregation datamaps.
   * This method keeps generating partitionerFields until the sequence of
   * partition columns is broken.
   *
   * For example: if x, y, z are the partition columns of the main table, then a child
   * table is partitioned only if its projection columns are one of
   * List("x,y,z", "x,y", "x").
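   *
   * Example (illustrative): with allPartitionColumn = Seq("x", "y", "z"), if only x and y
   * have matching relations the result covers x and y; if only x and z match, the index
   * check below fails for z (index 2 while only 1 field was collected) and an empty Seq
   * is returned.
   */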
def getPartitionerFields(allPartitionColumn: Seq[String],
fieldRelations: mutable.LinkedHashMap[Field, DataMapField]): Seq[PartitionerField] = {
def generatePartitionerField(partitionColumn: List[String],
partitionerFields: Seq[PartitionerField]): Seq[PartitionerField] = {
partitionColumn match {
case head :: tail =>
// Collect the first relation which matched the condition
val validRelation = fieldRelations.zipWithIndex.collectFirst {
case ((field, dataMapField), index) if
dataMapField.columnTableRelationList.getOrElse(Seq()).nonEmpty &&
head.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
dataMapField.aggregateFunction.isEmpty =>
(PartitionerField(field.name.get,
field.dataType,
field.columnComment), allPartitionColumn.indexOf(head))
}
if (validRelation.isDefined) {
val (partitionerField, index) = validRelation.get
            // if a relation is found then check whether the number of partitionerFields
            // already collected equals the index of this element.
            // If x with index 1 is found then there should be exactly 1 element already found.
            // If z with index 2 comes directly after x, this check fails because 1 element
            // was skipped in between: index would be 2 while the number of elements found
            // would be 1. In that case return an empty sequence so that the aggregate
            // table is not partitioned on any column.
if (index == partitionerFields.length) {
generatePartitionerField(tail, partitionerFields :+ partitionerField)
} else {
Seq.empty
}
} else {
            // if not found then continue searching the rest of the elements, because the
            // rest of the elements can also decide whether the table has to be partitioned.
generatePartitionerField(tail, partitionerFields)
}
case Nil =>
// if end of list then return fields.
partitionerFields
}
}
generatePartitionerField(allPartitionColumn.toList, Seq.empty)
}
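
  /**
   * Write a new segment file for the current content of the segment, merge it with the
   * existing one, drop the index entries of the deleted files and clean up the
   * intermediate files. Returns the name of the merged segment file.
   */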
private def mergeAndUpdateSegmentFile(alterPartitionModel: AlterPartitionModel,
identifier: AbsoluteTableIdentifier,
segmentId: String,
      carbonTable: CarbonTable, filesToBeDeleted: Seq[File]): String = {
val metadataDetails =
SegmentStatusManager.readTableStatusFile(
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
val segmentFile =
metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile
var allSegmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
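    // write a fresh segment file describing the current content of the segment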
val file = SegmentFileStore.writeSegmentFile(
carbonTable,
alterPartitionModel.segmentId,
System.currentTimeMillis().toString)
if (segmentFile != null) {
allSegmentFiles ++= FileFactory.getCarbonFile(
SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
}
val updatedSegFile = {
val carbonFile = FileFactory.getCarbonFile(
SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
allSegmentFiles ++= carbonFile :: Nil
val mergedSegFileName = SegmentFileStore.genSegmentFileName(
segmentId,
alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
val tmpFile = mergedSegFileName + "_tmp"
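      // merge the existing segment file(s) with the newly written one into a temporary file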
val segmentStoreFile = SegmentFileStore.mergeSegmentFiles(
tmpFile,
CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
allSegmentFiles.toArray)
val indexFiles = segmentStoreFile.getLocationMap.values().asScala.head.getFiles
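      // drop the index entries that belong to the data files deleted above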
      filesToBeDeleted.foreach(f => indexFiles.remove(f.getName))
SegmentFileStore.writeSegmentFile(
segmentStoreFile,
CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
CarbonCommonConstants.FILE_SEPARATOR + mergedSegFileName + CarbonTablePath.SEGMENT_EXT)
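      // remove the intermediate segment files now that the merged one has been written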
carbonFile.delete()
FileFactory.getCarbonFile(
SegmentFileStore.getSegmentFilePath(
carbonTable.getTablePath, tmpFile + CarbonTablePath.SEGMENT_EXT)).delete()
mergedSegFileName + CarbonTablePath.SEGMENT_EXT
}
updatedSegFile
}
}