/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spark.impl

import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.internal.IgniteEx
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl
import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.ignite.spark.impl.QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
import org.apache.ignite.{Ignite, IgniteException}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row}

/**
* Helper object for executing DDL queries and saving DataFrames via Ignite data streamers.
*/
private[apache] object QueryHelper {
/**
* Drops the provided table.
*
* @param tableName Table name.
* @param ignite Ignite.
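*
* A minimal usage sketch (assumes a started [[Ignite]] instance named `ignite` and an existing `PERSON` table):
* {{{
* QueryHelper.dropTable("PERSON", ignite)
* }}}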
*/
def dropTable(tableName: String, ignite: Ignite): Unit = {
val qryProcessor = ignite.asInstanceOf[IgniteEx].context().query()
val qry = compileDropTable(tableName)
qryProcessor.querySqlFields(new SqlFieldsQuery(qry), true).getAll
}

/**
* Creates a table.
*
* @param schema Schema.
* @param tblName Table name.
* @param primaryKeyFields Primary key fields.
* @param createTblOpts Ignite specific options.
* @param ignite Ignite.
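*
* A minimal usage sketch (the table name, key field and `template=replicated` option are illustrative):
* {{{
* import org.apache.spark.sql.types._
*
* val schema = StructType(Seq(
*     StructField("id", LongType, nullable = false),
*     StructField("name", StringType)))
*
* QueryHelper.createTable(schema, "PERSON", Seq("id"), Some("template=replicated"), ignite)
* }}}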
*/
def createTable(schema: StructType, tblName: String, primaryKeyFields: Seq[String], createTblOpts: Option[String],
ignite: Ignite): Unit = {
val qryProcessor = ignite.asInstanceOf[IgniteEx].context().query()
val qry = compileCreateTable(schema, tblName, primaryKeyFields, createTblOpts)
qryProcessor.querySqlFields(new SqlFieldsQuery(qry), true).getAll
}

/**
* Ensures that all options required to create a table based on the provided `schema` are specified correctly.
*
* @param schema Schema of new table.
* @param params Data source options.
* @param ctx Ignite context.
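*
* A hypothetical call showing the validation (assumes a `schema` and an [[IgniteContext]] `ctx` from the
* caller; `OPTION_TABLE` comes from [[IgniteDataFrameSettings]]):
* {{{
* // Throws IgniteException: primary key fields are not specified.
* QueryHelper.ensureCreateTableOptions(schema, Map(OPTION_TABLE -> "person"), ctx)
* }}}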
*/
def ensureCreateTableOptions(schema: StructType, params: Map[String, String], ctx: IgniteContext): Unit = {
if (!params.contains(OPTION_TABLE) && !params.contains("path"))
throw new IgniteException("'table' must be specified.")
if (params.contains(OPTION_SCHEMA) && !params(OPTION_SCHEMA).equalsIgnoreCase(DFLT_SCHEMA)) {
throw new IgniteException("Creating new tables in schema " + params(OPTION_SCHEMA) + " is not valid, tables"
+ " must only be created in " + DFLT_SCHEMA)
}
params.get(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS)
.map(_.split(','))
.getOrElse(throw new IgniteException("Can't create table! Primary key fields has to be specified."))
.map(_.trim)
.foreach { pkField ⇒
if (pkField == "")
throw new IgniteException("PK field can't be empty.")
if (!schema.exists(_.name.equalsIgnoreCase(pkField)))
throw new IgniteException(s"'$pkField' doesn't exists in DataFrame schema.")
}
}

/**
* Saves data to the table.
*
* @param data Data.
* @param tblName Table name.
* @param schemaName Optional schema name.
* @param ctx Ignite context.
* @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
* @param streamerSkipStore Flag indicating that write-through behavior should be disabled for data streaming.
* @param streamerFlushFrequency Insert query streamer automatic flush frequency.
* @param streamerPerNodeBufferSize Insert query streamer size of per node query buffer.
* @param streamerPerNodeParallelOperations Insert query streamer maximum number of parallel operations for a single node.
*
* @see [[org.apache.ignite.IgniteDataStreamer]]
* @see [[org.apache.ignite.IgniteDataStreamer#allowOverwrite(boolean)]]
* @see [[org.apache.ignite.IgniteDataStreamer#skipStore(boolean)]]
* @see [[org.apache.ignite.IgniteDataStreamer#autoFlushFrequency(long)]]
* @see [[org.apache.ignite.IgniteDataStreamer#perNodeBufferSize(int)]]
* @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
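*
* A minimal usage sketch (assumes `df` is a DataFrame matching the table schema; `None` options keep
* Ignite defaults):
* {{{
* QueryHelper.saveTable(df, "PERSON", None, ctx,
*     streamerAllowOverwrite = Some(true),
*     streamerSkipStore = None,
*     streamerFlushFrequency = Some(1000L),
*     streamerPerNodeBufferSize = None,
*     streamerPerNodeParallelOperations = None)
* }}}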
*/
def saveTable(data: DataFrame,
tblName: String,
schemaName: Option[String],
ctx: IgniteContext,
streamerAllowOverwrite: Option[Boolean],
streamerSkipStore: Option[Boolean],
streamerFlushFrequency: Option[Long],
streamerPerNodeBufferSize: Option[Int],
streamerPerNodeParallelOperations: Option[Int]
): Unit = {
val insertQry = compileInsert(tblName, data.schema)
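// Write each partition from its executor so inserts are streamed in parallel across the cluster.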
data.rdd.foreachPartition(iterator ⇒
savePartition(iterator,
insertQry,
tblName,
schemaName,
ctx,
streamerAllowOverwrite,
streamerSkipStore,
streamerFlushFrequency,
streamerPerNodeBufferSize,
streamerPerNodeParallelOperations
))
}

/**
* Saves partition data to the Ignite table.
*
* @param iterator Data iterator.
* @param insertQry Insert query.
* @param tblName Table name.
* @param schemaName Optional schema name.
* @param ctx Ignite context.
* @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
* @param streamerSkipStore Flag indicating that write-through behavior should be disabled for data streaming.
* @param streamerFlushFrequency Insert query streamer automatic flush frequency.
* @param streamerPerNodeBufferSize Insert query streamer size of per node query buffer.
* @param streamerPerNodeParallelOperations Insert query streamer maximum number of parallel operations for a single node.
*
* @see [[org.apache.ignite.IgniteDataStreamer]]
* @see [[org.apache.ignite.IgniteDataStreamer#allowOverwrite(boolean)]]
* @see [[org.apache.ignite.IgniteDataStreamer#skipStore(boolean)]]
* @see [[org.apache.ignite.IgniteDataStreamer#autoFlushFrequency(long)]]
* @see [[org.apache.ignite.IgniteDataStreamer#perNodeBufferSize(int)]]
* @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
*/
private def savePartition(iterator: Iterator[Row],
insertQry: String,
tblName: String,
schemaName: Option[String],
ctx: IgniteContext,
streamerAllowOverwrite: Option[Boolean],
streamerSkipStore: Option[Boolean],
streamerFlushFrequency: Option[Long],
streamerPerNodeBufferSize: Option[Int],
streamerPerNodeParallelOperations: Option[Int]
): Unit = {
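// Resolve the table descriptor to find the backing cache; `.get` fails if the table does not exist.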
val tblInfo = sqlTableInfo(ctx.ignite(), tblName, schemaName).get.asInstanceOf[QueryTypeDescriptorImpl]
val streamer = ctx.ignite().dataStreamer(tblInfo.cacheName)
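// Apply only the explicitly provided streamer options; unset ones keep Ignite defaults.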
streamerAllowOverwrite.foreach(v ⇒ streamer.allowOverwrite(v))
streamerSkipStore.foreach(v ⇒ streamer.skipStore(v))
streamerFlushFrequency.foreach(v ⇒ streamer.autoFlushFrequency(v))
streamerPerNodeBufferSize.foreach(v ⇒ streamer.perNodeBufferSize(v))
streamerPerNodeParallelOperations.foreach(v ⇒ streamer.perNodeParallelOperations(v))
try {
val qryProcessor = ctx.ignite().asInstanceOf[IgniteEx].context().query()
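// Stream each row as an INSERT, passing column values in the order defined by the row schema.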
iterator.foreach { row ⇒
val schema = row.schema
val args = schema.map { f ⇒
row.get(row.fieldIndex(f.name)).asInstanceOf[Object]
}
qryProcessor.streamUpdateQuery(tblInfo.cacheName,
tblInfo.schemaName, streamer, insertQry, args.toArray, "spark")
}
}
finally {
streamer.close()
}
}
}