/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spark.impl

import org.apache.ignite.cache.CacheMode
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Partition, TaskContext}
import java.util.{List ⇒ JList}

/**
 * Implementation of a Spark RDD for Apache Ignite to support the Data Frame API.
 */
class IgniteSQLDataFrameRDD[K, V](
    ic: IgniteContext,
    cacheName: String,
    schema: StructType,
    qryStr: String,
    args: List[_],
    parts: Array[Partition],
    distributedJoin: Boolean) extends
    IgniteSqlRDD[Row, JList[_], K, V](
        ic,
        cacheName,
        cacheCfg = null,
        qry = null,
        r ⇒ new GenericRowWithSchema(r.toArray.map(IgniteRDD.convertIfNeeded), schema),
        keepBinary = true,
        parts) {
    /**
     * Executes an Ignite query for this RDD and returns an Iterator to iterate through the results.
     *
     * @param partition Partition.
     * @param context TaskContext.
     * @return Results of the query for the specific partition.
     */
    override def compute(partition: Partition, context: TaskContext): Iterator[Row] = {
        val qry0 = new SqlFieldsQuery(qryStr)

        qry0.setDistributedJoins(distributedJoin)

        if (args.nonEmpty)
            qry0.setArgs(args.map(_.asInstanceOf[Object]): _*)

        val ccfg = ic.ignite().cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])

        val ignitePartition = partition.asInstanceOf[IgniteDataFramePartition]

        // Prune the query to the Ignite partitions backing this Spark partition. Pruning is skipped
        // for REPLICATED caches and for distributed joins, where the query must see the whole data set.
        if (ccfg.getCacheMode != CacheMode.REPLICATED && ignitePartition.igniteParts.nonEmpty && !distributedJoin)
            qry0.setPartitions(ignitePartition.igniteParts: _*)

        qry = qry0

        super.compute(partition, context)
    }
}
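
/*
 * Illustrative sketch (not part of the original file): roughly what the SqlFieldsQuery built in
 * compute() above looks like once configured. The SQL text, argument and partition ids below are
 * assumptions made up for the example; the setter calls mirror the ones used in compute().
 *
 *     val qry = new SqlFieldsQuery("SELECT ID, NAME FROM PERSON WHERE SALARY > ?")
 *     qry.setDistributedJoins(false)   // from the distributedJoin flag
 *     qry.setArgs(Long.box(1000L))     // positional arguments, boxed to Object
 *     qry.setPartitions(0, 1, 2)       // only when the cache is PARTITIONED and joins are not distributed
 */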
object IgniteSQLDataFrameRDD {
    def apply[K, V](ic: IgniteContext,
        cacheName: String,
        schema: StructType,
        qryStr: String,
        args: List[_],
        parts: Array[Partition] = Array(IgnitePartition(0)),
        distributedJoin: Boolean = false): IgniteSQLDataFrameRDD[K, V] = {
        new IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryStr, args, parts, distributedJoin)
    }
}
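
/*
 * Hedged usage sketch (an assumption, not part of the original file): how the companion `apply`
 * above might be driven from a Spark job. The Ignite config path, cache/table name, schema and
 * SQL text are hypothetical values chosen for the example.
 *
 *     import org.apache.spark.SparkContext
 *     import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
 *
 *     val sc = new SparkContext("local[*]", "ignite-df-rdd-example")
 *     val ic = new IgniteContext(sc, "config/example-ignite.xml")
 *
 *     val schema = StructType(Seq(StructField("ID", LongType), StructField("NAME", StringType)))
 *
 *     val rdd = IgniteSQLDataFrameRDD[java.lang.Long, AnyRef](
 *         ic, "SQL_PUBLIC_PERSON", schema,
 *         "SELECT ID, NAME FROM PERSON WHERE ID > ?", List(100L))
 *
 *     rdd.collect().foreach(println)
 */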