blob: 6beb599916a6df26e825a528cd29d3efa1d24c3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.ignite
import java.net.URI
import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.ignite.spark.impl.IgniteSQLRelation.schema
import org.apache.ignite.{Ignite, IgniteException}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, _}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.StructType
import org.apache.ignite.spark.impl._
import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, IGNITE_URI, OPTION_GRID}
import scala.collection.JavaConversions._
/**
* External catalog implementation to provide transparent access to SQL tables existed in Ignite.
*
* @param igniteContext Ignite context to provide access to Ignite instance.
*/
private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
extends ExternalCatalog {
/**
* Default Ignite instance.
*/
@transient private val ignite: Ignite = igniteContext.ignite()
@transient private var currentSchema = DEFAULT_DATABASE
/**
* @param db Ignite instance name.
* @return Description of Ignite instance.
*/
override def getDatabase(db: String): CatalogDatabase =
CatalogDatabase(db, db, IGNITE_URI, Map.empty)
/**
* Checks Ignite schema with provided name exists.
*
* @param schema Ignite schema name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
* @return True is Ignite schema exists.
*/
override def databaseExists(schema: String): Boolean =
schema == DEFAULT_DATABASE || allSchemas(ignite).exists(schema.equalsIgnoreCase)
/**
* @return List of all known Ignite schemas.
*/
override def listDatabases(): Seq[String] =
allSchemas(ignite)
/**
* @param pattern Pattern to filter databases names.
* @return List of all known Ignite schema names filtered by pattern.
*/
override def listDatabases(pattern: String): Seq[String] =
StringUtils.filterPattern(listDatabases(), pattern)
/**
* Sets default Ignite schema.
*
* @param schema Name of Ignite schema.
*/
override def setCurrentDatabase(schema: String): Unit =
currentSchema = schema
/** @inheritdoc */
override def getTable(db: String, table: String): CatalogTable = getTableOption(db, table).get
def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
val gridName = igniteName(ignite)
val schemaName = schemaOrDefault(db, currentSchema)
sqlTableInfo(ignite, tabName, Some(db)) match {
case Some(table)
val tableName = table.tableName
Some(new CatalogTable(
identifier = new TableIdentifier(tableName, Some(schemaName)),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
locationUri = Some(URI.create(IGNITE_PROTOCOL + schemaName + "/" + tableName)),
inputFormat = Some(FORMAT_IGNITE),
outputFormat = Some(FORMAT_IGNITE),
serde = None,
compressed = false,
properties = Map(
OPTION_GRID → gridName,
OPTION_TABLE → tableName)
),
schema = schema(table),
provider = Some(FORMAT_IGNITE),
partitionColumnNames =
if (!allKeyFields(table).isEmpty)
allKeyFields(table).toSeq
else
Seq(table.keyFieldName),
bucketSpec = None))
case NoneNone
}
}
/** @inheritdoc */
override def tableExists(db: String, table: String): Boolean =
sqlTableExists(ignite, table, Some(schemaOrDefault(db, currentSchema)))
/** @inheritdoc */
override def listTables(db: String): Seq[String] = listTables(db, ".*")
/** @inheritdoc */
override def listTables(db: String, pattern: String): Seq[String] =
StringUtils.filterPattern(
cachesForSchema[Any,Any](ignite, Some(schemaOrDefault(db, currentSchema)))
.flatMap(_.getQueryEntities.map(_.getTableName)), pattern)
/** @inheritdoc */
override def loadTable(db: String, table: String,
loadPath: String, isOverwrite: Boolean, isSrcLocal: Boolean): Unit = { /* no-op */ }
/** @inheritdoc */
override def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition = null
/** @inheritdoc */
override def getPartitionOption(db: String, table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = None
/** @inheritdoc */
override def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[String] = {
sqlCacheName(ignite, table, Some(schemaOrDefault(db, currentSchema))).map { cacheName ⇒
val parts = ignite.affinity(cacheName).partitions()
(0 until parts).map(_.toString)
}.getOrElse(Seq.empty)
}
/** @inheritdoc */
override def listPartitions(db: String, table: String,
partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
val partitionNames = listPartitionNames(db, table, partialSpec)
if (partitionNames.isEmpty)
Seq.empty
else {
val cacheName = sqlCacheName(ignite, table, Some(schemaOrDefault(db, currentSchema))).get
val aff = ignite.affinity[Any](cacheName)
partitionNames.map { name ⇒
val nodes = aff.mapPartitionToPrimaryAndBackups(name.toInt)
if (nodes.isEmpty)
throw new AnalysisException(s"Nodes for parition is empty [grid=${ignite.name},table=$table,partition=$name].")
CatalogTablePartition (
Map[String, String] (
"name" → name,
"primary" → nodes.head.id.toString,
"backups" → nodes.tail.map(_.id.toString).mkString(",")
),
CatalogStorageFormat.empty
)
}
}
}
/** @inheritdoc */
override def listPartitionsByFilter(db: String,
table: String,
predicates: Seq[Expression],
defaultTimeZoneId: String): Seq[CatalogTablePartition] =
listPartitions(db, table, None)
/** @inheritdoc */
override def loadPartition(db: String,
table: String,
loadPath: String,
partition: TablePartitionSpec, isOverwrite: Boolean,
inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit = { /* no-op */ }
/** @inheritdoc */
override def loadDynamicPartitions(db: String, table: String,
loadPath: String,
partition: TablePartitionSpec, replace: Boolean,
numDP: Int): Unit = { /* no-op */ }
/** @inheritdoc */
override def getFunction(db: String, funcName: String): CatalogFunction =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def functionExists(db: String, funcName: String): Boolean = false
/** @inheritdoc */
override def listFunctions(db: String, pattern: String): Seq[String] = Seq.empty[String]
/** @inheritdoc */
override def alterDatabase(dbDefinition: CatalogDatabase): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def alterTable(tableDefinition: CatalogTable): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def alterTableDataSchema(db: String, table: String, schema: StructType): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def createFunction(db: String, funcDefinition: CatalogFunction): Unit = { /* no-op */ }
/** @inheritdoc */
override def dropFunction(db: String, funcName: String): Unit = { /* no-op */ }
/** @inheritdoc */
override def renameFunction(db: String, oldName: String, newName: String): Unit = { /* no-op */ }
/** @inheritdoc */
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
sqlTableInfo(ignite, tableDefinition.identifier.table, tableDefinition.identifier.database) match {
case Some(_)
/* no-op */
case None
val schema = tableDefinition.identifier.database
if(schema.isDefined && !schema.contains(DFLT_SCHEMA) && !schema.contains(DEFAULT_DATABASE))
throw new IgniteException("Can only create new tables in PUBLIC schema, not " + schema.get)
val props = tableDefinition.storage.properties
QueryHelper.createTable(tableDefinition.schema,
tableDefinition.identifier.table,
props(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS).split(","),
props.get(OPTION_CREATE_TABLE_PARAMETERS),
ignite)
}
}
/** @inheritdoc */
override def dropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit =
sqlTableInfo(ignite, tabName, Some(schemaOrDefault(db, currentSchema))) match {
case Some(table)
val tableName = table.tableName
QueryHelper.dropTable(tableName, ignite)
case None
if (!ignoreIfNotExists)
throw new IgniteException(s"Table $tabName doesn't exists.")
}
/** @inheritdoc */
override def renameTable(db: String, oldName: String, newName: String): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def createPartitions(db: String, table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def dropPartitions(db: String, table: String,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def renamePartitions(db: String, table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit =
throw new UnsupportedOperationException("unsupported")
/** @inheritdoc */
override def alterPartitions(db: String, table: String,
parts: Seq[CatalogTablePartition]): Unit =
throw new UnsupportedOperationException("unsupported")
}
object IgniteExternalCatalog {
/**
* Config option to specify named grid instance to connect when loading data.
* For internal use only.
*
* @see [[org.apache.ignite.Ignite#name()]]
*/
private[apache] val OPTION_GRID = "grid"
/**
* Location of ignite tables.
*/
private[apache] val IGNITE_PROTOCOL = "ignite:/"
/**
* URI location of ignite tables.
*/
private val IGNITE_URI = new URI(IGNITE_PROTOCOL)
}