| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.datasources.v2 |
| |
| import org.apache.spark.rdd.RDD |
| import org.apache.spark.sql.catalyst.InternalRow |
| import org.apache.spark.sql.catalyst.expressions.AttributeMap |
| import org.apache.spark.sql.catalyst.plans.physical |
| import org.apache.spark.sql.catalyst.plans.physical.SinglePartition |
| import org.apache.spark.sql.catalyst.util.truncatedString |
| import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning} |
| import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode} |
| import org.apache.spark.sql.execution.metric.SQLMetrics |
| import org.apache.spark.sql.internal.connector.SupportsMetadata |
| import org.apache.spark.sql.vectorized.ColumnarBatch |
| import org.apache.spark.util.Utils |
| |
/**
 * Base physical node for scans over DataSource V2 tables. Concrete subclasses supply the
 * [[Scan]], its input partitions and the [[PartitionReaderFactory]]; this trait wires them
 * into metrics, explain output, partitioning and (columnar) execution.
 */
trait DataSourceV2ScanExecBase extends LeafExecNode {

  // Custom metrics declared by the underlying scan, keyed by metric name.
  lazy val customMetrics = scan.supportedCustomMetrics().map { customMetric =>
    customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
  }.toMap

  // Standard row-count metric plus whatever the scan itself reports.
  override lazy val metrics = {
    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++
      customMetrics
  }

  /** The V2 scan this node executes. */
  def scan: Scan

  /** Input partitions produced by the scan. */
  def partitions: Seq[InputPartition]

  /** Factory used to create per-partition readers (row-based or columnar). */
  def readerFactory: PartitionReaderFactory

  override def simpleString(maxFields: Int): String = {
    val result =
      s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
    redact(result)
  }

  /**
   * Shorthand for calling redact() without specifying redacting rules
   */
  protected def redact(text: String): String = {
    Utils.redact(session.sessionState.conf.stringRedactionPattern, text)
  }

  override def verboseStringWithOperatorId(): String = {
    val metaDataStr = scan match {
      case s: SupportsMetadata =>
        s.getMetaData().toSeq.sorted.flatMap {
          // Drop empty metadata entries so the explain output stays compact.
          case (_, value) if value.isEmpty || value.equals("[]") => None
          case (key, value) => Some(s"$key: ${redact(value)}")
          // NOTE: the previous trailing `case _ => None` was unreachable — the
          // (key, value) pattern above already matches every tuple — and has
          // been removed (the compiler warns on unreachable cases).
        }
      case _ =>
        Seq(scan.description())
    }
    s"""
       |$formattedNodeName
       |${ExplainUtils.generateFieldString("Output", output)}
       |${metaDataStr.mkString("\n")}
       |""".stripMargin
  }

  override def outputPartitioning: physical.Partitioning = scan match {
    // A single input partition is trivially a single output partition,
    // regardless of what the scan reports.
    case _ if partitions.length == 1 =>
      SinglePartition

    case s: SupportsReportPartitioning =>
      new DataSourcePartitioning(
        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    case _ => super.outputPartitioning
  }

  override def supportsColumnar: Boolean = {
    // Split once instead of evaluating supportColumnarReads up to three times
    // (the original called exists/forall/exists over the same predicate).
    // All partitions must agree: either every reader is columnar or none is.
    val (columnar, rowBased) = partitions.partition(readerFactory.supportColumnarReads)
    require(columnar.isEmpty || rowBased.isEmpty,
      "Cannot mix row-based and columnar input partitions.")
    columnar.nonEmpty
  }

  /** The RDD this scan reads from; supplied by the concrete subclass. */
  def inputRDD: RDD[InternalRow]

  def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)

  override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    // Count each row as it flows through; rows are passed along unchanged.
    inputRDD.map { r =>
      numOutputRows += 1
      r
    }
  }

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = longMetric("numOutputRows")
    // In columnar mode inputRDD actually carries ColumnarBatch elements;
    // count a whole batch's rows at once.
    inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b =>
      numOutputRows += b.numRows()
      b
    }
  }
}