blob: 3f7056e0d31addbcdc968a5529f44f8733249448 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.performance
import java.io.File
import java.util
import java.util.concurrent.TimeUnit
import java.util.Collections
import java.util.UUID
import com.google.common.base.Stopwatch
import com.google.common.collect.ImmutableList
import com.google.common.collect.ImmutableMap
import org.apache.commons.lang3.RandomStringUtils
import org.apache.samza.config.{Config, JobConfig, MapConfig, StorageConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.context.ContainerContextImpl
import org.apache.samza.context.JobContextImpl
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.serializers.ByteSerde
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.serializers.UUIDSerde
import org.apache.samza.storage.StorageEngineFactory
import org.apache.samza.storage.StorageEngineFactory.StoreMode
import org.apache.samza.storage.kv.KeyValueStorageEngine
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.SystemProducers
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.util.{CommandLine, FileUtil, Logging, ReflectionUtil}
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.JavaConverters._
import scala.util.Random
/**
* A simple CLI-based tool for running various key-value performance tests.
*
* List of KeyValuePerformance tests must be defined in 'test.methods' configuration as a comma-separated value.
* The tool splits this list to determine which tests to run.
*
* Each test should define its own set of configuration for partition count, stores etc.
* using the "test.<test-name>.<config-string>=<config-value>" pattern
*
* Each test may define one or more sets of test parameters.
* For example, test1 can define 2 sets of parameters by specifying "test.test1.set.count=2" and
* define each set as:
* "test.test1.set-1.<param-name>=<param-value>"
* "test.test1.set-2.<param-name>=<param-value>"
*/
object TestKeyValuePerformance extends Logging {
  /** Charset name used whenever a test turns a string key or payload into bytes. */
  val Encoding = "UTF-8"

  /**
   * Job id placed into every test's config under JobConfig.JOB_ID; generated once when this object
   * is initialized. Restricted to alphanumeric characters because RandomStringUtils.random(n) draws
   * from the full character set and can therefore produce control or other non-printable characters.
   */
  val JobId = RandomStringUtils.randomAlphanumeric(10)

  /** Maps each name accepted in 'test.methods' to the runner that executes that test. */
  val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map(
    "all-with-deletes" -> runTestAllWithDeletes,
    "rocksdb-write-performance" -> runTestMsgWritePerformance,
    "rocksdb-concurrent-write-performance" -> runTestConcurrentMsgWritePerformance,
    "get-all-vs-get-write-many-read-many" -> runTestGetAllVsGetWriteManyReadMany,
    "get-all-vs-get-write-once-read-many" -> runTestGetAllVsGetWriteOnceReadMany)

  /**
   * CLI entry point. Loads the config from the command line, reads the comma-separated
   * 'test.methods' list and runs each named test in order with the "test.<name>." config subset
   * plus the job name and job id.
   *
   * @param args command-line arguments handed to [[CommandLine]]
   * @throws SamzaException if 'test.methods' is missing or empty, or names an unknown test
   */
  def main(args: Array[String]): Unit = {
    val cmdline = new CommandLine
    val options = cmdline.parser.parse(args: _*)
    val config = cmdline.loadConfig(options)

    // A missing key comes back as null from the underlying java.util.Map; fail with a readable
    // message rather than a NullPointerException on split. Names are trimmed so "a, b" works.
    val tests = Option(config.get("test.methods"))
      .getOrElse(throw new SamzaException("Missing required configuration: test.methods"))
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
    if (tests.isEmpty) {
      throw new SamzaException("No test listed in test.methods. valid methods are: %s" format testMethods.keys)
    }

    tests.foreach { test =>
      info("Running test: %s" format test)
      testMethods.get(test) match {
        case Some(testMethod) =>
          val testConfig: util.Map[String, String] = new MapConfig(config.subset("test." + test + ".", true))
          val jobConfig: util.Map[String, String] = ImmutableMap.of(JobConfig.JOB_NAME, test, JobConfig.JOB_ID, JobId)
          val combinedConfig: Config = new MapConfig(ImmutableList.of(testConfig, jobConfig))
          invokeTest(test, testMethod, combinedConfig)
        case None =>
          error("Invalid test method. valid methods are: %s" format testMethods.keys)
          throw new SamzaException("Unknown test method: %s" format test)
      }
    }
  }

  /**
   * Runs one test against every store configured in `config`, once per parameter set.
   *
   * For each store name and each set index in 1..'set.count' (default 1) a new storage engine is
   * created in a fresh "/tmp/<uuid>" directory, `testMethod` is called with it and the "set-<n>."
   * config subset, and the directory is removed afterwards, even when the test throws.
   *
   * @param testName   name of the test being run (not used by the body)
   * @param testMethod runner to invoke for every store / parameter-set combination
   * @param config     the test's own config, read for 'partition.count', 'set.count' and store settings
   * @throws SamzaException if a store has no factory class configured, or the factory does not
   *                        produce a KeyValueStorageEngine
   */
  def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config): Unit = {
    val partitionCount = config.getInt("partition.count", 1)
    // One task per partition, all on the fixed "system"/"stream" system stream.
    val tasks = (0 until partitionCount)
      .map(i => new Partition(i))
      .map(partition => (new TaskName(partition.toString),
        new TaskModel(new TaskName(partition.toString),
          Collections.singleton(new SystemStreamPartition("system", "stream", partition)),
          partition)))
      .toMap

    val producerMultiplexer = new SystemProducers(
      Map[String, SystemProducer](),
      new SerdeManager
    )
    val storageConfig = new StorageConfig(config)

    // Build (store name, StorageEngineFactory) pairs for every configured store.
    val storageEngineMappings = storageConfig
      .getStoreNames.asScala
      .map(storeName => {
        val storageFactoryClassName =
          JavaOptionals.toRichOptional(storageConfig.getStorageFactoryClassName(storeName)).toOption
            .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
        (storeName, ReflectionUtil.getObj(storageFactoryClassName,
          classOf[StorageEngineFactory[Array[Byte], Array[Byte]]]))
      })

    // The number of parameter sets does not depend on the store, so read it once.
    val testSetCount = storageConfig.getInt("set.count", 1)

    for ((storeName, storageEngine) <- storageEngineMappings) {
      (1 to testSetCount).foreach(testSet => {
        // Create a new DB instance for each test set
        val output = new File("/tmp/" + UUID.randomUUID())
        try {
          val byteSerde = new ByteSerde
          val engine = storageEngine.getStorageEngine(
            storeName,
            output,
            byteSerde,
            byteSerde,
            new TaskInstanceCollector(producerMultiplexer),
            new MetricsRegistryMap,
            null,
            JobContextImpl.fromConfigWithDefaults(storageConfig, null),
            new ContainerContextImpl(new ContainerModel("0", tasks.asJava), new MetricsRegistryMap), StoreMode.ReadWrite
          )

          val db = engine match {
            // Type arguments are erased, so the cast to byte-array keys/values is still required.
            case kv: KeyValueStorageEngine[_, _] =>
              kv.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]
            case _ =>
              throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
          }

          // Run the test method
          testMethod(db, storageConfig.subset("set-" + testSet + ".", true))
        } finally {
          // Always remove the scratch directory so a failing test does not leave data behind in /tmp.
          new FileUtil().rm(output)
        }
      })
    }
  }

  /**
   * Reads 'num.loops', 'messages.per.batch' and 'message.size.bytes' (defaults 100, 10000, 200)
   * and runs the all-with-deletes test.
   */
  def runTestAllWithDeletes(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config): Unit = {
    val numLoops = config.getInt("num.loops", 100)
    val messagesPerBatch = config.getInt("messages.per.batch", 10000)
    val messageSizeBytes = config.getInt("message.size.bytes", 200)
    info("Using (num loops, messages per batch, message size in bytes) => (%s, %s, %s)" format (numLoops, messagesPerBatch, messageSizeBytes))
    new TestKeyValuePerformance().testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
  }

  /**
   * Reads 'message.size' and 'message.count' (defaults 200, 10000) and runs the sequential
   * write-performance test.
   */
  def runTestMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config): Unit = {
    val messageSizeBytes = config.getInt("message.size", 200)
    val messageCount = config.getInt("message.count", 10000)
    info("Using (message count, message size in bytes) => (%s, %s)" format (messageCount, messageSizeBytes))
    new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes)
  }

  /**
   * Reads 'message.size', 'message.count' and 'num.threads' (defaults 200, 100000, 4) and runs the
   * multi-threaded write-performance test.
   */
  def runTestConcurrentMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config): Unit = {
    val messageSizeBytes = config.getInt("message.size", 200)
    val messageCount = config.getInt("message.count", 100000)
    val numThreads = config.getInt("num.threads", 4)
    // Log the effective parameters, as the other runners do.
    info("Using (message count, message size in bytes, num threads) => (%s, %s, %s)" format (messageCount, messageSizeBytes, numThreads))
    new TestKeyValuePerformance().testConcurrentMsgWritePerformance(db, messageCount, messageSizeBytes, numThreads)
  }

  /** Runs the getAll-vs-get comparison for the write-many/read-many scenario; the test reads its own config. */
  def runTestGetAllVsGetWriteManyReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config): Unit = {
    new TestKeyValuePerformance().testGetAllVsGetWriteManyReadMany(db, config)
  }

  /** Runs the getAll-vs-get comparison for the write-once/read-many scenario; the test reads its own config. */
  def runTestGetAllVsGetWriteOnceReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config): Unit = {
    new TestKeyValuePerformance().testGetAllVsGetWriteOnceReadMany(db, config)
  }
}
class TestKeyValuePerformance extends Logging {
  import TestKeyValuePerformance._

  /**
   * Builds a payload of exactly `size` pseudo-random bytes.
   *
   * Used instead of `Random.nextString(size).getBytes("UTF-8")`, whose encoded length is generally
   * much larger than `size` because most generated characters need more than one byte in UTF-8.
   */
  private def randomBytes(size: Int): Array[Byte] = {
    val bytes = new Array[Byte](size)
    Random.nextBytes(bytes)
    bytes
  }

  /**
   * A test that, in each loop, puts and immediately deletes messagesPerBatch messages and then
   * calls store.all. The test periodically outputs the time it takes to complete
   * these operations. This test is useful to troubleshoot issues with LevelDB
   * such as the issue documented in SAMZA-254.
   *
   * @param store            key-value store instance that is being tested
   * @param numLoops         how many times a batch of messages should be written and deleted
   * @param messagesPerBatch the number of messages to write and delete per batch
   * @param messageSizeBytes the size of the messages to write
   */
  def testAllWithDeletes(
    store: KeyValueStore[Array[Byte], Array[Byte]],
    numLoops: Int = 100,
    messagesPerBatch: Int = 10000,
    messageSizeBytes: Int = 200): Unit = {
    val stuff = ("a" * messageSizeBytes).getBytes(Encoding)
    val start = System.currentTimeMillis

    (0 until numLoops).foreach(i => {
      info("(%sms) Total written to store: %s" format (System.currentTimeMillis - start, i * messagesPerBatch))

      (0 until messagesPerBatch).foreach(j => {
        // NOTE(review): i * j is not unique per message (every key is "0" when i == 0) -- confirm
        // whether i * messagesPerBatch + j was intended before changing what the benchmark measures.
        val k = (i * j).toString.getBytes(Encoding)
        store.put(k, stuff)
        store.delete(k)
      })

      val allStart = System.currentTimeMillis
      val iter = store.all
      try {
        info("(%sms) all() took %sms." format (System.currentTimeMillis - start, System.currentTimeMillis - allStart))
      } finally {
        iter.close()
      }
    })

    info("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
  }

  /**
   * Test that successively writes a set of fixed-size messages to the KV store
   * and computes the total time for the operations
   * @param store Key-Value store instance that is being tested
   * @param numMsgs Total number of messages to write to the store
   * @param msgSizeInBytes Size of each message in Bytes
   */
  def testMsgWritePerformance(
    store: KeyValueStore[Array[Byte], Array[Byte]],
    numMsgs: Int = 10000,
    msgSizeInBytes: Int = 200): Unit = {
    val msg = ("x" * msgSizeInBytes).getBytes(Encoding)
    val start = System.currentTimeMillis

    (0 until numMsgs).foreach(i => {
      store.put(i.toString.getBytes(Encoding), msg)
    })

    val timeTaken = System.currentTimeMillis - start
    info("Total time to write %d msgs of size %d bytes : %s s" format (numMsgs, msgSizeInBytes, timeTaken * .001))
  }

  /**
   * Test that writes fixed-size messages to the KV store from several threads at once and logs the
   * wall-clock time until all threads have finished. Every thread writes numMsgs messages using the
   * same keys ("0" until numMsgs), so the threads overwrite each other's entries.
   * @param store Key-Value store instance that is being tested
   * @param numMsgs Number of messages each thread writes to the store
   * @param msgSizeInBytes Size of each message in Bytes
   * @param numThreads Number of writer threads to start
   */
  def testConcurrentMsgWritePerformance(
    store: KeyValueStore[Array[Byte], Array[Byte]],
    numMsgs: Int = 100000,
    msgSizeInBytes: Int = 200,
    numThreads: Int = 4): Unit = {
    val msg = ("x" * msgSizeInBytes).getBytes(Encoding)

    def createThread(name: String): Thread = new Thread(new Runnable {
      override def run(): Unit = {
        (0 until numMsgs).foreach(i => {
          store.put(i.toString.getBytes(Encoding), msg)
        })
      }
    }, name)

    val threads = (0 until numThreads).map(i => createThread(s"Writer $i"))
    val start = System.currentTimeMillis
    threads.foreach(_.start())
    threads.foreach(_.join())
    val timeTaken = System.currentTimeMillis - start

    info("Total time to write %d msgs of size %d bytes with %s threads: %s sec"
      format (numMsgs, msgSizeInBytes, numThreads, timeTaken * .001))
  }

  /**
   * Test that ::getAll performance is better than that of ::get (test when there are many writes and many reads).
   * Logs an error for every iteration in which ::getAll took longer than the individual ::get calls,
   * and closes the store when done.
   * @param store key-value store instance that is being tested
   * @param config the test case's config
   */
  def testGetAllVsGetWriteManyReadMany(store: KeyValueStore[Array[Byte],Array[Byte]], config: Config): Unit = {
    val iterationsCount = config.getInt("iterations.count", 100)
    val maxMessagesCountPerBatch = config.getInt("message.max-count-per-batch", 100000)
    val maxMessageSizeBytes = config.getInt("message.max-size.bytes", 1024)
    val timer = Stopwatch.createUnstarted
    val uuidSerde = new UUIDSerde

    info("iterations count: " + iterationsCount)
    info("max messages count per batch: " + maxMessagesCountPerBatch)
    info("max message size in bytes: " + maxMessageSizeBytes)
    info("%12s%12s%12s%12s".format("Msg Count", "Bytes/Msg", "get ms", "getAll ms"))

    try {
      (0 until iterationsCount).foreach(i => {
        val messageSizeBytes = Random.nextInt(maxMessageSizeBytes)
        val messagesCountPerBatch = Random.nextInt(maxMessagesCountPerBatch)
        val keys = (0 until messagesCountPerBatch).map(k => uuidSerde.toBytes(UUID.randomUUID)).toList
        val shuffledKeys = Random.shuffle(keys) // to reduce locality of reference -- sequential access may be unfair
        val shuffledKeysJava = shuffledKeys.asJava

        keys.foreach(k => store.put(k, randomBytes(messageSizeBytes)))
        store.flush()

        // Keep the measured call outside assert(): assert is elidable, so with assertions disabled
        // the getAll would not run at all and the reported time would be meaningless.
        timer.reset().start()
        val fetched = store.getAll(shuffledKeysJava)
        val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
        assert(fetched.size == shuffledKeys.size)

        // Restore cache, in case it's enabled, to a state similar to the one above when the getAll test started
        keys.foreach(k => store.put(k, randomBytes(messageSizeBytes)))
        store.flush()

        timer.reset().start()
        shuffledKeys.foreach(store.get)
        val getTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)

        info("%12d%12d%12d%12d".format(messagesCountPerBatch, messageSizeBytes, getTime, getAllTime))
        if (getAllTime > getTime) {
          error("getAll was slower than get!")
        }
      })
    } finally {
      store.close()
    }
  }

  /**
   * Test that ::getAll performance is better than that of ::get (test when data are written once and read many times);
   * load is usually greater than the storage engine's cache size (not to be confused with Samza's cache layer),
   * and keys are randomly selected from the stored entries to perform a fair comparison of ::get vs. ::getAll (in case
   * the underlying storage engine caches data in blocks and ::getAll causes a block to be loaded into the cache --
   * one can argue that ::get should trigger the same behavior, but it's worth testing this WORM scenario regardless)
   * @param store key-value store instance that is being tested
   * @param config the test case's config
   */
  def testGetAllVsGetWriteOnceReadMany(store: KeyValueStore[Array[Byte],Array[Byte]], config: Config): Unit = {
    val iterationsCount = config.getInt("iterations.count", 100)
    val maxMessagesCountPerBatch = config.getInt("message.max-count-per-batch", 10000 + Random.nextInt(20000))
    val maxMessageSizeBytes = config.getInt("message.max-size.bytes", 1024)
    // Fail loudly on Int overflow instead of silently loading a wrong (possibly negative) number of messages.
    val totalMessagesCount = Math.multiplyExact(iterationsCount, maxMessagesCountPerBatch)
    val timer = Stopwatch.createUnstarted
    val uuidSerde = new UUIDSerde

    // The initial load happens inside the try so the store is closed even if populating it fails.
    try {
      info("write once -- putting %d messages in store".format(totalMessagesCount))
      val keys = (0 until totalMessagesCount).map(k => uuidSerde.toBytes(UUID.randomUUID)).toList
      keys.foreach(k => store.put(k, randomBytes(Random.nextInt(maxMessageSizeBytes))))
      store.flush()

      info("iterations count: " + iterationsCount)
      info("max messages count per batch: " + maxMessagesCountPerBatch)
      info("max message size in bytes: " + maxMessageSizeBytes)
      info("%12s%12s%12s%12s".format("Msg Count", "Total Size", "get ms", "getAll ms"))

      (0 until iterationsCount).foreach(i => {
        val messagesCountPerBatch = Random.nextInt(maxMessagesCountPerBatch)
        val shuffledKeys = Random.shuffle(keys).take(messagesCountPerBatch)
        val shuffledKeysJava = shuffledKeys.asJava

        // We want to measure ::getAll when called many times, so populate the cache because first call is a cache-miss
        val totalSize = store.getAll(shuffledKeysJava).values.asScala.map(_.length).sum

        // Keep the measured call outside assert(): assert is elidable, so with assertions disabled
        // the getAll would not run at all and the reported time would be meaningless.
        timer.reset().start()
        val fetched = store.getAll(shuffledKeysJava)
        val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
        assert(fetched.size == shuffledKeys.size)

        // We want to measure ::get when called many times, so populate the cache because first call is a cache-miss
        shuffledKeys.foreach(store.get)
        timer.reset().start()
        shuffledKeys.foreach(store.get)
        val getTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)

        info("%12d%12d%12d%12d".format(messagesCountPerBatch, totalSize, getTime, getAllTime))
        if (getAllTime > getTime) {
          error("getAll was slower than get!")
        }
      })
    } finally {
      store.close()
    }
  }
}