package org.apache.samza.test.performance
import java.util
import java.util.concurrent.TimeUnit
import java.util.Collections
import java.util.UUID
import org.apache.commons.lang3.RandomStringUtils
import org.apache.samza.config.{Config, JobConfig, MapConfig, StorageConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.context.ContainerContextImpl
import org.apache.samza.context.JobContextImpl
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.serializers.ByteSerde
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.serializers.UUIDSerde
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.SystemProducers
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.util.{CommandLine, FileUtil, Logging, ReflectionUtil}
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.JavaConverters._
import scala.util.Random
* A simple CLI-based tool for running various key-value performance tests.
* List of KeyValuePerformance tests must be defined in 'test.methods' configuration as a comma-separated value.
* The tool splits this list to determine which tests to run.
* Each test should define its own set of configuration for partition count, stores etc.
* using the "test.<test-name>.<config-string>=<config-value>" pattern
* Each test may define one or more test parameterss.
* For example, test1 can define 2 sets of parameters by specifying "test.test1.set.count=2" and
* define each set as:
* "test.test1.set-1.<param-name>=<param-value>"
* "test.test1.set-2.<param-name>=<param-value>"
object TestKeyValuePerformance extends Logging {
val Encoding = "UTF-8"
val JobId = RandomStringUtils.random(10)
val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map(
"all-with-deletes" -> runTestAllWithDeletes,
"rocksdb-write-performance" -> runTestMsgWritePerformance,
"rocksdb-concurrent-write-performance" -> runTestConcurrentMsgWritePerformance,
"get-all-vs-get-write-many-read-many" -> runTestGetAllVsGetWriteManyReadMany,
"get-all-vs-get-write-once-read-many" -> runTestGetAllVsGetWriteOnceReadMany)
def main(args: Array[String]) {
val cmdline = new CommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
val tests = config.get("test.methods").split(",")
tests.foreach{ test =>
info("Running test: %s" format test)
if(testMethods.contains(test)) {
val testConfig: util.Map[String, String] = new MapConfig(config.subset("test." + test + ".", true))
val jobConfig: util.Map[String, String] = ImmutableMap.of(JobConfig.JOB_NAME, test, JobConfig.JOB_ID, JobId)
val combinedConfig: Config = new MapConfig(ImmutableList.of(testConfig, jobConfig))
invokeTest(test, testMethods(test), combinedConfig)
} else {
error("Invalid test method. valid methods are: %s" format testMethods.keys)
throw new SamzaException("Unknown test method: %s" format test)
def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config) {
val partitionCount = config.getInt("partition.count", 1)
val tasks = (0 until partitionCount)
.map(i => new Partition(i))
.map(partition => (new TaskName(partition.toString),
new TaskModel(new TaskName(partition.toString),
Collections.singleton(new SystemStreamPartition("system", "stream", partition)),
val producerMultiplexer = new SystemProducers(
Map[String, SystemProducer](),
new SerdeManager
val storageConfig = new StorageConfig(config)
// Build a Map[String, StorageEngineFactory]. The key is the store name.
val storageEngineMappings = storageConfig
.map(storeName => {
val storageFactoryClassName =
.getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
(storeName, ReflectionUtil.getObj(storageFactoryClassName,
classOf[StorageEngineFactory[Array[Byte], Array[Byte]]]))
for((storeName, storageEngine) <- storageEngineMappings) {
val testSetCount = storageConfig.getInt("set.count", 1)
(1 to testSetCount).foreach(testSet => {
//Create a new DB instance for each test set
val output = new File("/tmp/" + UUID.randomUUID())
val byteSerde = new ByteSerde
val engine = storageEngine.getStorageEngine(
new TaskInstanceCollector(producerMultiplexer),
new MetricsRegistryMap,
JobContextImpl.fromConfigWithDefaults(storageConfig, null),
new ContainerContextImpl(new ContainerModel("0", tasks.asJava), new MetricsRegistryMap), StoreMode.ReadWrite
val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) {
throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
} else {
engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]
// Run the test method
testMethod(db, storageConfig.subset("set-" + testSet + ".", true))
new FileUtil().rm(output)
def runTestAllWithDeletes(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
val numLoops = config.getInt("num.loops", 100)
val messagesPerBatch = config.getInt("messages.per.batch", 10000)
val messageSizeBytes = config.getInt("message.size.bytes", 200)
info("Using (num loops, messages per batch, message size in bytes) => (%s, %s, %s)" format (numLoops, messagesPerBatch, messageSizeBytes))
new TestKeyValuePerformance().testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
def runTestMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
val messageSizeBytes = config.getInt("message.size", 200)
val messageCount = config.getInt("message.count", 10000)
info("Using (message count, message size in bytes) => (%s, %s)" format (messageCount, messageSizeBytes))
new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes)
def runTestConcurrentMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
val messageSizeBytes = config.getInt("message.size", 200)
val messageCount = config.getInt("message.count", 100000)
val numThreads = config.getInt("num.threads", 4)
new TestKeyValuePerformance().testConcurrentMsgWritePerformance(db, messageCount, messageSizeBytes, numThreads)
def runTestGetAllVsGetWriteManyReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
new TestKeyValuePerformance().testGetAllVsGetWriteManyReadMany(db, config)
def runTestGetAllVsGetWriteOnceReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
new TestKeyValuePerformance().testGetAllVsGetWriteOnceReadMany(db, config)
class TestKeyValuePerformance extends Logging {
import TestKeyValuePerformance._
* A test that writes messagesPerBatch messages, deletes them all, then calls
* store.all. The test periodically outputs the time it takes to complete
* these operations. This test is useful to trouble shoot issues with LevleDB
* such as the issue documented in SAMZA-254.
def testAllWithDeletes(
store: KeyValueStore[Array[Byte], Array[Byte]],
* How many times a batch of messages should be written and deleted.
numLoops: Int = 100,
* The number of messages to write and delete per-batch.
messagesPerBatch: Int = 10000,
* The size of the messages to write.
messageSizeBytes: Int = 200) {
val stuff = (0 until messageSizeBytes).map(i => "a").mkString.getBytes(Encoding)
val start = System.currentTimeMillis
(0 until numLoops).foreach(i => {
info("(%sms) Total written to store: %s" format (System.currentTimeMillis - start, i * messagesPerBatch))
(0 until messagesPerBatch).foreach(j => {
val k = (i * j).toString.getBytes(Encoding)
store.put(k, stuff)
val allStart = System.currentTimeMillis
val iter = store.all
info("(%sms) all() took %sms." format (System.currentTimeMillis - start, System.currentTimeMillis - allStart))
info("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
* Test that successively writes a set of fixed-size messages to the KV store
* and computes the total time for the operations
* @param store Key-Value store instance that is being tested
* @param numMsgs Total number of messages to write to the store
* @param msgSizeInBytes Size of each message in Bytes
def testMsgWritePerformance(
store: KeyValueStore[Array[Byte], Array[Byte]],
numMsgs: Int = 10000,
msgSizeInBytes: Int = 200) {
val msg = (0 until msgSizeInBytes).map(i => "x").mkString.getBytes(Encoding)
val start = System.currentTimeMillis
(0 until numMsgs).foreach(i => {
store.put(i.toString.getBytes(Encoding), msg)
val timeTaken = System.currentTimeMillis - start
info("Total time to write %d msgs of size %d bytes : %s s" format (numMsgs, msgSizeInBytes, timeTaken * .001))
def testConcurrentMsgWritePerformance(
store: KeyValueStore[Array[Byte], Array[Byte]],
numMsgs: Int = 100000,
msgSizeInBytes: Int = 200,
numThreads: Int = 4) {
val msg = (0 until msgSizeInBytes).map(i => "x").mkString.getBytes(Encoding)
def createThread(name: String): Thread = new Thread(new Runnable {
override def run() = {
(0 until numMsgs).foreach(i => {
store.put(i.toString.getBytes(Encoding), msg)
}, name)
val threads = (0 until numThreads).map(i => createThread(s"Writer $i"))
val start = System.currentTimeMillis
val timeTaken = System.currentTimeMillis - start
info("Total time to write %d msgs of size %d bytes with %s threads: %s sec"
format (numMsgs, msgSizeInBytes, numThreads, timeTaken * .001))
* Test that ::getAll performance is better than that of ::get (test when there are many writes and many reads).
* @param store key-value store instance that is being tested
* @param config the test case's config
def testGetAllVsGetWriteManyReadMany(store: KeyValueStore[Array[Byte],Array[Byte]], config: Config): Unit = {
val iterationsCount = config.getInt("iterations.count", 100)
val maxMessagesCountPerBatch = config.getInt("message.max-count-per-batch", 100000)
val maxMessageSizeBytes = config.getInt("message.max-size.bytes", 1024)
val timer = Stopwatch.createUnstarted
val uuidSerde = new UUIDSerde
info("iterations count: " + iterationsCount)
info("max messages count per batch: " + maxMessagesCountPerBatch)
info("max message size in bytes: " + maxMessageSizeBytes)
info("%12s%12s%12s%12s".format("Msg Count", "Bytes/Msg", "get ms", "getAll ms"))
try {
(0 until iterationsCount).foreach(i => {
val messageSizeBytes = Random.nextInt(maxMessageSizeBytes)
val messagesCountPerBatch = Random.nextInt(maxMessagesCountPerBatch)
val keys = (0 until messagesCountPerBatch).map(k => uuidSerde.toBytes(UUID.randomUUID)).toList
val shuffledKeys = Random.shuffle(keys) // to reduce locality of reference -- sequential access may be unfair
keys.foreach(k => store.put(k, Random.nextString(messageSizeBytes).getBytes(Encoding)))
assert(store.getAll(shuffledKeys.asJava).size == shuffledKeys.size)
val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
// Restore cache, in case it's enabled, to a state similar to the one above when the getAll test started
keys.foreach(k => store.put(k, Random.nextString(messageSizeBytes).getBytes(Encoding)))
val getTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
info("%12d%12d%12d%12d".format(messagesCountPerBatch, messageSizeBytes, getTime, getAllTime))
if (getAllTime > getTime) {
error("getAll was slower than get!")
} finally {
* Test that ::getAll performance is better than that of ::get (test when data are written once and read many times);
* load is usually greater than the storage engine's cache size (not to be confused with Samza's cache layer),
* and keys are randomly selected from the stored entries to perform a fair comparison of ::get vs. ::getAll (in case
* the underlying storage engine caches data in blocks and ::getAll causes a block to be loaded into the cache --
* one can argue that ::get should trigger the same behavior, but it's worth testing this WORM scenario regardless)
* @param store key-value store instance that is being tested
* @param config the test case's config
def testGetAllVsGetWriteOnceReadMany(store: KeyValueStore[Array[Byte],Array[Byte]], config: Config): Unit = {
val iterationsCount = config.getInt("iterations.count", 100)
val maxMessagesCountPerBatch = config.getInt("message.max-count-per-batch", 10000 + Random.nextInt(20000))
val maxMessageSizeBytes = config.getInt("message.max-size.bytes", 1024)
val totalMessagesCount = iterationsCount * maxMessagesCountPerBatch
val timer = Stopwatch.createUnstarted
val uuidSerde = new UUIDSerde
info("write once -- putting %d messages in store".format(totalMessagesCount))
val keys = (0 until totalMessagesCount).map(k => uuidSerde.toBytes(UUID.randomUUID)).toList
keys.foreach(k => store.put(k, Random.nextString(Random.nextInt(maxMessageSizeBytes)).getBytes(Encoding)))
info("iterations count: " + iterationsCount)
info("max messages count per batch: " + maxMessagesCountPerBatch)
info("max message size in bytes: " + maxMessageSizeBytes)
info("%12s%12s%12s%12s".format("Msg Count", "Total Size", "get ms", "getAll ms"))
try {
(0 until iterationsCount).foreach(i => {
val messagesCountPerBatch = Random.nextInt(maxMessagesCountPerBatch)
val shuffledKeys = Random.shuffle(keys).take(messagesCountPerBatch)
// We want to measure ::getAll when called many times, so populate the cache because first call is a cache-miss
val totalSize = store.getAll(shuffledKeys.asJava)
assert(store.getAll(shuffledKeys.asJava).size == shuffledKeys.size)
val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
// We want to measure ::get when called many times, so populate the cache because first call is a cache-miss
val getTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
info("%12d%12d%12d%12d".format(messagesCountPerBatch, totalSize, getTime, getAllTime))
if (getAllTime > getTime) {
error("getAll was slower than get!")
} finally {