blob: 18662e846000d01ae04e13da8e0019f00a895db4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.scalar.examples.spark
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * This example demonstrates how to create an IgniteRDD and share it with multiple Spark workers.
 * The goal of this particular example is to provide the simplest code example of this logic.
 * <p>
 * This example will start Ignite in the embedded mode and will start an IgniteContext on each Spark worker node.
 * <p>
 * The example can work in the standalone mode as well that can be enabled by setting IgniteContext's {@code isClient}
 * property to {@code true} and running an Ignite node separately with `examples/config/spark/
 * example-shared-rdd.xml` config.
 * <p>
 * Flow of the example:
 * <ol>
 * <li>fill the "sharedRDD" cache with the pairs (i, i) for i in 1..100000;</li>
 * <li>replace every value with its square and store the result back into the same cache;</li>
 * <li>obtain the cache again as a second IgniteRDD and read the squared values through it,
 * first with plain RDD transformations and then with a SQL query.</li>
 * </ol>
 */
// NOTE(review): the Spark quick-start guide recommends an explicit main() over extending scala.App.
// App is kept here so that the public members of this object stay exactly as they were.
object ScalarSharedRDDExample extends App {
    // Spark Configuration.
    private val conf = new SparkConf()
        .setAppName("IgniteRDDExample")
        .setMaster("local")
        .set("spark.executor.instances", "2")

    // Spark context.
    val sparkContext = new SparkContext(conf)

    // Adjust the logger to exclude the logs of no interest.
    Logger.getRootLogger.setLevel(Level.ERROR)
    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

    // Defines spring cache Configuration path.
    private val CONFIG = "examples/config/spark/example-shared-rdd.xml"

    // Creates Ignite context with above configuration.
    // The last argument is the client flag described in the header: false keeps Ignite embedded.
    val igniteContext = new IgniteContext(sparkContext, CONFIG, false)

    // Creates an Ignite Shared RDD of Type (Int,Int) Integer Pair.
    val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")

    // Fill the Ignite Shared RDD in with Int pairs.
    sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i)))

    // Transforming Pairs to contain their Squared value.
    // mapValues is a lazy Spark transformation that only returns a new RDD, so on its own it
    // leaves the cache untouched. The squared pairs therefore have to be saved back explicitly;
    // the `true` flag enables overwrite so that values of already existing keys are replaced.
    // NOTE(review): x * x is Int arithmetic and wraps around for keys above 46340; the value type
    // is left as Int here to stay in line with the cache used by this example.
    sharedRDD.savePairs(sharedRDD.mapValues(x => x * x), true)

    // Retrieve sharedRDD back from the Cache.
    val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache("sharedRDD")

    // Perform some transformations on IgniteRDD and print.
    val squareAndRootPair = transformedValues.map { case (x, y) => (x, Math.sqrt(y.toDouble)) }

    println(">>> Transforming values stored in Ignite Shared RDD...")

    // Keep only the pairs whose square root is less than 100 and
    // take the first five elements from the transformed IgniteRDD and print them.
    squareAndRootPair.filter(_._2 < 100.0).take(5).foreach(println)

    println(">>> Executing SQL query over Ignite Shared RDD...")

    // Execute a SQL query over the Ignite Shared RDD.
    val df = transformedValues.sql("select _val from Integer where _val < 100 and _val > 9 ")

    // Show ten rows from the result set.
    df.show(10)

    // Close IgniteContext on all workers.
    igniteContext.close(true)

    // Stop SparkContext.
    sparkContext.stop()
}