/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.scalar.examples

import java.lang.{Integer => JavaInt}
import java.util.ConcurrentModificationException
import java.util.concurrent.ThreadLocalRandom
import javax.cache.Cache

import org.apache.ignite.IgniteCache
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.scalar.scalar
import org.apache.ignite.scalar.scalar._

import scala.collection.JavaConversions._

/**
 * <a href="http://en.wikipedia.org/wiki/Snowflake_schema">Snowflake Schema</a> is a logical
 * arrangement of data in which data is split into `dimensions` and `facts`.
 * <i>Dimensions</i> can be referenced or joined by other <i>dimensions</i> or <i>facts</i>;
 * <i>facts</i>, however, are generally not referenced by other facts. You can view <i>dimensions</i>
 * as your master or reference data, while <i>facts</i> are usually large data sets of events or
 * other objects that continuously come into the system and may change frequently. In Ignite
 * this architecture is supported via cross-cache queries: by storing <i>dimensions</i> in
 * `CacheMode#REPLICATED` caches and <i>facts</i> in much larger `CacheMode#PARTITIONED`
 * caches, you can freely execute distributed joins across your whole in-memory Ignite cluster,
 * thus querying your in-memory data without limitations.
 * <p/>
 * In this example we have two <i>dimensions</i>, `DimProduct` and `DimStore`, and
 * one <i>fact</i>, `FactPurchase`. Queries are executed by joining dimensions and facts
 * in various ways.
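 * <p/>
 * For illustration, the cross-cache join executed by `queryStorePurchases` below selects all
 * purchases made at a given store by joining the replicated and partitioned caches (the cache
 * names act as SQL schema names):
 * {{{
 * from "ScalarSnowflakeSchemaExampleReplicated".DimStore, "ScalarSnowflakeSchemaExamplePartitioned".FactPurchase
 * where DimStore.id=FactPurchase.storeId and DimStore.name=?
 * }}}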
 * <p/>
 * Remote nodes should be started using `ExampleNodeStartup`, which will
 * start a node with the `examples/config/example-ignite.xml` configuration.
 */
object ScalarSnowflakeSchemaExample {
    /** Configuration file name. */
    private val CONFIG = "examples/config/example-ignite.xml"

    /** Name of partitioned cache holding fact (purchase) data. */
    private val PARTITIONED_CACHE_NAME = "ScalarSnowflakeSchemaExamplePartitioned"

    /** Name of replicated cache holding dimension (store and product) data. */
    private val REPLICATED_CACHE_NAME = "ScalarSnowflakeSchemaExampleReplicated"

    /** ID generator. */
    private[this] val idGen = Stream.from(0).iterator

    /** DimStore data. */
    private[this] val dataStore = scala.collection.mutable.Map[JavaInt, DimStore]()

    /** DimProduct data. */
    private[this] val dataProduct = scala.collection.mutable.Map[JavaInt, DimProduct]()

    /**
     * Example entry point. No arguments required.
     */
    def main(args: Array[String]) {
        scalar(CONFIG) {
            println()
            println(">>> Cache snowflake schema example started.")

            // Destroy caches to clean up any data left over from previous runs.
            destroyCache$(PARTITIONED_CACHE_NAME)
            destroyCache$(REPLICATED_CACHE_NAME)

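            // Create a replicated cache holding both dimension types; indexed types
            // are passed as (key class, value class) pairs for SQL queries.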
            val dimCache = createCache$[JavaInt, AnyRef](REPLICATED_CACHE_NAME, CacheMode.REPLICATED,
                Seq(classOf[JavaInt], classOf[DimStore], classOf[JavaInt], classOf[DimProduct]))

            try {
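                // Create a partitioned cache for the much larger fact data (purchases).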
                val factCache = createCache$[JavaInt, FactPurchase](PARTITIONED_CACHE_NAME,
                    indexedTypes = Seq(classOf[JavaInt], classOf[FactPurchase]))

                try {
                    populateDimensions(dimCache)
                    populateFacts(factCache)

                    queryStorePurchases()
                    queryProductPurchases()
                }
                finally {
                    factCache.destroy()
                }
            }
            finally {
                dimCache.destroy()
            }
        }
    }

    /**
     * Populate cache with `dimensions` which in our case are
     * `DimStore` and `DimProduct` instances.
     */
    def populateDimensions(dimCache: IgniteCache[JavaInt, AnyRef]) {
        val store1 = new DimStore(idGen.next(), "Store1", "12345", "321 Chilly Dr, NY")
        val store2 = new DimStore(idGen.next(), "Store2", "54321", "123 Windy Dr, San Francisco")

        // Populate stores.
        dimCache.put(store1.id, store1)
        dimCache.put(store2.id, store2)

        dataStore.put(store1.id, store1)
        dataStore.put(store2.id, store2)

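        // Populate products.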
        for (i <- 1 to 20) {
            val product = new DimProduct(idGen.next(), "Product" + i, i + 1, (i + 1) * 10)

            dimCache.put(product.id, product)
            dataProduct.put(product.id, product)
        }
    }

    /**
     * Populate cache with `facts`, which in our case are `FactPurchase` objects.
     */
    def populateFacts(factCache: IgniteCache[JavaInt, FactPurchase]) {
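        // Each purchase references a randomly chosen store and product from the dimension data.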
        for (i <- 1 to 100) {
            val store: DimStore = rand(dataStore.values)
            val prod: DimProduct = rand(dataProduct.values)

            val purchase: FactPurchase = new FactPurchase(idGen.next(), prod.id, store.id, i + 1)

            factCache.put(purchase.id, purchase)
        }
    }

    /**
     * Query all purchases made at a specific store. This query uses cross-cache joins
     * between `DimStore` objects stored in `replicated` cache and
     * `FactPurchase` objects stored in `partitioned` cache.
     */
    def queryStorePurchases() {
        val factCache = ignite$.cache[JavaInt, FactPurchase](PARTITIONED_CACHE_NAME)

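        // Cache names are used as SQL schema names, so the query below joins DimStore
        // entries from the replicated cache with FactPurchase entries from the
        // partitioned cache.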
        val storePurchases = factCache.sql(
            "from \"" + REPLICATED_CACHE_NAME + "\".DimStore, \"" + PARTITIONED_CACHE_NAME + "\".FactPurchase " +
            "where DimStore.id=FactPurchase.storeId and DimStore.name=?", "Store1")

        printQueryResults("All purchases made at store1:", storePurchases.getAll)
    }

    /**
     * Query all purchases made at a specific store for 3 specific products.
     * This query uses cross-cache joins between `DimStore`, `DimProduct`
     * objects stored in `replicated` cache and `FactPurchase` objects
     * stored in `partitioned` cache.
     */
    private def queryProductPurchases() {
        val factCache = ignite$.cache[JavaInt, FactPurchase](PARTITIONED_CACHE_NAME)

        // All purchases for three specific products made at store2.
        // =========================================================
        val p1: DimProduct = rand(dataProduct.values)
        val p2: DimProduct = rand(dataProduct.values)
        val p3: DimProduct = rand(dataProduct.values)

        println("IDs of products [p1=" + p1.id + ", p2=" + p2.id + ", p3=" + p3.id + ']')

        val prodPurchases = factCache.sql(
            "from \"" + REPLICATED_CACHE_NAME + "\".DimStore, \"" + REPLICATED_CACHE_NAME + "\".DimProduct, \"" +
            PARTITIONED_CACHE_NAME + "\".FactPurchase " +
            "where DimStore.id=FactPurchase.storeId and " +
            "DimProduct.id=FactPurchase.productId and " +
            "DimStore.name=? and DimProduct.id in(?, ?, ?)",
            "Store2", p1.id, p2.id, p3.id)

        printQueryResults("All purchases made at store2 for 3 specific products:", prodPurchases.getAll)
    }

    /**
     * Print query results.
     *
     * @param msg Initial message.
     * @param res Results to print.
     */
    private def printQueryResults[V](msg: String, res: Iterable[Cache.Entry[JavaInt, V]]) {
        println(msg)

        for (e <- res)
            println(" " + e.getValue.toString)
    }

    /**
     * Gets random value from given collection.
     *
     * @param c Input collection (must be non-`null` and non-empty).
     * @return Random value from the input collection.
     */
    def rand[T](c: Iterable[_ <: T]): T = {
        val n: Int = ThreadLocalRandom.current.nextInt(c.size)

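        // Walk the collection to the randomly chosen index; if the index is never
        // reached (the collection shrank concurrently), fall through and fail below.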
        var i: Int = 0

        for (t <- c) {
            if (i < n)
                i += 1
            else
                return t
        }

        throw new ConcurrentModificationException
    }
}

/**
 * Represents a physical store location. In our `snowflake` schema a `store`
 * is a `dimension` and will be cached in `CacheMode#REPLICATED` cache.
 *
 * @param id Primary key.
 * @param name Store name.
 * @param zip Zip code.
 * @param addr Address.
 */
class DimStore(
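    // Fields annotated with @ScalarCacheQuerySqlField can be referenced from SQL queries;
    // unannotated fields (zip, addr) are stored but not exposed as SQL columns.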
    @ScalarCacheQuerySqlField
    val id: Int,
    @ScalarCacheQuerySqlField
    val name: String,
    val zip: String,
    val addr: String) {
    /**
     * `toString` implementation.
     */
    override def toString: String = {
        val sb: StringBuilder = new StringBuilder

        sb.append("DimStore ")
        sb.append("[id=").append(id)
        sb.append(", name=").append(name)
        sb.append(", zip=").append(zip)
        sb.append(", addr=").append(addr)
        sb.append(']')

        sb.toString()
    }
}

/**
 * Represents a product available for purchase. In our `snowflake` schema a `product`
 * is a `dimension` and will be cached in `CacheMode#REPLICATED` cache.
 *
 * @param id Product ID.
 * @param name Product name.
 * @param price Product list price.
 * @param qty Available product quantity.
 */
class DimProduct(
    @ScalarCacheQuerySqlField
    val id: Int,
    val name: String,
    @ScalarCacheQuerySqlField
    val price: Float,
    val qty: Int) {
    /**
     * `toString` implementation.
     */
    override def toString: String = {
        val sb: StringBuilder = new StringBuilder

        sb.append("DimProduct ")
        sb.append("[id=").append(id)
        sb.append(", name=").append(name)
        sb.append(", price=").append(price)
        sb.append(", qty=").append(qty)
        sb.append(']')

        sb.toString()
    }
}

/**
 * Represents a purchase record. In our `snowflake` schema a purchase
 * is a `fact` and will be cached in the larger `CacheMode#PARTITIONED` cache.
 *
 * @param id Purchase ID.
 * @param productId Purchased product ID.
 * @param storeId Store ID.
 * @param purchasePrice Purchase price.
 */
class FactPurchase(
    @ScalarCacheQuerySqlField
    val id: Int,
    @ScalarCacheQuerySqlField
    val productId: Int,
    @ScalarCacheQuerySqlField
    val storeId: Int,
    @ScalarCacheQuerySqlField
    val purchasePrice: Float) {
    /**
     * `toString` implementation.
     */
    override def toString: String = {
        val sb: StringBuilder = new StringBuilder

        sb.append("FactPurchase ")
        sb.append("[id=").append(id)
        sb.append(", productId=").append(productId)
        sb.append(", storeId=").append(storeId)
        sb.append(", purchasePrice=").append(purchasePrice)
        sb.append(']')

        sb.toString()
    }
}