blob: d113297ac4d547c8be97be8d5893e807e46d2306 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.scalar.examples
import java.lang.{Integer => JavaInt, Long => JavaLong}
import java.util
import java.util.Map.Entry
import java.util.Timer
import javax.cache.processor.{EntryProcessor, MutableEntry}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.internal.util.scala.impl
import org.apache.ignite.scalar.scalar
import org.apache.ignite.scalar.scalar._
import org.apache.ignite.stream.StreamReceiver
import org.apache.ignite.{IgniteCache, IgniteException}
import scala.collection.JavaConversions._
import scala.util.Random
/**
* Real time popular number counter.
* <p/>
* Remote nodes should always be started with special configuration file which
* enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`.
* <p/>
* Alternatively you can run `ExampleNodeStartup` in another JVM which will
* start node with `examples/config/example-ignite.xml` configuration.
* <p/>
* The counts are kept in cache on all remote nodes. Top `10` counts from each node are then grabbed to produce
* an overall top `10` list for the whole Ignite cluster.
*/
object ScalarCachePopularNumbersExample extends App {
    // NOTE: the `final val` constants below are intentionally left without explicit type
    // annotations so that the literal-valued ones remain compile-time constants.

    /** Configuration file name. */
    private val CONFIG = "examples/config/example-ignite.xml"

    /** Cache name. */
    private final val NAME = ScalarCachePopularNumbersExample.getClass.getSimpleName

    /** Count of most popular numbers to retrieve from cluster. */
    private final val POPULAR_NUMBERS_CNT = 10

    /** Random number generator. */
    private final val RAND = new Random()

    /** Range within which to generate numbers. */
    private final val RANGE = 1000

    /** Count of total numbers to generate. */
    private final val CNT = 1000000

    scalar(CONFIG) {
        val cache = createCache$[JavaInt, JavaLong](NAME, indexedTypes = Seq(classOf[JavaInt], classOf[JavaLong]))

        println()
        println(">>> Cache popular numbers example started.")

        try {
            val prj = ignite$.cluster().forCacheNodes(NAME)

            if (prj.nodes().isEmpty)
                println("Ignite does not have cache configured: " + NAME)
            else {
                val popularNumbersQryTimer = new Timer("numbers-query-worker")

                try {
                    // Schedule queries to run every 3 seconds while the cache is being populated.
                    popularNumbersQryTimer.schedule(timerTask(query(POPULAR_NUMBERS_CNT)), 3000, 3000)

                    streamData()

                    // Force one more run to get final counts.
                    query(POPULAR_NUMBERS_CNT)
                }
                finally {
                    // Stop the periodic queries whether or not streaming succeeded.
                    popularNumbersQryTimer.cancel()
                }
            }
        }
        finally {
            // The cache was created by this example, so always remove it on exit.
            cache.destroy()
        }
    }

    /**
     * Populates cache in real time with numbers and keeps count for every number.
     *
     * Streams `CNT` random keys in `[0, RANGE)` through a data streamer whose receiver is an
     * [[IncrementingUpdater]]. The streamer is closed (without cancelling pending data) even
     * if streaming fails part-way.
     *
     * @throws IgniteException If failed.
     */
    @throws[IgniteException]
    def streamData(): Unit = {
        // 2048 is passed as the streamer's second argument; per the original comment this is
        // a larger per-node buffer size, chosen because the state is relatively small.
        val smtr = dataStreamer$[JavaInt, JavaLong](NAME, 2048)

        try {
            smtr.receiver(new IncrementingUpdater())

            // The streamed value (1L) is not read by IncrementingUpdater; only the key matters.
            (0 until CNT) foreach (_ => smtr.addData(RAND.nextInt(RANGE), 1L))
        }
        finally {
            // `false` = do not cancel: let already buffered data be delivered before closing.
            smtr.close(false)
        }
    }

    /**
     * Queries a subset of most popular numbers from in-memory data ignite cluster
     * and prints each `key=count` pair followed by a separator line.
     *
     * @param cnt Number of most popular numbers to return.
     * @throws IgniteException If the cache named `NAME` cannot be obtained.
     */
    def query(cnt: Int): Unit = {
        // Fail with a descriptive error instead of a bare NoSuchElementException from Option.get.
        val numbersCache = cache$[JavaInt, JavaLong](NAME)
            .getOrElse(throw new IgniteException(s"Cache is not available: $NAME"))

        // `cnt` is an Int, so embedding it into the SQL text cannot inject anything.
        val results = numbersCache
            .query(new SqlFieldsQuery(s"select _key, _val from Long order by _val desc, _key limit $cnt"))
            .getAll

        results.foreach(res => println(s"${res.get(0)}=${res.get(1)}"))

        println("------------------")
    }

    /**
     * Stream receiver that increments the cached counter of every received key by one.
     * The values carried by the received entries are ignored.
     */
    private class IncrementingUpdater extends StreamReceiver[JavaInt, JavaLong] {
        /** Entry processor that sets a missing value to `1` and otherwise adds `1` to it. */
        private[this] final val INC = new EntryProcessor[JavaInt, JavaLong, Object]() {
            /**
             * Increments the entry value by one, treating a `null` (absent) value as zero.
             *
             * @param e Entry to update.
             * @param args Unused.
             * @return Always `null`.
             */
            override def process(e: MutableEntry[JavaInt, JavaLong], args: AnyRef*): Object = {
                e.setValue(Option(e.getValue)
                    .map(l => JavaLong.valueOf(l + 1))
                    .getOrElse(JavaLong.valueOf(1L)))

                null
            }
        }

        /**
         * Invokes the incrementing entry processor on the cache for the key of every received entry.
         *
         * @param cache Cache to update.
         * @param entries Received entries; only their keys are used.
         */
        @impl def receive(cache: IgniteCache[JavaInt, JavaLong], entries: util.Collection[Entry[JavaInt, JavaLong]]): Unit = {
            entries.foreach(entry => cache.invoke(entry.getKey, INC))
        }
    }
}