| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.ignite |
| |
| import java.net.URI |
| |
| import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA |
| import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE |
| import org.apache.ignite.spark.IgniteContext |
| import org.apache.ignite.spark.IgniteDataFrameSettings._ |
| import org.apache.ignite.spark.impl.IgniteSQLRelation.schema |
| import org.apache.ignite.{Ignite, IgniteException} |
| import org.apache.spark.sql.{AnalysisException, SparkSession} |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec |
| import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, _} |
| import org.apache.spark.sql.catalyst.expressions.Expression |
| import org.apache.spark.sql.catalyst.util.StringUtils |
| import org.apache.spark.sql.types.StructType |
| import org.apache.ignite.spark.impl._ |
| import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE |
| import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, IGNITE_URI, OPTION_GRID} |
| |
| import scala.collection.JavaConversions._ |
| |
| /** |
| * External catalog implementation to provide transparent access to SQL tables existing in Ignite. |
| * |
| * @param igniteContext Ignite context to provide access to Ignite instance. |
| */ |
| private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext) |
| extends ExternalCatalog { |
/**
 * Ignite instance obtained from the supplied context.
 */
@transient private val ignite: Ignite = igniteContext.ignite()

/**
 * Schema passed as the fallback argument to `schemaOrDefault`; starts as
 * `SessionCatalog.DEFAULT_DATABASE` and is reassigned by `setCurrentDatabase`.
 */
@transient private var currentSchema: String = DEFAULT_DATABASE
| |
/**
 * Builds a catalog database descriptor for the given name.
 *
 * The name doubles as the description, the location is the fixed Ignite URI
 * and no properties are attached.
 *
 * @param db Database name.
 * @return Descriptor of the database.
 */
override def getDatabase(db: String): CatalogDatabase =
    CatalogDatabase(name = db, description = db, locationUri = IGNITE_URI, properties = Map.empty)
| |
/**
 * Tells whether a schema with the given name is known.
 *
 * `SessionCatalog.DEFAULT_DATABASE` is always reported as existing; any other name is
 * compared case-insensitively against the schemas returned by `allSchemas`.
 *
 * @param schema Ignite schema name or `SessionCatalog.DEFAULT_DATABASE`.
 * @return `true` if the schema exists.
 */
override def databaseExists(schema: String): Boolean =
    if (schema == DEFAULT_DATABASE)
        true
    else
        allSchemas(ignite).exists(known ⇒ schema.equalsIgnoreCase(known))
| |
/**
 * @return Schema names reported by `allSchemas` for the Ignite instance.
 */
override def listDatabases(): Seq[String] = {
    val schemas = allSchemas(ignite)

    schemas
}
| |
/**
 * @param pattern Pattern the schema names are filtered with (see `StringUtils.filterPattern`).
 * @return Names from `listDatabases()` that pass the filter.
 */
override def listDatabases(pattern: String): Seq[String] = {
    val known = listDatabases()

    StringUtils.filterPattern(known, pattern)
}
| |
/**
 * Remembers the given schema as the current one.
 *
 * @param schema Name of Ignite schema.
 */
override def setCurrentDatabase(schema: String): Unit = {
    currentSchema = schema
}
| |
/**
 * Returns the metadata of an existing table.
 *
 * @param db Database (schema) name.
 * @param table Table name.
 * @return Table metadata as produced by `getTableOption`.
 * @throws org.apache.spark.sql.catalyst.analysis.NoSuchTableException if `getTableOption`
 *     finds no such table.
 */
override def getTable(db: String, table: String): CatalogTable =
    // Report a missing table with Spark's dedicated exception instead of the
    // uninformative NoSuchElementException("None.get") raised by Option.get.
    getTableOption(db, table).getOrElse(
        throw new org.apache.spark.sql.catalyst.analysis.NoSuchTableException(db, table))
| |
/**
 * Looks a table up through `sqlTableInfo` and, when found, describes it as an external
 * Spark catalog table backed by the Ignite format.
 *
 * The schema is resolved with `schemaOrDefault(db, currentSchema)` and that resolved name is
 * used both for the lookup and for the identifier / location of the returned table.
 *
 * @param db Database (schema) name.
 * @param tabName Table name.
 * @return Table description, or `None` if `sqlTableInfo` finds nothing.
 */
def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
    val gridName = igniteName(ignite)

    val schemaName = schemaOrDefault(db, currentSchema)

    // Search in the resolved schema (not the raw `db`) so that the lookup agrees with
    // tableExists/dropTable and with the schema reported in the returned identifier.
    sqlTableInfo(ignite, tabName, Some(schemaName)).map { table ⇒
        val tableName = table.tableName

        // Evaluated once: needed for both the emptiness check and the resulting column list.
        val keyFields = allKeyFields(table)

        new CatalogTable(
            identifier = new TableIdentifier(tableName, Some(schemaName)),
            tableType = CatalogTableType.EXTERNAL,
            storage = CatalogStorageFormat(
                locationUri = Some(URI.create(IGNITE_PROTOCOL + schemaName + "/" + tableName)),
                inputFormat = Some(FORMAT_IGNITE),
                outputFormat = Some(FORMAT_IGNITE),
                serde = None,
                compressed = false,
                properties = Map(
                    OPTION_GRID → gridName,
                    OPTION_TABLE → tableName)
            ),
            schema = schema(table),
            provider = Some(FORMAT_IGNITE),
            // Fall back to the single key field name when no explicit key fields are listed.
            partitionColumnNames =
                if (keyFields.isEmpty)
                    Seq(table.keyFieldName)
                else
                    keyFields.toSeq,
            bucketSpec = None)
    }
}
| |
/**
 * Checks for a table via `sqlTableExists`, using the schema resolved by
 * `schemaOrDefault(db, currentSchema)`.
 *
 * @param db Database (schema) name.
 * @param table Table name.
 * @return Result of `sqlTableExists` for the resolved schema.
 */
override def tableExists(db: String, table: String): Boolean = {
    val schemaName = schemaOrDefault(db, currentSchema)

    sqlTableExists(ignite, table, Some(schemaName))
}
| |
/**
 * Lists tables of a database by delegating to the pattern overload with `".*"`.
 *
 * @param db Database (schema) name.
 * @return Table names.
 */
override def listTables(db: String): Seq[String] = {
    val matchAll = ".*"

    listTables(db, matchAll)
}
| |
/**
 * Collects the table names of all query entities of the caches returned by
 * `cachesForSchema` for the resolved schema and filters them by pattern.
 *
 * @param db Database (schema) name.
 * @param pattern Pattern the table names are filtered with (see `StringUtils.filterPattern`).
 * @return Matching table names.
 */
override def listTables(db: String, pattern: String): Seq[String] = {
    val caches = cachesForSchema[Any,Any](ignite, Some(schemaOrDefault(db, currentSchema)))

    val tableNames = caches.flatMap(ccfg ⇒ ccfg.getQueryEntities.map(entity ⇒ entity.getTableName))

    StringUtils.filterPattern(tableNames, pattern)
}
| |
/** Does nothing: all arguments are ignored. */
override def loadTable(db: String, table: String,
    loadPath: String, isOverwrite: Boolean, isSrcLocal: Boolean): Unit = ()
| |
/**
 * Always yields `null`, regardless of the arguments.
 *
 * @return `null`.
 */
override def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition = {
    null
}
| |
/**
 * Always yields `None`, regardless of the arguments.
 *
 * @return `None`.
 */
override def getPartitionOption(db: String, table: String,
    spec: TablePartitionSpec): Option[CatalogTablePartition] =
    Option.empty[CatalogTablePartition]
| |
/**
 * Lists the partition numbers of the cache behind a table as strings.
 *
 * The cache name comes from `sqlCacheName` for the resolved schema; the partition count is
 * taken from the cache affinity. `partialSpec` is not used.
 *
 * @param db Database (schema) name.
 * @param table Table name.
 * @param partialSpec Ignored.
 * @return `"0"` up to `"partitions - 1"`, or an empty sequence if no cache name is found.
 */
override def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[String] =
    sqlCacheName(ignite, table, Some(schemaOrDefault(db, currentSchema))) match {
        case Some(cacheName) ⇒
            val partCnt = ignite.affinity(cacheName).partitions()

            (0 until partCnt).map(idx ⇒ idx.toString)

        case None ⇒
            Seq.empty
    }
| |
/**
 * Describes every partition of the cache behind a table.
 *
 * Each partition is reported with three spec entries: `name` (partition number),
 * `primary` (id of the first node returned by the affinity mapping) and `backups`
 * (comma-separated ids of the remaining nodes). `partialSpec` is not used.
 *
 * @param db Database (schema) name.
 * @param table Table name.
 * @param partialSpec Ignored.
 * @return One entry per partition, or an empty sequence if `sqlCacheName` finds no cache.
 * @throws AnalysisException if the affinity maps a partition to no nodes.
 */
override def listPartitions(db: String, table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] =
    // Resolve the cache name once: a second lookup unwrapped with `.get` could fail if the
    // table disappeared in between, and partition ids no longer round-trip through strings.
    sqlCacheName(ignite, table, Some(schemaOrDefault(db, currentSchema))) match {
        case Some(cacheName) ⇒
            val aff = ignite.affinity[Any](cacheName)

            (0 until aff.partitions()).map { part ⇒
                val nodes = aff.mapPartitionToPrimaryAndBackups(part)

                if (nodes.isEmpty)
                    throw new AnalysisException(s"Nodes for parition is empty [grid=${ignite.name},table=$table,partition=$part].")

                CatalogTablePartition(
                    Map[String, String](
                        "name" → part.toString,
                        "primary" → nodes.head.id.toString,
                        "backups" → nodes.tail.map(_.id.toString).mkString(",")
                    ),
                    CatalogStorageFormat.empty
                )
            }

        case None ⇒
            Seq.empty
    }
| |
/**
 * Returns all partitions of the table; `predicates` and `defaultTimeZoneId` are not applied.
 *
 * @param db Database (schema) name.
 * @param table Table name.
 * @param predicates Ignored.
 * @param defaultTimeZoneId Ignored.
 * @return Result of `listPartitions(db, table, None)`.
 */
override def listPartitionsByFilter(db: String,
    table: String,
    predicates: Seq[Expression],
    defaultTimeZoneId: String): Seq[CatalogTablePartition] = {
    val noSpec: Option[TablePartitionSpec] = None

    listPartitions(db, table, noSpec)
}
| |
/** Does nothing: all arguments are ignored. */
override def loadPartition(db: String,
    table: String,
    loadPath: String,
    partition: TablePartitionSpec, isOverwrite: Boolean,
    inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit = ()
| |
/** Does nothing: all arguments are ignored. */
override def loadDynamicPartitions(db: String, table: String,
    loadPath: String,
    partition: TablePartitionSpec, replace: Boolean,
    numDP: Int): Unit = ()
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def getFunction(db: String, funcName: String): CatalogFunction = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Reports every function as absent.
 *
 * @return `false`, regardless of the arguments.
 */
override def functionExists(db: String, funcName: String): Boolean = {
    false
}
| |
/**
 * Reports no functions.
 *
 * @return Empty sequence, regardless of the arguments.
 */
override def listFunctions(db: String, pattern: String): Seq[String] = {
    Seq.empty[String]
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def alterTable(tableDefinition: CatalogTable): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def alterTableDataSchema(db: String, table: String, schema: StructType): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/** Does nothing: the function definition is ignored. */
override def createFunction(db: String, funcDefinition: CatalogFunction): Unit = ()
| |
/** Does nothing: all arguments are ignored. */
override def dropFunction(db: String, funcName: String): Unit = ()
| |
/** Does nothing: all arguments are ignored. */
override def renameFunction(db: String, oldName: String, newName: String): Unit = ()
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Creates the table through `QueryHelper.createTable` unless `sqlTableInfo` already finds it.
 *
 * When a database is given in the identifier it must equal `DFLT_SCHEMA` or
 * `SessionCatalog.DEFAULT_DATABASE`. Primary key fields are read from the storage property
 * `OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS` (comma-separated); optional creation parameters
 * come from `OPTION_CREATE_TABLE_PARAMETERS`.
 *
 * NOTE(review): `ignoreIfExists` is not consulted; an existing table is silently left as is.
 *
 * @param tableDefinition Definition of the table to create.
 * @param ignoreIfExists Not used.
 * @throws IgniteException if the identifier names any other database.
 */
override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    val ident = tableDefinition.identifier

    if (sqlTableInfo(ignite, ident.table, ident.database).isEmpty) {
        ident.database.foreach { schemaName ⇒
            if (schemaName != DFLT_SCHEMA && schemaName != DEFAULT_DATABASE)
                throw new IgniteException("Can only create new tables in PUBLIC schema, not " + schemaName)
        }

        val storageProps = tableDefinition.storage.properties

        QueryHelper.createTable(tableDefinition.schema,
            ident.table,
            storageProps(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS).split(","),
            storageProps.get(OPTION_CREATE_TABLE_PARAMETERS),
            ignite)
    }
}
| |
/**
 * Drops a table through `QueryHelper.dropTable` when `sqlTableInfo` finds it in the
 * schema resolved by `schemaOrDefault(db, currentSchema)`.
 *
 * @param db Database (schema) name.
 * @param tabName Table name.
 * @param ignoreIfNotExists If `true`, a missing table is silently ignored.
 * @param purge Not used.
 * @throws IgniteException if the table is not found and `ignoreIfNotExists` is `false`.
 */
override def dropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit = {
    val schemaName = schemaOrDefault(db, currentSchema)

    sqlTableInfo(ignite, tabName, Some(schemaName)).map(_.tableName) match {
        case Some(tableName) ⇒
            QueryHelper.dropTable(tableName, ignite)

        case None if !ignoreIfNotExists ⇒
            throw new IgniteException(s"Table $tabName doesn't exists.")

        case None ⇒
            // Missing table tolerated by the caller.
    }
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def renameTable(db: String, oldName: String, newName: String): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def createPartitions(db: String, table: String,
    parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def dropPartitions(db: String, table: String,
    parts: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def renamePartitions(db: String, table: String,
    specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| |
/**
 * Not supported by this catalog.
 *
 * @throws UnsupportedOperationException always.
 */
override def alterPartitions(db: String, table: String,
    parts: Seq[CatalogTablePartition]): Unit = {
    throw new UnsupportedOperationException("unsupported")
}
| } |
| |
object IgniteExternalCatalog {
    /**
     * Name of the option carrying the named grid instance to connect to when loading data.
     * For internal use only.
     *
     * @see [[org.apache.ignite.Ignite#name()]]
     */
    private[apache] val OPTION_GRID: String = "grid"

    /**
     * Prefix of the location of Ignite tables.
     */
    private[apache] val IGNITE_PROTOCOL: String = "ignite:/"

    /**
     * [[IGNITE_PROTOCOL]] as a URI.
     */
    private val IGNITE_URI: URI = new URI(IGNITE_PROTOCOL)
}