* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.spark.sql.ignite
import org.apache.ignite.IgniteException
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.spark.IgniteContext
import scala.collection.JavaConverters._
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
* Implementation of Spark Session for Ignite.
class IgniteSparkSession private(
ic: IgniteContext,
proxy: SparkSession,
existingSharedState: Option[SharedState],
parentSessionState: Option[SessionState]) extends SparkSession(proxy.sparkContext) {
self ⇒
private def this(ic: IgniteContext, proxy: SparkSession) =
this(ic, proxy, None, None)
private def this(proxy: SparkSession) =
this(new IgniteContext(proxy.sparkContext, IgnitionEx.DFLT_CFG), proxy)
private def this(proxy: SparkSession, configPath: String) =
this(new IgniteContext(proxy.sparkContext, configPath), proxy)
private def this(proxy: SparkSession, cfgF: () => IgniteConfiguration) =
this(new IgniteContext(proxy.sparkContext, cfgF), proxy)
/** @inheritdoc */
@transient override lazy val catalog = new CatalogImpl(self)
/** @inheritdoc */
@transient override val sqlContext: SQLContext = new SQLContext(self)
/** @inheritdoc */
@transient override lazy val sharedState: SharedState =
existingSharedState.getOrElse(new IgniteSharedState(ic, sparkContext))
/** @inheritdoc */
@transient override lazy val sessionState: SessionState = {
.getOrElse {
val sessionState = new SessionStateBuilder(self, None).build()
sessionState.experimentalMethods.extraOptimizations =
sessionState.experimentalMethods.extraOptimizations :+ IgniteOptimization
/** @inheritdoc */
@transient override lazy val conf: RuntimeConfig = proxy.conf
/** @inheritdoc */
@transient override lazy val emptyDataFrame: DataFrame = proxy.emptyDataFrame
/** @inheritdoc */
override def newSession(): SparkSession = new IgniteSparkSession(ic, proxy.newSession())
/** @inheritdoc */
override def version: String = proxy.version
/** @inheritdoc */
override def emptyDataset[T: Encoder]: Dataset[T] = {
val encoder = implicitly[Encoder[T]]
new Dataset(self, LocalRelation(encoder.schema.toAttributes), encoder)
/** @inheritdoc */
override def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
/** @inheritdoc */
override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
val attributeSeq: Seq[AttributeReference] = getSchema(beanClass)
val className = beanClass.getName
val rowRdd = rdd.mapPartitions { iter =>
SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
/** @inheritdoc */
override def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
val attrSeq = getSchema(beanClass)
val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass, attrSeq)
Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq))
/** @inheritdoc */
override def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
val encoder = Encoders.product[A]
Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder))
/** @inheritdoc */
override def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
Dataset.ofRows(self, LogicalRelation(baseRelation))
/** @inheritdoc */
override def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
val enc = encoderFor[T]
val attributes = enc.schema.toAttributes
val encoded = => enc.toRow(d).copy())
val plan = new LocalRelation(attributes, encoded)
Dataset[T](self, plan)
/** @inheritdoc */
override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] = {
Dataset[T](self, ExternalRDD(data, self))
/** @inheritdoc */
override def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
new Dataset(self, Range(start, end, step, numPartitions), Encoders.LONG)
/** @inheritdoc */
override def table(tableName: String): DataFrame = {
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
/** @inheritdoc */
override def sql(sqlText: String): DataFrame = Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
/** @inheritdoc */
override def read: DataFrameReader = new DataFrameReader(self)
/** @inheritdoc */
override def readStream: DataStreamReader = new DataStreamReader(self)
/** @inheritdoc */
override def stop(): Unit = proxy.stop()
/** @inheritdoc */
override private[sql] def applySchemaToPythonRDD(rdd: RDD[Array[Any]], schema: StructType) = {
val rowRdd = => python.EvaluatePython.makeFromJava(schema).asInstanceOf[InternalRow])
Dataset.ofRows(self, LogicalRDD(schema.toAttributes, rowRdd)(self))
/** @inheritdoc */
override private[sql] def cloneSession(): IgniteSparkSession = {
val session = new IgniteSparkSession(ic, proxy.cloneSession(), Some(sharedState), Some(sessionState))
session.sessionState // Force copy of SessionState.
/** @inheritdoc */
@transient override private[sql] val extensions =
/** @inheritdoc */
override private[sql] def createDataFrame(rowRDD: RDD[Row],
schema: StructType,
needsConversion: Boolean) = {
val catalystRows = if (needsConversion) {
val encoder = RowEncoder(schema)
} else {{r: Row => InternalRow.fromSeq(r.toSeq)}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
Dataset.ofRows(self, logicalPlan)
/** @inheritdoc */
override private[sql] def table( tableIdent: TableIdentifier) =
Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
private def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
val (dataType, _) = JavaTypeInference.inferDataType(beanClass)
dataType.asInstanceOf[StructType] { f =>
AttributeReference(, f.dataType, f.nullable)()
object IgniteSparkSession {
* @return New instance of <code>IgniteBuilder</code>
def builder(): IgniteBuilder = {
new IgniteBuilder
* Builder for <code>IgniteSparkSession</code>.
* Extends spark session builder with methods related to Ignite configuration.
class IgniteBuilder extends Builder {
* Config provider.
private var cfgF: ()IgniteConfiguration = _
* Path to config file.
private var config: String = _
/** @inheritdoc */
override def getOrCreate(): IgniteSparkSession = synchronized {
val sparkSession = super.getOrCreate()
val ic = if (cfgF != null)
new IgniteContext(sparkSession.sparkContext, cfgF)
else if (config != null)
new IgniteContext(sparkSession.sparkContext, config)
else {
logWarning("No `igniteConfig` or `igniteConfigProvider`. " +
"IgniteSparkSession will use DFLT_CFG for Ignite.")
new IgniteContext(sparkSession.sparkContext)
new IgniteSparkSession(ic, sparkSession)
* Set path to Ignite config file.
* User should use only one of <code>igniteConfig</code> and <code>igniteConfigProvider</code>.
* @param cfg Path to Ignite config file.
* @return This for chaining.
def igniteConfig(cfg: String): IgniteBuilder = {
if (cfgF != null)
throw new IgniteException("only one of config or configProvider should be provided")
this.config = cfg
* Set Ignite config provider.
* User should use only one of <code>igniteConfig</code> and <code>igniteConfigProvider</code>.
* @param cfgF Closure to provide <code>IgniteConfiguration</code>.
* @return This for chaining.
def igniteConfigProvider(cfgF: ()IgniteConfiguration): IgniteBuilder = {
if (config != null)
throw new IgniteException("only one of config or configProvider should be provided")
this.cfgF = cfgF
/** @inheritdoc */
override def appName(name: String): IgniteBuilder = {
/** @inheritdoc */
override def config(key: String, value: String): IgniteBuilder = {
super.config(key, value)
/** @inheritdoc */
override def config(key: String, value: Long): IgniteBuilder = {
super.config(key, value)
/** @inheritdoc */
override def config(key: String, value: Double): IgniteBuilder = {
super.config(key, value)
/** @inheritdoc */
override def config(key: String, value: Boolean): IgniteBuilder = {
super.config(key, value)
/** @inheritdoc */
override def config(conf: SparkConf): IgniteBuilder = {
/** @inheritdoc */
override def master(master: String): IgniteBuilder = {
* This method will throw RuntimeException as long as we building '''IgniteSparkSession'''
override def enableHiveSupport(): IgniteBuilder =
throw new IgniteException("This method doesn't supported by IgniteSparkSession")
/** @inheritdoc */
override def withExtensions(f: (SparkSessionExtensions)Unit): IgniteBuilder = {