// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= IgniteContext and IgniteRDD
== IgniteContext
`IgniteContext` is the main entry point to the Spark-Ignite integration. To create an instance of the Ignite context, you must provide an instance of `SparkContext` and a closure that creates an `IgniteConfiguration` (a configuration factory). The Ignite context ensures that server or client Ignite nodes exist in all involved job instances. Alternatively, a path to an XML configuration file can be passed to the `IgniteContext` constructor; it will be used to configure the nodes being started.
When creating an `IgniteContext` instance, an optional boolean `client` argument (defaulting to `true`) can be passed to the constructor. This is typically used in a Shared Deployment installation. When `client` is set to `false`, the context operates in embedded mode and starts server nodes on all workers during context construction. This is required in an Embedded Deployment installation. See link:ignite-for-spark/installation[Installation] for information on deployment configurations.
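As a minimal sketch, embedded mode can be requested by passing `false` for the optional boolean argument described above (assuming the constructor overload that takes the configuration factory plus the boolean):
[source, scala]
----
// Passing `false` for the optional boolean argument starts Ignite server
// nodes inside the Spark executors (embedded mode; see the caution below).
val embeddedContext = new IgniteContext(sparkContext,
    () => new IgniteConfiguration(), false)
----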
[CAUTION]
====
[discrete]
=== Embedded Mode Deprecation
Embedded mode implies starting Ignite server nodes within Spark executors, which can cause unexpected rebalancing or even data loss. Therefore, this mode is deprecated and will eventually be discontinued. Consider starting a separate Ignite cluster and using standalone mode to avoid data consistency and performance issues.
====
Once an `IgniteContext` is created, instances of `IgniteRDD` can be obtained using the `fromCache` methods. The requested cache does not have to exist in the Ignite cluster when the RDD is created; if a cache with the given name does not exist, it will be created using the provided configuration or a template configuration.
For example, the following code will create an Ignite context with the default Ignite configuration:
[source, scala]
----
val igniteContext = new IgniteContext(sparkContext,
    () => new IgniteConfiguration())
----
The following code will create an Ignite context configured from the file `example-shared-rdd.xml`:
[source, scala]
----
val igniteContext = new IgniteContext(sparkContext,
"examples/config/spark/example-shared-rdd.xml")
----
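Once the context exists, a cache-backed RDD can be obtained from it. As a minimal sketch, a cache can also be created on demand by passing an explicit `CacheConfiguration` to `fromCache` (the cache name and key/value types here are illustrative):
[source, scala]
----
import org.apache.ignite.configuration.CacheConfiguration

// The cache named "sharedCache" is created on first use if it does not exist.
val cacheCfg = new CacheConfiguration[Int, String]("sharedCache")
val rdd = igniteContext.fromCache(cacheCfg)
----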
== IgniteRDD
`IgniteRDD` is an implementation of the Spark RDD abstraction that represents a live view of an Ignite cache. `IgniteRDD` is not immutable; all changes in the Ignite cache (regardless of whether they were caused by another RDD or by external changes to the cache) are visible to RDD users immediately.
`IgniteRDD` utilizes the partitioned nature of Ignite caches and provides partitioning information to the Spark executor. The number of partitions in an `IgniteRDD` equals the number of partitions in the underlying Ignite cache. `IgniteRDD` also provides affinity information to Spark via the `getPreferredLocations` method so that RDD computations use data locality.
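Because the partitioning is exposed directly to Spark, the RDD's partition count can be inspected with the standard Spark API. A minimal sketch (the cache name is illustrative):
[source, scala]
----
// One Spark partition is exposed per Ignite cache partition, so this
// prints the partition count of the underlying cache.
val cacheRdd = igniteContext.fromCache("partitioned")
println(s"Partitions: ${cacheRdd.getNumPartitions}")
----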
== Reading values from Ignite
Since `IgniteRDD` is a live view of an Ignite cache, there is no need to explicitly load data from Ignite into the Spark application. All RDD methods are available to use right away after an instance of `IgniteRDD` is created.
For example, assuming an Ignite cache named "partitioned" contains string values, the following code will find all values that contain the word "Ignite":
[source, scala]
----
val cache = igniteContext.fromCache("partitioned")
val result = cache.filter(_._2.contains("Ignite")).collect()
----
== Saving values to Ignite
Since Ignite caches operate on key-value pairs, the most straightforward way to save values to an Ignite cache is to use a Spark tuple RDD and the `savePairs` method. This method takes advantage of the RDD partitioning and stores values to the cache in parallel, if possible.
It is also possible to save a value-only RDD into an Ignite cache using the `saveValues` method. In this case, `IgniteRDD` generates a unique affinity-local key for each value being stored into the cache; a sketch of this follows the example below.
For example, the following code will store pairs of integers from 1 to 10000 into the cache named "partitioned" using 10 parallel store operations:
[source, scala]
----
val cacheRdd = igniteContext.fromCache("partitioned")
cacheRdd.savePairs(sparkContext.parallelize(1 to 10000, 10).map(i => (i, i)))
----
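Correspondingly, a value-only RDD can be stored with `saveValues`; a minimal sketch in which Ignite generates the keys:
[source, scala]
----
// Keys are generated automatically as unique affinity-local values.
val cacheRdd = igniteContext.fromCache("partitioned")
cacheRdd.saveValues(sparkContext.parallelize(1 to 10000, 10))
----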
== Running SQL queries against Ignite cache
When an Ignite cache is configured with the indexing subsystem enabled, it is possible to run SQL queries against the cache using the `objectSql` and `sql` methods. See link:SQL/sql-introduction[Working with SQL] for more information about Ignite SQL queries.
For example, assuming the "partitioned" cache is configured to index pairs of integers, the following code will get all integers in the range (10, 100):
[source, scala]
----
val cacheRdd = igniteContext.fromCache("partitioned")
val result = cacheRdd.sql("select _val from Integer where _val > ? and _val < ?", 10, 100)
----
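The `objectSql` method returns the matching entries as key-value pairs rather than projected fields. A hedged sketch, assuming the condition-style query string accepted by Ignite's `SqlQuery`:
[source, scala]
----
// Returns an RDD of (key, value) pairs whose values lie in (10, 100).
val cacheRdd = igniteContext.fromCache("partitioned")
val pairs = cacheRdd.objectSql("Integer", "_val > ? and _val < ?", 10, 100)
----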
== Example
There are a couple of examples available on GitHub that demonstrate the usage of `IgniteRDD`:
* link:{githubUrl}/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala[Scala Example^]
* link:{githubUrl}/examples/src/main/spark/org/apache/ignite/examples/spark/SharedRDDExample.java[Java Example^]