/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.query.runtime.plan
import java.util.concurrent.ConcurrentHashMap
import java.{lang, util}
import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
import org.apache.kylin.engine.spark.utils.{LogEx, LogUtils}
import org.apache.kylin.guava30.shaded.common.base.Joiner
import org.apache.kylin.guava30.shaded.common.collect.{Lists, Sets}
import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate
import org.apache.kylin.metadata.cube.gridtable.NLayoutToGridTableMapping
import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
import org.apache.kylin.metadata.model.{DeriveInfo, FunctionDesc, NTableMetadataManager, ParameterDesc, TblColRef}
import org.apache.kylin.metadata.realization.HybridRealization
import org.apache.kylin.metadata.tuple.TupleInfo
import org.apache.kylin.query.implicits.sessionToQueryContext
import org.apache.kylin.query.relnode.{KapRel, OLAPContext}
import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil}
import org.apache.spark.sql.{Column, DataFrame, Row, SparderEnv, SparkOperation, SparkSession}
import org.apache.spark.sql.execution.utils.SchemaProcessor
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.manager.SparderLookupManager
import org.apache.spark.sql.types.{ArrayType, DataTypes, DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.util.SparderTypeUtil
import scala.collection.JavaConverters._
// scalastyle:off
object TableScanPlan extends LogEx {
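// Copies the dataflow's queryable segments into a mutable list.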
def listSegmentsForQuery(dataflow: NDataflow): util.List[NDataSegment] = {
val r = new util.ArrayList[NDataSegment]
dataflow.getQueryableSegments.forEach(seg => r.add(seg))
r
}
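// Thread-local cache of layout DataFrames, keyed by the concatenated layout file paths plus the
// fast-bitmap flag, so the same layout scanned repeatedly within one query thread is reused.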
private[runtime] val cacheDf: ThreadLocal[ConcurrentHashMap[String, DataFrame]] = new ThreadLocal[ConcurrentHashMap[String, DataFrame]] {
override def initialValue: ConcurrentHashMap[String, DataFrame] = {
new ConcurrentHashMap[String, DataFrame]
}
}
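// Entry point for index (cuboid) scans: builds one DataFrame per matched dataflow, choosing
// the batch or streaming candidate as appropriate, and unions the results.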
def createOLAPTable(rel: KapRel): DataFrame = logTime("table scan", debug = true) {
val session: SparkSession = SparderEnv.getSparkSession
val olapContext = rel.getContext
val context = olapContext.storageContext
val prunedSegments = context.getPrunedSegments
val prunedStreamingSegments = context.getPrunedStreamingSegments
val realizations = olapContext.realization.getRealizations.asScala.toList
realizations.map(_.asInstanceOf[NDataflow])
.filter(dataflow => (!dataflow.isStreaming && !context.isBatchCandidateEmpty) ||
(dataflow.isStreaming && !context.isStreamCandidateEmpty) ||
isSegmentsEmpty(prunedSegments, prunedStreamingSegments))
.map(dataflow => {
if (dataflow.isStreaming) {
tableScan(rel, dataflow, olapContext, session, prunedStreamingSegments, context.getStreamingCandidate)
} else {
tableScan(rel, dataflow, olapContext, session, prunedSegments, context.getCandidate)
}
}).reduce(_.union(_))
}
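// Builds a DataFrame from metadata only (the pre-computed column value ranges) instead of
// scanning storage; values are converted through the Kylin SQL types, and the schema falls
// back to StringType for types Sparder cannot represent.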
def createMetadataTable(rel: KapRel): DataFrame = {
val session: SparkSession = SparderEnv.getSparkSession
val olapContext = rel.getContext
val allFields: util.List[TblColRef] = new util.ArrayList[TblColRef]
olapContext.allTableScans.forEach(tableScan => {
val columns = tableScan.getColumnRowType.getAllColumns
allFields.addAll(columns)
})
// convert data
val dataSet = olapContext.getColValuesRange
val result = new util.ArrayList[Row]
dataSet.forEach(rowData => {
val sparderData = new Array[Any](rowData.length)
sparderData.indices.foreach(index => {
val dataType = allFields.get(index).getColumnDesc.getUpgradedType
sparderData(index) = SparderTypeUtil.convertStringToResultValueBasedOnKylinSQLType(rowData.apply(index), dataType)
})
result.add(Row.fromSeq(sparderData))
})
// create schema
val structTypes = new util.ArrayList[StructField]()
allFields.forEach(col => {
try {
val dataType = col.getColumnDesc.getUpgradedType
val spaType = if (dataType.isDate) {
DataTypes.DateType
} else if (dataType.isDateTimeFamily) {
DataTypes.TimestampType
} else {
SparderTypeUtil.kylinTypeToSparkResultType(dataType)
}
structTypes.add(StructField(col.getIdentity.replace(".", "_"), spaType))
} catch {
// some data types, such as 'any', are not supported in sparder;
// such types cannot be used in min/max anyway, so just fall back to StringType
case e: IllegalArgumentException => {
structTypes.add(StructField(col.getIdentity.replace(".", "_"), StringType))
logInfo(e.toString)
}
}
})
val schema: StructType = StructType(structTypes)
session.createDataFrame(result, schema)
}
// fallback when no segments survive pruning (prunedSegments is null or empty)
private def tableScanEmptySegment(rel: KapRel): DataFrame = {
logInfo("prunedSegments is null")
val df = SparkOperation.createEmptyDataFrame(
StructType(rel.getColumnRowType
.getAllColumns.asScala
.map(column => StructField(
column.toString.replaceAll("\\.", "_"),
SparderTypeUtil.toSparkType(column.getType))
)
)
)
val cols = df.schema.map(structField => {
col(structField.name)
})
df.select(cols: _*)
}
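// True only when both the batch and the streaming pruned segment lists are null or empty.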
def isSegmentsEmpty(prunedSegments: util.List[NDataSegment], prunedStreamingSegments: util.List[NDataSegment]): Boolean = {
val isPrunedSegmentsEmpty = prunedSegments == null || prunedSegments.size() == 0
val isPrunedStreamingSegmentsEmpty = prunedStreamingSegments == null || prunedStreamingSegments.size() == 0
isPrunedSegmentsEmpty && isPrunedStreamingSegmentsEmpty
}
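// Scans a single dataflow: resolves the parquet paths of the selected layout for every pruned
// segment (and partition, when partition pruning applies), reuses the thread-locally cached
// DataFrame if the same paths were already read, then adapts the GridTable schema for Calcite.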
def tableScan(rel: KapRel, dataflow: NDataflow, olapContext: OLAPContext,
session: SparkSession, prunedSegments: util.List[NDataSegment],
candidate: NLayoutCandidate): DataFrame = {
val prunedPartitionMap = olapContext.storageContext.getPrunedPartitions
olapContext.resetSQLDigest()
//TODO: refactor
val cuboidLayout = candidate.getLayoutEntity
if (cuboidLayout.getIndex != null && cuboidLayout.getIndex.isTableIndex) {
QueryContext.current().getQueryTagInfo.setTableIndex(true)
}
val tableName = olapContext.firstTableScan.getBackupAlias
val mapping = new NLayoutToGridTableMapping(cuboidLayout)
val columnNames = SchemaProcessor.buildGTSchema(cuboidLayout, mapping, tableName)
/////////////////////////////////////////////
val kapConfig = KapConfig.wrap(dataflow.getConfig)
val basePath = kapConfig.getReadParquetStoragePath(dataflow.getProject)
if (prunedSegments == null || prunedSegments.size() == 0) {
return tableScanEmptySegment(rel)
}
val fileList = prunedSegments.asScala.map(
seg => toLayoutPath(dataflow, cuboidLayout.getId, basePath, seg, prunedPartitionMap)
)
val path = fileList.mkString(",") + olapContext.isExactlyFastBitmap
printLogInfo(basePath, dataflow.getId, cuboidLayout.getId, prunedSegments, prunedPartitionMap)
val cached = cacheDf.get().getOrDefault(path, null)
val df = if (cached != null && !cached.sparkSession.sparkContext.isStopped) {
logInfo(s"Reuse df: ${cuboidLayout.getId}")
cached
} else {
val pruningInfo = prunedSegments.asScala.map { seg =>
if (prunedPartitionMap != null) {
val partitions = prunedPartitionMap.get(seg.getId)
seg.getId + ":" + Joiner.on("|").join(partitions)
} else {
seg.getId
}
}.mkString(",")
val newDf = session.kylin
.isFastBitmapEnabled(olapContext.isExactlyFastBitmap)
.bucketingEnabled(bucketEnabled(olapContext, cuboidLayout))
.cuboidTable(dataflow, cuboidLayout, pruningInfo)
.toDF(columnNames: _*)
logInfo(s"Cache df: ${cuboidLayout.getId}")
cacheDf.get().put(path, newDf)
newDf
}
val (schema, newDF) = buildSchema(df, tableName, cuboidLayout, rel, olapContext, dataflow)
newDF.select(schema: _*)
}
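// Storage bucketing is used only when the sharding-join optimization is enabled, the query
// needs no aggregation beyond what the layout already provides, and its single outer-join
// key matches the layout's single shard-by column.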
def bucketEnabled(context: OLAPContext, layout: LayoutEntity): Boolean = {
if (!KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled) {
return false
}
// no extra agg is allowed
if (context.isHasAgg && !context.isExactlyAggregate) {
return false
}
// check if outer join key matches shard by key
(context.getOuterJoinParticipants.size() == 1
&& layout.getShardByColumnRefs.size() == 1
&& context.getOuterJoinParticipants.iterator().next() == layout.getShardByColumnRefs.get(0))
}
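// Maps the GridTable layout columns onto the Calcite row type: expands derived dimensions to
// their host columns, joins derived lookup data when required, flattens a TopN measure if one
// is queried, and returns the Calcite column expressions together with the resulting DataFrame.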
def buildSchema(df: DataFrame, tableName: String, cuboidLayout: LayoutEntity, rel: KapRel,
olapContext: OLAPContext, dataflow: NDataflow): (Seq[Column], DataFrame) = {
var newDF = df
val isBatchOfHybrid = olapContext.realization.isInstanceOf[HybridRealization] && dataflow.getModel.isFusionModel && !dataflow.isStreaming
val mapping = new NLayoutToGridTableMapping(cuboidLayout, isBatchOfHybrid)
val context = olapContext.storageContext
/////////////////////////////////////////////
val groups: util.Collection[TblColRef] =
olapContext.getSQLDigest.groupbyColumns
val otherDims = Sets.newHashSet(context.getDimensions)
otherDims.removeAll(groups)
// expand derived columns (a D suffix means the set contains host columns only; derived columns have been translated to their hosts)
val groupsD = expandDerived(context.getCandidate, groups)
val otherDimsD: util.Set[TblColRef] =
expandDerived(context.getCandidate, otherDims)
otherDimsD.removeAll(groupsD)
// identify cuboid
val dimensionsD = new util.LinkedHashSet[TblColRef]
dimensionsD.addAll(groupsD)
dimensionsD.addAll(otherDimsD)
val model = context.getCandidate.getLayoutEntity.getModel
context.getCandidate.getDerivedToHostMap.asScala.toList.foreach(m => {
if (m._2.`type` == DeriveInfo.DeriveType.LOOKUP && !m._2.isOneToOne) {
m._2.columns.asScala.foreach(derivedId => {
if (mapping.getIndexOf(model.getColRef(derivedId)) != -1) {
dimensionsD.add(model.getColRef(derivedId))
}
})
}
})
val gtColIdx = mapping.getDimIndices(dimensionsD) ++ mapping
.getMetricsIndices(context.getMetrics)
val derived = SparderDerivedUtil(tableName,
dataflow.getLatestReadySegment,
gtColIdx,
olapContext.returnTupleInfo,
context.getCandidate)
if (derived.hasDerived) {
newDF = derived.joinDerived(newDF)
}
var topNMapping: Map[Int, Column] = Map.empty
// a query will have at most one TopN measure.
val topNMetric = context.getMetrics.asScala.collectFirst {
case x: FunctionDesc if x.getReturnType.startsWith("topn") => x
}
if (topNMetric.isDefined) {
val topNFieldIndex = mapping.getMetricsIndices(List(topNMetric.get).asJava).head
val tp = processTopN(topNMetric.get, newDF, topNFieldIndex, olapContext.returnTupleInfo, tableName)
newDF = tp._1
topNMapping = tp._2
}
val tupleIdx = getTupleIdx(dimensionsD,
context.getMetrics,
olapContext.returnTupleInfo)
(RuntimeHelper.gtSchemaToCalciteSchema(
mapping.getPrimaryKey,
derived,
tableName,
rel.getColumnRowType.getAllColumns.asScala.toList,
df.schema,
gtColIdx,
tupleIdx,
topNMapping), newDF)
}
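// Layout storage path: <basePath><dataflowUuid>/<segmentId>/<layoutId>; the overload below
// appends a bucket id (or the partition id when no bucket exists) for multi-partition segments.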
def toLayoutPath(dataflow: NDataflow, cuboidId: Long, basePath: String, seg: NDataSegment): String = {
s"$basePath${dataflow.getUuid}/${seg.getId}/$cuboidId"
}
def toLayoutPath(dataflow: NDataflow, layoutId: Long, basePath: String,
seg: NDataSegment, partitionsMap: util.Map[String, util.List[lang.Long]]): List[String] = {
if (partitionsMap == null) {
List(toLayoutPath(dataflow, layoutId, basePath, seg))
} else {
partitionsMap.get(seg.getId).asScala.map(part => {
val bucketId = dataflow.getSegment(seg.getId).getBucketId(layoutId, part)
val childDir = if (bucketId == null) part else bucketId
toLayoutPath(dataflow, layoutId, basePath, seg) + "/" + childDir
}).toList
}
}
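// Logs the resolved storage location (and pruned partitions, when present) plus the current
// size of the thread-local DataFrame cache.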
def printLogInfo(basePath: String, dataflowId: String, cuboidId: Long, prunedSegments: util.List[NDataSegment], partitionsMap: util.Map[String, util.List[lang.Long]]): Unit = {
if (partitionsMap == null) {
val segmentIDs = LogUtils.jsonArray(prunedSegments.asScala)(e => s"${e.getId} [${e.getSegRange.getStart}, ${e.getSegRange.getEnd})")
logInfo(s"""Path is: {"base":"$basePath","dataflow":"${dataflowId}","segments":$segmentIDs,"layout": ${cuboidId}""")
} else {
val prunedSegmentInfo = partitionsMap.asScala.map {
case (segmentId, partitionList) => {
"[" + segmentId + ": " + partitionList.asScala.mkString(",") + "]"
}
}.mkString(",")
logInfo(s"""Path is: {"base":"$basePath","dataflow":"${dataflowId}","segments":{$prunedSegmentInfo},"layout": ${cuboidId}""")
}
logInfo(s"size is ${cacheDf.get().size()}")
}
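// Expands a TopN measure: inline() turns the array of (measure, dims) structs into one row per
// TopN entry, the remaining struct is flattened into plain columns, and a mapping from tuple
// index to the renamed columns is returned together with the flattened DataFrame.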
private def processTopN(topNMetric: FunctionDesc, df: DataFrame, topNFieldIndex: Int, tupleInfo: TupleInfo, tableName: String): (DataFrame, Map[Int, Column]) = {
// support TopN measure
val topNField = df.schema.fields
.zipWithIndex
.filter(_._1.dataType.isInstanceOf[ArrayType])
.map(_.swap)
.toMap
.get(topNFieldIndex)
require(topNField.isDefined)
// data looks like this:
// [2012-01-01,4972.2700,WrappedArray([623.45,[10000392,7,2012-01-01]],[47.49,[10000029,4,2012-01-01]])]
// inlining the array: one input record may produce multiple output records:
// [2012-01-01, 4972.2700, 623.45,[10000392,7,2012-01-01]]
// [2012-01-01, 4972.2700, 47.49,[10000029,4,2012-01-01]]
val inlinedSelectExpr = df.schema.fields.filter(_ != topNField.get).map(_.name) :+ s"inline(${topNField.get.name})"
val inlinedDF = df.selectExpr(inlinedSelectExpr: _*)
// flatten the multiple dims inside the TopN struct (this does not increase the record count), giving a fully flattened row:
// [2012-01-01, 4972.2700, 623.45, 10000392, 7, 2012-01-01]
val flattenedSelectExpr = inlinedDF.schema.fields.dropRight(1).map(_.name) :+ s"${inlinedDF.schema.fields.last.name}.*"
val flattenedDF = inlinedDF.selectExpr(flattenedSelectExpr: _*)
val topNLiteralColumn = getTopNLiteralColumn(topNMetric)
val literalTupleIdx = topNLiteralColumn.filter(tupleInfo.hasColumn).map(tupleInfo.getColumnIndex)
val numericCol = getTopNNumericColumn(topNMetric)
val numericTupleIdx: Int =
if (numericCol != null) {
// for TopN, the aggregation must be SUM
val sumFunc = FunctionDesc.newInstance(FunctionDesc.FUNC_SUM,
Lists.newArrayList(ParameterDesc.newInstance(numericCol)), numericCol.getType.toString)
tupleInfo.getFieldIndex(sumFunc.getRewriteFieldName)
} else {
val countFunction = FunctionDesc.newInstance(FunctionDesc.FUNC_COUNT,
Lists.newArrayList(ParameterDesc.newInstance("1")), "bigint")
tupleInfo.getFieldIndex(countFunction.getRewriteFieldName)
}
val dimCols = topNLiteralColumn.toArray
val dimWithType = literalTupleIdx.zipWithIndex.map(index => {
val column = dimCols(index._2)
(SchemaProcessor.genTopNSchema(tableName,
index._1, column.getIdentity.replaceAll("\\.", "_")),
SparderTypeUtil.toSparkType(column.getType))
})
val sumCol = tupleInfo.getAllColumns.get(numericTupleIdx)
val sumColName = s"A_SUM_${sumCol.getName}_$numericTupleIdx"
val measureSchema = StructField(sumColName, DoubleType)
// flatten schema.
val newSchema = StructType(df.schema.filter(_.name != topNField.get.name)
++ dimWithType.map(tp => StructField(tp._1, tp._2)).+:(measureSchema))
val topNMapping = literalTupleIdx.zipWithIndex.map(index => {
(index._1, col(SchemaProcessor.genTopNSchema(tableName,
index._1, dimCols(index._2).getIdentity.replaceAll("\\.", "_"))))
}).+:((numericTupleIdx, col(sumColName))).toMap
(flattenedDF.toDF(newSchema.fieldNames: _*), topNMapping)
}
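// TopN parameter layout: when the first parameter is a column, it is the numeric column being
// summed and the remaining columns are the literal (group-by) dimensions; otherwise all columns
// are literals and COUNT(1) is used as the measure.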
private def getTopNLiteralColumn(functionDesc: FunctionDesc): List[TblColRef] = {
val allCols = functionDesc.getColRefs
if (!functionDesc.getParameters.get(0).isColumnType) {
return allCols.asScala.toList
}
allCols.asScala.drop(1).toList
}
private def getTopNNumericColumn(functionDesc: FunctionDesc): TblColRef = {
if (functionDesc.getParameters.get(0).isColumnType) {
return functionDesc.getColRefs.get(0)
}
null
}
// copied from NCubeTupleConverter: maps each selected dimension and metric to its index in the tuple info, or -1 when absent
def getTupleIdx(
selectedDimensions: util.Set[TblColRef],
selectedMetrics: util.Set[FunctionDesc],
tupleInfo: TupleInfo): Array[Int] = {
val tupleIdx: Array[Int] =
new Array[Int](selectedDimensions.size + selectedMetrics.size)
var i = 0
// pre-calculate dimension index mapping to tuple
selectedDimensions.asScala.foreach(
dim => {
tupleIdx(i) =
if (tupleInfo.hasColumn(dim)) tupleInfo.getColumnIndex(dim) else -1
i += 1
}
)
selectedMetrics.asScala.foreach(
metric => {
if (metric.needRewrite) {
val rewriteFieldName = metric.getRewriteFieldName
tupleIdx(i) =
if (tupleInfo.hasField(rewriteFieldName))
tupleInfo.getFieldIndex(rewriteFieldName)
else -1
} else { // a non-rewritten metric (like SUM, or a dimension acting as a metric) is treated like a dimension column
val col = metric.getColRefs.get(0)
tupleIdx(i) =
if (tupleInfo.hasColumn(col)) tupleInfo.getColumnIndex(col) else -1
}
i += 1
}
)
tupleIdx
}
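// Scans a lookup (snapshot) table: loads the table's last snapshot through SparderLookupManager,
// renames its columns to the scan's backup alias, and substitutes a literal 1 for computed
// columns or columns absent from the source table.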
def createLookupTable(rel: KapRel): DataFrame = {
val start = System.currentTimeMillis()
val olapContext = rel.getContext
val instance = olapContext.realization match {
case dataflow: NDataflow => dataflow
case _ => olapContext.realization.asInstanceOf[HybridRealization]
}
val tableMetadataManager = NTableMetadataManager.getInstance(instance.getConfig, instance.getProject)
val lookupTableName = olapContext.firstTableScan.getTableName
val snapshotResPath = tableMetadataManager.getTableDesc(lookupTableName).getLastSnapshotPath
val config = instance.getConfig
val dataFrameTableName = instance.getProject + "@" + lookupTableName
val lookupDf = SparderLookupManager.getOrCreate(dataFrameTableName, snapshotResPath, config)
val olapTable = olapContext.firstTableScan.getOlapTable
val aliasTableName = olapContext.firstTableScan.getBackupAlias
val newNames = lookupDf.schema.fieldNames.map { name =>
val gTInfoSchema = SchemaProcessor.parseDeriveTableSchemaName(name)
SchemaProcessor.generateDeriveTableSchemaName(aliasTableName,
gTInfoSchema.columnId,
gTInfoSchema.columnName)
}.array
val newNameLookupDf = lookupDf.toDF(newNames: _*)
val colIndex = olapTable.getSourceColumns.asScala
.map(
column =>
if (column.isComputedColumn || column.getZeroBasedIndex < 0) {
RuntimeHelper.literalOne.as(column.toString)
} else {
col(
SchemaProcessor
.generateDeriveTableSchemaName(
aliasTableName,
column.getZeroBasedIndex,
column.getName
)
)
})
val df = newNameLookupDf.select(colIndex: _*)
logInfo(s"Gen lookup table scan cost Time :${System.currentTimeMillis() - start} ")
df
}
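// Replaces every derived column with its host column(s) using the layout candidate's
// derived-to-host mapping; non-derived columns are kept unchanged.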
private def expandDerived(layoutCandidate: NLayoutCandidate,
cols: util.Collection[TblColRef]): util.Set[TblColRef] = {
val expanded = new util.HashSet[TblColRef]
val model = layoutCandidate.getLayoutEntity.getModel
val tblIdMap = model.getEffectiveCols.inverse()
cols.asScala.foreach(
col => {
val hostInfo = layoutCandidate.getDerivedToHostMap.get(tblIdMap.get(col))
if (hostInfo != null) {
for (hostCol <- hostInfo.columns.asScala) {
expanded.add(model.getColRef(hostCol))
}
} else {
expanded.add(col)
}
}
)
expanded
}
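// Produces a DataFrame with exactly one empty row and an empty schema (presumably for
// queries that reference no table columns, e.g. constant-only selects).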
def createSingleRow(): DataFrame = {
val session = SparderEnv.getSparkSession
val rows = List.fill(1)(Row.fromSeq(List[Object]()))
val rdd = session.sparkContext.makeRDD(rows)
session.createDataFrame(rdd, StructType(List[StructField]()))
}
}