// blob: a23853916f9d469f1460dd3bbb63a928fe56bfa5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.examples.spark;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * Example application that demonstrates join operations between two Spark data frames (and the equivalent
 * Spark temporary views) whose data is stored in Ignite SQL tables.
 * <p>
 * The example starts an Ignite node from {@link #CONFIG}, creates and fills the {@code city} and {@code person}
 * tables, and then runs the same join twice: once through the Spark Dataset DSL and once through Spark SQL.
 */
public class JavaIgniteDataFrameJoinExample {
    /** Ignite config file. */
    private static final String CONFIG = "examples/config/example-ignite.xml";

    /** Test cache name. */
    private static final String CACHE_NAME = "testCache";

    /** Name of the SQL table with persons. */
    private static final String PERSON_TABLE = "person";

    /** Name of the SQL table with cities. */
    private static final String CITY_TABLE = "city";

    /**
     * Starts Ignite, fills it with test data, runs both join examples and releases all resources.
     *
     * @param args Command line arguments (not used).
     */
    public static void main(String[] args) {
        try {
            setupServerAndData();

            // Creating spark session.
            SparkSession spark = SparkSession
                .builder()
                .appName("JavaIgniteDataFrameJoinExample")
                .master("local")
                .config("spark.executor.instances", "2")
                .getOrCreate();

            try {
                // Adjust the logger to exclude the logs of no interest.
                Logger.getRootLogger().setLevel(Level.ERROR);
                Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

                // Executing examples.
                sparkDSLJoinExample(spark);
                nativeSparkSqlJoinExample(spark);
            }
            finally {
                // Release the Spark session even if one of the examples has failed.
                spark.stop();
            }
        }
        finally {
            // Always stop Ignite, otherwise a failure above would leave the node running.
            Ignition.stop(false);
        }
    }

    /**
     * Joins the {@code person} and {@code city} data frames using the Spark Dataset DSL and prints the result.
     *
     * @param spark Spark session used to read the Ignite tables.
     */
    private static void sparkDSLJoinExample(SparkSession spark) {
        System.out.println("Querying using Spark DSL.");

        Dataset<Row> persons = loadAndShowTable(spark, PERSON_TABLE);
        Dataset<Row> cities = loadAndShowTable(spark, CITY_TABLE);

        Dataset<Row> joinResult = persons.join(cities, persons.col("city_id").equalTo(cities.col("id")))
            .select(persons.col("name").as("person"), persons.col("age"), cities.col("name").as("city"), cities.col("country"));

        showJoinResult(joinResult);
    }

    /**
     * Registers the {@code person} and {@code city} data frames as Spark temporary views, joins them with
     * a Spark SQL query and prints the result.
     *
     * @param spark Spark session used to read the Ignite tables and to run the SQL query.
     */
    private static void nativeSparkSqlJoinExample(SparkSession spark) {
        System.out.println("Querying using Spark SQL.");

        Dataset<Row> persons = loadAndShowTable(spark, PERSON_TABLE);
        Dataset<Row> cities = loadAndShowTable(spark, CITY_TABLE);

        // Registering DataFrame as Spark view.
        persons.createOrReplaceTempView("person");
        cities.createOrReplaceTempView("city");

        // Selecting data from Ignite through Spark SQL Engine.
        Dataset<Row> joinResult = spark.sql(
            "SELECT person.name AS person, age, city.name AS city, country FROM person JOIN city ON person.city_id = city.id"
        );

        showJoinResult(joinResult);
    }

    /**
     * Loads an Ignite SQL table as a Spark data frame and prints its schema and content.
     *
     * @param spark Spark session.
     * @param tblName Name of the Ignite SQL table to load.
     * @return Data frame backed by the given table.
     */
    private static Dataset<Row> loadAndShowTable(SparkSession spark, String tblName) {
        Dataset<Row> df = spark.read()
            .format(IgniteDataFrameSettings.FORMAT_IGNITE())
            .option(IgniteDataFrameSettings.OPTION_TABLE(), tblName)
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG)
            .load();

        df.printSchema();
        df.show();

        return df;
    }

    /**
     * Prints the query plans, the schema and the rows of a join result.
     *
     * @param joinResult Data frame produced by one of the join examples.
     */
    private static void showJoinResult(Dataset<Row> joinResult) {
        joinResult.explain(true);
        joinResult.printSchema();
        joinResult.show();
    }

    /**
     * Starts an Ignite node from {@link #CONFIG}, creates the {@code city} and {@code person} SQL tables
     * and inserts the sample rows used by the examples.
     */
    private static void setupServerAndData() {
        // Starting Ignite.
        Ignite ignite = Ignition.start(CONFIG);

        // Creating first test cache. It is only used as an entry point for running SQL statements.
        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(CACHE_NAME).setSqlSchema("PUBLIC");

        IgniteCache<?, ?> cache = ignite.getOrCreateCache(ccfg);

        // Creating SQL tables.
        execute(cache, new SqlFieldsQuery(
            "CREATE TABLE city (id LONG PRIMARY KEY, name VARCHAR, country VARCHAR) WITH \"template=replicated\""));

        execute(cache, new SqlFieldsQuery(
            "CREATE TABLE person (id LONG, name VARCHAR, age INT, city_id LONG, PRIMARY KEY (id, city_id)) " +
            "WITH \"backups=1, affinity_key=city_id\""));

        execute(cache, new SqlFieldsQuery("CREATE INDEX on Person (city_id)"));

        // Inserting some data to tables.
        SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO city (id, name, country) VALUES (?, ?, ?)");

        execute(cache, qry.setArgs(1L, "Forest Hill", "USA"));
        execute(cache, qry.setArgs(2L, "Denver", "USA"));
        execute(cache, qry.setArgs(3L, "St. Petersburg", "Russia"));

        qry = new SqlFieldsQuery("INSERT INTO person (id, name, age, city_id) values (?, ?, ?, ?)");

        execute(cache, qry.setArgs(1L, "Alexey Zinoviev", 31, 3L));
        execute(cache, qry.setArgs(2L, "Jane Roe", 27, 2L));
        execute(cache, qry.setArgs(3L, "Mary Major", 86, 1L));
        execute(cache, qry.setArgs(4L, "Richard Miles", 19, 2L));
    }

    /**
     * Runs an SQL statement through the given cache and fetches the whole result, as the example does for
     * every DDL and DML statement.
     *
     * @param cache Cache used to run the statement.
     * @param qry Statement to run.
     */
    private static void execute(IgniteCache<?, ?> cache, SqlFieldsQuery qry) {
        cache.query(qry).getAll();
    }
}