/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeMap
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.connector.SupportsMetadata
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils

trait DataSourceV2ScanExecBase extends LeafExecNode {
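
  // Custom SQL metrics reported by the underlying Scan, keyed by metric name; these are
  // merged with the standard "numOutputRows" metric below.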
  lazy val customMetrics = scan.supportedCustomMetrics().map { customMetric =>
    customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
  }.toMap

  override lazy val metrics = {
    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++
      customMetrics
  }
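
  // Supplied by concrete scan exec nodes: the connector Scan being executed, its input
  // partitions, and the factory that creates a reader for each partition.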
  def scan: Scan

  def partitions: Seq[InputPartition]

  def readerFactory: PartitionReaderFactory
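
  // One-line plan summary: node name, (possibly truncated) output attributes and the scan
  // description, with sensitive values redacted.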
  override def simpleString(maxFields: Int): String = {
    val result =
      s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
    redact(result)
  }

  /**
   * Shorthand for calling redact() without specifying redacting rules
   */
  protected def redact(text: String): String = {
    Utils.redact(session.sessionState.conf.stringRedactionPattern, text)
  }
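
  // Detailed EXPLAIN output: the output attributes plus any metadata the scan exposes via
  // SupportsMetadata, dropping empty entries and redacting values.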
  override def verboseStringWithOperatorId(): String = {
    val metaDataStr = scan match {
      case s: SupportsMetadata =>
        s.getMetaData().toSeq.sorted.flatMap {
          case (_, value) if value.isEmpty || value.equals("[]") => None
          case (key, value) => Some(s"$key: ${redact(value)}")
          case _ => None
        }
      case _ =>
        Seq(scan.description())
    }
    s"""
       |$formattedNodeName
       |${ExplainUtils.generateFieldString("Output", output)}
       |${metaDataStr.mkString("\n")}
       |""".stripMargin
  }
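
  // Report SinglePartition when there is exactly one input partition; otherwise use the
  // partitioning reported by the source (if it implements SupportsReportPartitioning) or
  // fall back to the default.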
  override def outputPartitioning: physical.Partitioning = scan match {
    case _ if partitions.length == 1 =>
      SinglePartition
    case s: SupportsReportPartitioning =>
      new DataSourcePartitioning(
        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
    case _ => super.outputPartitioning
  }
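
  // Columnar output is only used when every input partition supports columnar reads;
  // mixing row-based and columnar partitions is rejected.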
  override def supportsColumnar: Boolean = {
    require(partitions.forall(readerFactory.supportColumnarReads) ||
      !partitions.exists(readerFactory.supportColumnarReads),
      "Cannot mix row-based and columnar input partitions.")

    partitions.exists(readerFactory.supportColumnarReads)
  }
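
  // The RDD produced by the scan; concrete nodes decide whether it carries rows or
  // columnar batches.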
  def inputRDD: RDD[InternalRow]

  def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
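
  // Row-based execution path: count each row as it is produced.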
  override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    inputRDD.map { r =>
      numOutputRows += 1
      r
    }
  }
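
  // Columnar execution path: when supportsColumnar is true the input RDD holds
  // ColumnarBatches, so count the rows in each batch.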
  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = longMetric("numOutputRows")
    inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b =>
      numOutputRows += b.numRows()
      b
    }
  }
}