// blob: 7799d395d5c6a3532afadd66e42a9e1ac06ee40f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import scala.collection.Map
import scala.language.implicitConversions
import scala.reflect.classTag
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
/**
 * Implicit conversions and [[Encoder]] instances: names and Symbols are converted into
 * [[Column]]s, and common Scala objects are converted into [[Dataset]]s created through the
 * `session` handed to the constructor.
 *
 * @since 3.4.0
 */
abstract class SQLImplicits private[sql] (session: SparkSession) extends LowPrioritySQLImplicits {

  /**
   * Adds the `$` string interpolator, so that $"col name" produces a [[Column]].
   *
   * @since 3.4.0
   */
  implicit class StringToColumn(val sc: StringContext) {
    // Interpolate with the standard `s` interpolator first, then wrap the resulting name.
    def $(args: Any*): ColumnName = new ColumnName(sc.s(args: _*))
  }

  /**
   * Implicitly turns a Scala `Symbol` into a [[Column]] named after the symbol.
   *
   * @since 3.4.0
   */
  implicit def symbolToColumn(s: Symbol): ColumnName = {
    new ColumnName(s.name)
  }

  // Primitives and other common value types

  /**
   * Encoder for `Int`.
   * @since 3.4.0
   */
  implicit val newIntEncoder: Encoder[Int] = PrimitiveIntEncoder

  /**
   * Encoder for `Long`.
   * @since 3.4.0
   */
  implicit val newLongEncoder: Encoder[Long] = PrimitiveLongEncoder

  /**
   * Encoder for `Double`.
   * @since 3.4.0
   */
  implicit val newDoubleEncoder: Encoder[Double] = PrimitiveDoubleEncoder

  /**
   * Encoder for `Float`.
   * @since 3.4.0
   */
  implicit val newFloatEncoder: Encoder[Float] = PrimitiveFloatEncoder

  /**
   * Encoder for `Byte`.
   * @since 3.4.0
   */
  implicit val newByteEncoder: Encoder[Byte] = PrimitiveByteEncoder

  /**
   * Encoder for `Short`.
   * @since 3.4.0
   */
  implicit val newShortEncoder: Encoder[Short] = PrimitiveShortEncoder

  /**
   * Encoder for `Boolean`.
   * @since 3.4.0
   */
  implicit val newBooleanEncoder: Encoder[Boolean] = PrimitiveBooleanEncoder

  /**
   * Encoder for `String`.
   * @since 3.4.0
   */
  implicit val newStringEncoder: Encoder[String] = StringEncoder

  /**
   * Encoder for `java.math.BigDecimal`.
   * @since 3.4.0
   */
  implicit val newJavaDecimalEncoder: Encoder[java.math.BigDecimal] = DEFAULT_JAVA_DECIMAL_ENCODER

  /**
   * Encoder for `scala.math.BigDecimal`.
   * @since 3.4.0
   */
  implicit val newScalaDecimalEncoder: Encoder[scala.math.BigDecimal] =
    DEFAULT_SCALA_DECIMAL_ENCODER

  /**
   * Encoder for `java.sql.Date`.
   * @since 3.4.0
   */
  implicit val newDateEncoder: Encoder[java.sql.Date] = STRICT_DATE_ENCODER

  /**
   * Encoder for `java.time.LocalDate`.
   * @since 3.4.0
   */
  implicit val newLocalDateEncoder: Encoder[java.time.LocalDate] = STRICT_LOCAL_DATE_ENCODER

  /**
   * Encoder for `java.time.LocalDateTime`.
   * @since 3.4.0
   */
  implicit val newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] = LocalDateTimeEncoder

  /**
   * Encoder for `java.sql.Timestamp`.
   * @since 3.4.0
   */
  implicit val newTimeStampEncoder: Encoder[java.sql.Timestamp] = STRICT_TIMESTAMP_ENCODER

  /**
   * Encoder for `java.time.Instant`.
   * @since 3.4.0
   */
  implicit val newInstantEncoder: Encoder[java.time.Instant] = STRICT_INSTANT_ENCODER

  /**
   * Encoder for `java.time.Duration`.
   * @since 3.4.0
   */
  implicit val newDurationEncoder: Encoder[java.time.Duration] = DayTimeIntervalEncoder

  /**
   * Encoder for `java.time.Period`.
   * @since 3.4.0
   */
  implicit val newPeriodEncoder: Encoder[java.time.Period] = YearMonthIntervalEncoder

  /**
   * Encoder for any Java enum type `A`, obtained from [[ScalaReflection]].
   * @since 3.4.0
   */
  implicit def newJavaEnumEncoder[A <: java.lang.Enum[_]: TypeTag]: Encoder[A] =
    ScalaReflection.encoderFor[A]

  // Boxed (java.lang) counterparts of the primitive types

  /**
   * Encoder for `java.lang.Integer`.
   * @since 3.4.0
   */
  implicit val newBoxedIntEncoder: Encoder[java.lang.Integer] = BoxedIntEncoder

  /**
   * Encoder for `java.lang.Long`.
   * @since 3.4.0
   */
  implicit val newBoxedLongEncoder: Encoder[java.lang.Long] = BoxedLongEncoder

  /**
   * Encoder for `java.lang.Double`.
   * @since 3.4.0
   */
  implicit val newBoxedDoubleEncoder: Encoder[java.lang.Double] = BoxedDoubleEncoder

  /**
   * Encoder for `java.lang.Float`.
   * @since 3.4.0
   */
  implicit val newBoxedFloatEncoder: Encoder[java.lang.Float] = BoxedFloatEncoder

  /**
   * Encoder for `java.lang.Byte`.
   * @since 3.4.0
   */
  implicit val newBoxedByteEncoder: Encoder[java.lang.Byte] = BoxedByteEncoder

  /**
   * Encoder for `java.lang.Short`.
   * @since 3.4.0
   */
  implicit val newBoxedShortEncoder: Encoder[java.lang.Short] = BoxedShortEncoder

  /**
   * Encoder for `java.lang.Boolean`.
   * @since 3.4.0
   */
  implicit val newBoxedBooleanEncoder: Encoder[java.lang.Boolean] = BoxedBooleanEncoder

  // Sequences

  // Builds a `Seq[E]` encoder around `element`; the nullability and lenient-serialization
  // settings are copied from the element encoder itself.
  private def newSeqEncoder[E](element: AgnosticEncoder[E]): AgnosticEncoder[Seq[E]] = {
    val seqTag = classTag[Seq[E]]
    IterableEncoder(seqTag, element, element.nullable, element.lenientSerialization)
  }

  /**
   * Non-implicit encoder for `Seq[Int]`.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  val newIntSeqEncoder: Encoder[Seq[Int]] = newSeqEncoder(PrimitiveIntEncoder)

  /**
   * Non-implicit encoder for `Seq[Long]`.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  val newLongSeqEncoder: Encoder[Seq[Long]] = newSeqEncoder(PrimitiveLongEncoder)

  /**
   * Non-implicit encoder for `Seq[Double]`.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  val newDoubleSeqEncoder: Encoder[Seq[Double]] = newSeqEncoder(PrimitiveDoubleEncoder)

  /**
   * Non-implicit encoder for `Seq[Float]`.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  val newFloatSeqEncoder: Encoder[Seq[Float]] = newSeqEncoder(PrimitiveFloatEncoder)

  /**
   * Non-implicit encoder for `Seq[Byte]`.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  val newByteSeqEncoder: Encoder[Seq[Byte]] = newSeqEncoder(PrimitiveByteEncoder)

  /**
   * Non-implicit encoder for `Seq[Short]`.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  val newShortSeqEncoder: Encoder[Seq[Short]] = newSeqEncoder(PrimitiveShortEncoder)

  /**
   * Non-implicit encoder for `Seq[Boolean]`.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  val newBooleanSeqEncoder: Encoder[Seq[Boolean]] = newSeqEncoder(PrimitiveBooleanEncoder)

  /**
   * Non-implicit encoder for `Seq[String]`.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  val newStringSeqEncoder: Encoder[Seq[String]] = newSeqEncoder(StringEncoder)

  /**
   * Non-implicit encoder for a `Seq` of `Product` elements.
   *
   * @since 3.4.0
   * @deprecated
   *   use [[newSequenceEncoder]]
   */
  @deprecated("Use newSequenceEncoder instead", "2.2.0")
  def newProductSeqEncoder[A <: Product: TypeTag]: Encoder[Seq[A]] = {
    val element = ScalaReflection.encoderFor[A]
    newSeqEncoder(element)
  }

  /**
   * Encoder for any `Seq` subtype `T`, obtained from [[ScalaReflection]].
   * @since 3.4.0
   */
  implicit def newSequenceEncoder[T <: Seq[_]: TypeTag]: Encoder[T] = ScalaReflection.encoderFor[T]

  // Maps

  /**
   * Encoder for any `scala.collection.Map` subtype `T`, obtained from [[ScalaReflection]].
   * @since 3.4.0
   */
  implicit def newMapEncoder[T <: Map[_, _]: TypeTag]: Encoder[T] =
    ScalaReflection.encoderFor[T]

  /**
   * Encoder for any `Set` subtype `T`.
   *
   * Note that a `Set` is serialized as a Catalyst array, so the set property only holds while
   * working with the domain objects; the serialized form does not retain it. If a Catalyst array
   * containing duplicated elements is converted to `Dataset[Set[T]]` through this encoder, the
   * elements will be de-duplicated.
   *
   * @since 3.4.0
   */
  implicit def newSetEncoder[T <: Set[_]: TypeTag]: Encoder[T] =
    ScalaReflection.encoderFor[T]

  // Arrays

  // Builds an `Array[E]` encoder around `element`, reusing the element encoder's nullability.
  private def newArrayEncoder[E](element: AgnosticEncoder[E]): AgnosticEncoder[Array[E]] =
    ArrayEncoder(element, element.nullable)

  /**
   * Encoder for `Array[Int]`.
   * @since 3.4.0
   */
  implicit val newIntArrayEncoder: Encoder[Array[Int]] =
    newArrayEncoder(PrimitiveIntEncoder)

  /**
   * Encoder for `Array[Long]`.
   * @since 3.4.0
   */
  implicit val newLongArrayEncoder: Encoder[Array[Long]] =
    newArrayEncoder(PrimitiveLongEncoder)

  /**
   * Encoder for `Array[Double]`.
   * @since 3.4.0
   */
  implicit val newDoubleArrayEncoder: Encoder[Array[Double]] = newArrayEncoder(
    PrimitiveDoubleEncoder)

  /**
   * Encoder for `Array[Float]`.
   * @since 3.4.0
   */
  implicit val newFloatArrayEncoder: Encoder[Array[Float]] =
    newArrayEncoder(PrimitiveFloatEncoder)

  /**
   * Encoder for `Array[Byte]`. Unlike the other array encoders here, this one is the dedicated
   * `BinaryEncoder` rather than an array encoder wrapped around the element encoder.
   *
   * @since 3.4.0
   */
  implicit val newByteArrayEncoder: Encoder[Array[Byte]] = BinaryEncoder

  /**
   * Encoder for `Array[Short]`.
   * @since 3.4.0
   */
  implicit val newShortArrayEncoder: Encoder[Array[Short]] =
    newArrayEncoder(PrimitiveShortEncoder)

  /**
   * Encoder for `Array[Boolean]`.
   * @since 3.4.0
   */
  implicit val newBooleanArrayEncoder: Encoder[Array[Boolean]] = newArrayEncoder(
    PrimitiveBooleanEncoder)

  /**
   * Encoder for `Array[String]`.
   * @since 3.4.0
   */
  implicit val newStringArrayEncoder: Encoder[Array[String]] =
    newArrayEncoder(StringEncoder)

  /**
   * Encoder for an array of `Product` elements; the element encoder comes from
   * [[ScalaReflection]].
   *
   * @since 3.4.0
   */
  implicit def newProductArrayEncoder[A <: Product: TypeTag]: Encoder[Array[A]] =
    newArrayEncoder(ScalaReflection.encoderFor[A])

  /**
   * Wraps a local Seq in a [[DatasetHolder]] by creating a [[Dataset]] from it with this
   * instance's session.
   *
   * @since 3.4.0
   */
  implicit def localSeqToDatasetHolder[T: Encoder](s: Seq[T]): DatasetHolder[T] = {
    val dataset = session.createDataset(s)
    DatasetHolder(dataset)
  }
}
/**
 * Implicit methods for converting Scala objects into [[Dataset]]s that must resolve at a lower
 * priority. Implicits that would otherwise conflict are declared here so that resolution is
 * unambiguous.
 *
 * Why each implicit lives here:
 *   - `newProductEncoder`: disambiguates `List`s, which are both `Seq` and `Product`.
 */
trait LowPrioritySQLImplicits {

  /**
   * Encoder for any `Product` subtype `T`, obtained from [[ScalaReflection]].
   *
   * @since 3.4.0
   */
  implicit def newProductEncoder[T <: Product: TypeTag]: Encoder[T] = {
    ScalaReflection.encoderFor[T]
  }
}