/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.examples.spark

import java.lang.{Integer => JInt, Long => JLong, String => JString}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.ignite.{Ignite, Ignition}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
 * Example application demonstrating join operations between two DataFrames (Spark tables) whose data is stored in Ignite caches.
 */
object IgniteDataFrameJoinExample extends App {
    /** Ignite config file. */
    private val CONFIG = "config/example-ignite.xml"

    /** Test cache name. */
    private val CACHE_NAME = "testCache"

    // Starting Ignite server node.
    val ignite = setupServerAndData

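    // closeAfter is provided by the examples package object; it closes the Ignite instance once the block below completes.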
    closeAfter(ignite) { ignite ⇒
        // Creating Spark session.
        implicit val spark = SparkSession.builder()
            .appName("IgniteDataFrameJoinExample")
            .master("local")
            .config("spark.executor.instances", "2")
            .getOrCreate()

        // Adjust the logger to exclude the logs of no interest.
        Logger.getRootLogger.setLevel(Level.ERROR)
        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

        // Executing examples.
        sparkDSLJoinExample
        nativeSparkSqlJoinExample
    }

    /**
     * Example of using the Ignite DataFrame implementation.
     * Selects data through the Spark DSL.
     *
     * @param spark SparkSession.
     */
    def sparkDSLJoinExample(implicit spark: SparkSession): Unit = {
        println("Querying using Spark DSL.")
        println

        val persons = spark.read
            .format(FORMAT_IGNITE)
            .option(OPTION_TABLE, "person")
            .option(OPTION_CONFIG_FILE, CONFIG)
            .load()

        persons.printSchema()
        persons.show()

        val cities = spark.read
            .format(FORMAT_IGNITE)
            .option(OPTION_TABLE, "city")
            .option(OPTION_CONFIG_FILE, CONFIG)
            .load()

        cities.printSchema()
        cities.show()

        val joinResult = persons.join(cities, persons("city_id") === cities("id"))
            .select(persons("name").as("person"), persons("age"), cities("name").as("city"), cities("country"))

        joinResult.explain(true)
        joinResult.printSchema()
        joinResult.show()
    }

    /**
     * Example of using the Ignite DataFrame implementation.
     * Registers the Ignite DataFrames as temporary views and selects data with a Spark SQL query.
     *
     * @param spark SparkSession.
     */
    def nativeSparkSqlJoinExample(implicit spark: SparkSession): Unit = {
        println("Querying using Spark SQL.")
        println

        val persons = spark.read
            .format(FORMAT_IGNITE)
            .option(OPTION_TABLE, "person")
            .option(OPTION_CONFIG_FILE, CONFIG)
            .load()

        persons.printSchema()
        persons.show()

        val cities = spark.read
            .format(FORMAT_IGNITE)
            .option(OPTION_TABLE, "city")
            .option(OPTION_CONFIG_FILE, CONFIG)
            .load()

        cities.printSchema()
        cities.show()

        // Registering DataFrames as Spark views.
        persons.createOrReplaceTempView("person")
        cities.createOrReplaceTempView("city")

        // Selecting data from Ignite through the Spark SQL engine.
        val joinResult = spark.sql("""
            | SELECT
            |     person.name AS person,
            |     age,
            |     city.name AS city,
            |     country
            | FROM
            |     person JOIN
            |     city ON person.city_id = city.id
            """.stripMargin)

        joinResult.explain(true)
        joinResult.printSchema()
        joinResult.show()
    }

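    /**
     * Starts an Ignite server node and creates the city and person tables used by the join examples.
     *
     * @return Started Ignite instance.
     */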
    def setupServerAndData: Ignite = {
        // Starting Ignite.
        val ignite = Ignition.start(CONFIG)

        // Creating first test cache.
        val ccfg = new CacheConfiguration[JLong, JString](CACHE_NAME).setSqlSchema("PUBLIC")

        val cache = ignite.getOrCreateCache(ccfg)

        // Creating SQL tables.
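        // "template=replicated" stores the city table in a replicated cache;
        // "backups=1, affinityKey=city_id" keeps one backup copy of person rows and collocates them with their city.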
        cache.query(new SqlFieldsQuery(
            "CREATE TABLE city (id LONG PRIMARY KEY, name VARCHAR, country VARCHAR) WITH \"template=replicated\"")).getAll

        cache.query(new SqlFieldsQuery(
            "CREATE TABLE person (id LONG, name VARCHAR, age INT, city_id LONG, PRIMARY KEY (id, city_id)) " +
            "WITH \"backups=1, affinityKey=city_id\"")).getAll

        cache.query(new SqlFieldsQuery("CREATE INDEX on Person (city_id)")).getAll

        // Inserting some data into the tables.
        var qry = new SqlFieldsQuery("INSERT INTO city (id, name, country) VALUES (?, ?, ?)")

        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "Forest Hill", "USA")).getAll
        cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Denver", "USA")).getAll
        cache.query(qry.setArgs(3L.asInstanceOf[JLong], "St. Petersburg", "Russia")).getAll

        qry = new SqlFieldsQuery("INSERT INTO person (id, name, age, city_id) values (?, ?, ?, ?)")

        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "John Doe", 31.asInstanceOf[JInt], 3L.asInstanceOf[JLong])).getAll
        cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Jane Roe", 27.asInstanceOf[JInt], 2L.asInstanceOf[JLong])).getAll
        cache.query(qry.setArgs(3L.asInstanceOf[JLong], "Mary Major", 86.asInstanceOf[JInt], 1L.asInstanceOf[JLong])).getAll
        cache.query(qry.setArgs(4L.asInstanceOf[JLong], "Richard Miles", 19.asInstanceOf[JInt], 2L.asInstanceOf[JLong])).getAll

        ignite
    }
}