/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.v2

import java.util

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.SchemaUtils
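
/**
 * A base [[Table]] implementation for file-based data source v2 tables. It resolves the input
 * `paths` into a [[PartitioningAwareFileIndex]], derives the data and partition schemas, and
 * reports batch read/write capabilities.
 *
 * A minimal sketch of a concrete subclass (the names `SimpleTable`, `SimpleScanBuilder`,
 * `SimpleWriteBuilder` and `SimpleFileFormat` are hypothetical, not part of Spark; the `...`
 * bodies are left to the implementation):
 *
 * {{{
 *   case class SimpleTable(
 *       name: String,
 *       sparkSession: SparkSession,
 *       options: CaseInsensitiveStringMap,
 *       paths: Seq[String],
 *       userSpecifiedSchema: Option[StructType])
 *     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 *     override def inferSchema(files: Seq[FileStatus]): Option[StructType] = ...
 *     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ...
 *     override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ...
 *     override def formatName: String = "SIMPLE"
 *     override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[SimpleFileFormat]
 *   }
 * }}}
 */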
abstract class FileTable(
    sparkSession: SparkSession,
    options: CaseInsensitiveStringMap,
    paths: Seq[String],
    userSpecifiedSchema: Option[StructType])
  extends Table with SupportsRead with SupportsWrite {

  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
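
  /**
   * A [[PartitioningAwareFileIndex]] over the resolved input `paths`. If the paths point to the
   * output directory of a streaming query, files are listed from the [[FileStreamSink]] metadata
   * log; otherwise the paths are globbed (if enabled) and listed through an
   * [[InMemoryFileIndex]].
   */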
  lazy val fileIndex: PartitioningAwareFileIndex = {
    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
    // Hadoop Configurations are case sensitive.
    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
    if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
      // We are reading from the results of a streaming query. We will load files from
      // the metadata log instead of listing them using HDFS APIs.
      new MetadataLogFileIndex(sparkSession, new Path(paths.head),
        options.asScala.toMap, userSpecifiedSchema)
    } else {
      // This is a non-streaming file based datasource.
      val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
        checkEmptyGlobPath = true, checkFilesExist = true, enableGlobbing = globPaths)
      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
      new InMemoryFileIndex(
        sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
    }
  }
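
  /**
   * Schema of the data columns only (partition columns excluded). If a user-specified schema is
   * given, any columns that also appear in the partition schema are dropped from it; otherwise
   * the schema is inferred from the listed files. For non-streaming sources the schema is
   * returned as nullable.
   */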
  lazy val dataSchema: StructType = {
    val schema = userSpecifiedSchema.map { schema =>
      val partitionSchema = fileIndex.partitionSchema
      val resolver = sparkSession.sessionState.conf.resolver
      StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
    }.orElse {
      inferSchema(fileIndex.allFiles())
    }.getOrElse {
      throw QueryCompilationErrors.dataSchemaNotSpecifiedError(formatName)
    }
    fileIndex match {
      case _: MetadataLogFileIndex => schema
      case _ => schema.asNullable
    }
  }
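
  /**
   * The full table schema: the data columns (minus any that overlap with partition columns)
   * followed by the partition columns. Also validates that there are no duplicate column names
   * and that every data type is supported by this format.
   */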
  override lazy val schema: StructType = {
    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
    SchemaUtils.checkSchemaColumnNameDuplication(dataSchema,
      "in the data schema", caseSensitive)
    dataSchema.foreach { field =>
      if (!supportsDataType(field.dataType)) {
        throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field)
      }
    }
    val partitionSchema = fileIndex.partitionSchema
    SchemaUtils.checkSchemaColumnNameDuplication(partitionSchema,
      "in the partition schema", caseSensitive)
    val partitionNameSet: Set[String] =
      partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet

    // When data and partition schemas have overlapping columns,
    // tableSchema = dataSchema - overlapSchema + partitionSchema
    val fields = dataSchema.fields.filterNot { field =>
      val colName = PartitioningUtils.getColName(field, caseSensitive)
      partitionNameSet.contains(colName)
    } ++ partitionSchema.fields
    StructType(fields)
  }

  override def partitioning: Array[Transform] = fileIndex.partitionSchema.names.toSeq.asTransforms

  override def properties: util.Map[String, String] = options.asCaseSensitiveMap

  override def capabilities: java.util.Set[TableCapability] = FileTable.CAPABILITIES
  /**
   * When possible, this method should return the schema of the given `files`. When the format
   * does not support schema inference, or no valid files are given, it should return None. In
   * these cases Spark will require the user to specify the schema manually.
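   *
   * A minimal illustrative sketch, not taken from a real Spark format (it assumes a line-based
   * source that always exposes a single string column named `value`):
   *
   * {{{
   *   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
   *     if (files.isEmpty) None
   *     else Some(StructType(Seq(StructField("value", StringType))))
   * }}}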
   */
  def inferSchema(files: Seq[FileStatus]): Option[StructType]

  /**
   * Returns whether this format supports the given [[DataType]] in the read/write path.
   * By default all data types are supported.
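   *
   * An illustrative override (hypothetical, not a real Spark source): a plain-text format that
   * can only store strings might restrict the supported types like this:
   *
   * {{{
   *   override def supportsDataType(dataType: DataType): Boolean = dataType == StringType
   * }}}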
   */
  def supportsDataType(dataType: DataType): Boolean = true

  /**
   * The string that represents the format this data source provider uses. This is overridden by
   * subclasses to provide a readable alias for the data source. For example:
   *
   * {{{
   *   override def formatName: String = "ORC"
   * }}}
   */
  def formatName: String

  /**
   * Returns the V1 [[FileFormat]] class of the same file data source.
   * This fallback is needed in the following cases:
   * 1. A file data source V2 implementation causes a regression. Users can disable the
   *    problematic data source via SQL configuration and fall back to the V1 FileFormat.
   * 2. Catalog support is required, which is still under development for data source V2.
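   *
   * For example, a concrete table typically returns the V1 implementation class of the same
   * format (the `SimpleFileFormat` name below is hypothetical):
   *
   * {{{
   *   override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[SimpleFileFormat]
   * }}}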
   */
  def fallbackFileFormat: Class[_ <: FileFormat]

  /**
   * Whether or not paths should be globbed before being used to access files.
   */
  private def globPaths: Boolean = {
    val entry = options.get(DataSource.GLOB_PATHS_KEY)
    Option(entry).map(_ == "true").getOrElse(true)
  }
}

object FileTable {
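  // File tables currently report batch read and batch write capabilities only.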
  private val CAPABILITIES = Set(BATCH_READ, BATCH_WRITE).asJava
}