* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
namespace Apache.Ignite.Benchmarks
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using Apache.Ignite.Benchmarks.Result;
/// <summary>
/// Benchmark base class.
/// </summary>
internal abstract class BenchmarkBase
/** Result writer type: console. */
private const string ResultWriterConsole = "console";
/** Result writer type: file. */
private const string ResultWriterFile = "file";
/** Default duration. */
private const int DefaultDuration = 60;
/** Default maximum errors count. */
private const int DefaultMaxErrors = 100;
/** Default percentile result buckets count. */
private const int DEfaultResultBucketCount = 10000;
/** Default percentile result bucket interval. */
private const int DefaultResultBucketInterval = 100;
/** Default batch size. */
private const int DefaultBatchSize = 1;
/** Default result wrier. */
private const string DefaultResultWriter = ResultWriterConsole;
/** Start flag. */
private volatile bool _start;
/** Stop flag. */
private volatile bool _stop;
/** Warmup flag. */
private volatile bool _warmup = true;
/** Ready threads. */
private int _readyThreads;
/** Finished threads. */
private volatile int _finishedThreads;
/** Descriptors. */
private volatile ICollection<BenchmarkOperationDescriptor> _descs;
/** Benchmark tasks. */
private volatile ICollection<BenchmarkTask> _tasks;
/** Percentile results. */
private volatile IDictionary<string, long[]> _percentiles;
/** Currently completed operations. */
private long _curOps;
/** Error count. */
private long _errorCount;
/** Watches to count total execution time. */
private readonly Stopwatch _totalWatch = new Stopwatch();
/** Warmup barrier. */
private Barrier _barrier;
/** Benchmark result writer. */
private IBenchmarkResultWriter _writer;
/// <summary>
/// Default constructor.
/// </summary>
protected BenchmarkBase()
Duration = DefaultDuration;
MaxErrors = DefaultMaxErrors;
ResultBucketCount = DEfaultResultBucketCount;
ResultBucketInterval = DefaultResultBucketInterval;
BatchSize = DefaultBatchSize;
ResultWriter = DefaultResultWriter;
/// <summary>
/// Run the benchmark.
/// </summary>
public void Run()
PrintDebug("Started benchmark: " + this);
if (ResultWriter.Equals(ResultWriterConsole, StringComparison.OrdinalIgnoreCase))
_writer = new BenchmarkConsoleResultWriter();
_writer = new BenchmarkFileResultWriter();
PrintDebug("Benchmark setup finished.");
_descs = new List<BenchmarkOperationDescriptor>();
if (_descs.Count == 0)
throw new Exception("No tasks provided for benchmark.");
// Initialize writer.
var opNames = new List<string>(_descs.Select(desc => desc.Name));
PrintDebug(() =>
var sb = new StringBuilder("Operations: ");
foreach (var opName in opNames)
sb.Append(opName).Append(' ');
return sb.ToString();
_writer.Initialize(this, opNames);
PrintDebug("Initialized result writer.");
// Start worker threads.
_tasks = new List<BenchmarkTask>(Threads);
PrintDebug("Starting worker threads: " + Threads);
for (var i = 0; i < Threads; i++)
var task = new BenchmarkTask(this, _descs);
new Thread(task.Run).Start();
PrintDebug("Waiting worker threads to start: " + Threads);
// Await for all threads to start in spin loop.
while (Thread.VolatileRead(ref _readyThreads) < Threads)
PrintDebug("Worker threads started: " + Threads);
// Start throughput writer thread.
var writerThread = new Thread(new ThroughputTask(this).Run) {IsBackground = true};
PrintDebug("Started throughput writer thread.");
// Start warmup thread if needed.
if (Warmup > 0)
var thread = new Thread(new WarmupTask(this, Warmup).Run) {IsBackground = true};
PrintDebug("Started warmup timeout thread: " + Warmup);
_warmup = false;
_barrier = new Barrier(Threads, b =>
Console.WriteLine("Warmup finished.");
// Start timeout thread if needed.
if (Duration > 0)
if (Operations > 0)
PrintDebug("Duration argument is ignored because operations number is set: " +
var thread = new Thread(new TimeoutTask(this, Warmup + Duration).Run) {IsBackground = true};
PrintDebug("Started duration timeout thread: " + Duration);
// Let workers start execution.
_start = true;
// Await workers completion.
PrintDebug("Awaiting worker threads completion.");
while (_finishedThreads < Threads)
PrintDebug("Worker threads completed.");
PrintDebug("Tear down invoked.");
if (PrintThroughputInfo())
var avgThroughput = _totalWatch.ElapsedMilliseconds == 0
? 0
: _curOps*1000/_totalWatch.ElapsedMilliseconds;
var avgLatency = _curOps == 0
? 0
: (double) _totalWatch.ElapsedMilliseconds*Threads/_curOps;
Console.WriteLine("Finishing benchmark [name=" + GetType().Name +
", time=" + _totalWatch.ElapsedMilliseconds +
"ms, ops=" + _curOps +
", threads=" + Threads +
", avgThroughput=" + avgThroughput +
", avgLatency=" + string.Format("{0:0.000}ms", avgLatency) + ']');
Console.WriteLine("Finishing benchmark [name=" + GetType().Name +
", time=" + _totalWatch.ElapsedMilliseconds +
"ms, ops=" + Operations +
", threads=" + Threads + ']');
_percentiles = new Dictionary<string, long[]>(_descs.Count);
foreach (var desc in _descs)
_percentiles[desc.Name] = new long[ResultBucketCount];
foreach (var task in _tasks)
foreach (var percentile in _percentiles)
_writer.WritePercentiles(percentile.Key, ResultBucketInterval, percentile.Value);
PrintDebug("Results committed to output writer.");
/// <summary>
/// Consumes passed argument.
/// </summary>
/// <param name="name">Argument name.</param>
/// <param name="val">Value.</param>
/// <returns>True if argument was consumed.</returns>
public void Configure(string name, string val)
var prop = BenchmarkUtils.GetProperty(this, name);
if (prop != null)
BenchmarkUtils.SetProperty(this, prop, val);
/// <summary>
/// Start callback.
/// </summary>
protected virtual void OnStarted()
// No-op.
/// <summary>
/// Warmup finished callback. Executed by each worker thread once.
/// </summary>
protected virtual void OnWarmupFinished()
// No-op.
/// <summary>
/// Batch execution started callback.
/// </summary>
/// <param name="state">State.</param>
protected virtual void OnBatchStarted(BenchmarkState state)
// No-op.
/// <summary>
/// Batch execution finished callback.
/// </summary>
/// <param name="state">State.</param>
/// <param name="duration">Duration.</param>
/// <returns>True if this result must be counted.</returns>
protected virtual bool OnBatchFinished(BenchmarkState state, long duration)
return true;
/// <summary>
/// Benchmarh finished callback.
/// </summary>
protected virtual void OnFinished()
// No-op.
/// <returns>Flag indicating whether benchmark should print throughput information.</returns>
protected virtual bool PrintThroughputInfo()
return true;
/// <summary>
/// Internal arguments validation routine.
/// </summary>
/// <returns>True if base class must validate common arguments, false otherwise.</returns>
protected virtual bool ValidateArgumentsEx()
return true;
/// <summary>
/// Print debug to console.
/// </summary>
/// <param name="msg">Message</param>
protected void PrintDebug(string msg)
if (Debug)
Console.WriteLine("[DEBUG] " + Thread.CurrentThread.ManagedThreadId + ": " + msg);
/// <summary>
/// Print debug to console.
/// </summary>
/// <param name="msgFunc">Message delegate.</param>
protected void PrintDebug(Func<string> msgFunc)
if (Debug)
/// <summary>
/// Add operation descriptors.
/// </summary>
/// <param name="descs">Collection where operation descriptors must be added.</param>
protected abstract void GetDescriptors(ICollection<BenchmarkOperationDescriptor> descs);
/// <summary>
/// Invoked when single thread is ready to actual execution.
/// </summary>
private void OnThreadReady()
Interlocked.Increment(ref _readyThreads);
/// <summary>
/// Invoked when single thread finished execution.
/// </summary>
private void OnThreadFinished()
// ReSharper disable once NonAtomicCompoundOperator
PrintDebug("Worker thread finished.");
/// <summary>
/// Validate arguments.
/// </summary>
private void ValidateArguments()
if (ValidateArgumentsEx())
if (Threads <= 0)
throw new Exception("Threads must be positive: " + Threads);
if (Warmup < 0)
throw new Exception("Warmup cannot be negative: " + Warmup);
if (Duration < 0)
throw new Exception("Duration cannot be negative: " + Duration);
if (BatchSize <= 0)
throw new Exception("BatchSize must be positive: " + BatchSize);
if (MaxErrors < 0)
throw new Exception("MaxErrors cannot be negative: " + MaxErrors);
if (ResultWriter == null || !ResultWriter.Equals(ResultWriterConsole, StringComparison.OrdinalIgnoreCase)
&& !ResultWriter.Equals(ResultWriterFile, StringComparison.OrdinalIgnoreCase))
throw new Exception("Invalid ResultWriter: " + ResultWriter);
if (ResultWriter.Equals(ResultWriterFile, StringComparison.OrdinalIgnoreCase) && ResultFolder == null)
throw new Exception("ResultFolder must be set for file result writer.");
if (ResultBucketCount <= 0)
throw new Exception("ResultBucketCount must be positive: " + ResultBucketCount);
if (ResultBucketInterval <= 0)
throw new Exception("ResultBucketInterval must be positive: " + ResultBucketInterval);
/// <summary>
/// Get current throughput across all currenlty running threads.
/// </summary>
/// <returns>Current throughput.</returns>
private IDictionary<string, Tuple<long, long>> GetCurrentThroughput()
var total = new Dictionary<string, Tuple<long, long>>(_descs.Count);
foreach (var desc in _descs)
total[desc.Name] = new Tuple<long, long>(0, 0);
foreach (var task in _tasks)
return total;
/** <inheritDoc /> */
public override string ToString()
var sb = new StringBuilder(GetType().Name).Append('[');
var first = true;
var props = BenchmarkUtils.GetProperties(this);
foreach (var prop in props)
if (first)
first = false;
sb.Append(", ");
sb.Append(prop.Name).Append('=').Append(prop.GetValue(this, null));
return sb.ToString();
/// <summary>
/// Amount of worker threads.
/// </summary>
public int Threads { get; set; }
/// <summary>
/// Warmup duration in secnods.
/// </summary>
public int Warmup { get; set; }
/// <summary>
/// Duration in seconds.
/// </summary>
public int Duration { get; set; }
/// <summary>
/// Maximum amount of operations to perform.
/// </summary>
public int Operations { get; set; }
/// <summary>
/// Single measurement batch size.
/// </summary>
public int BatchSize { get; set; }
/// <summary>
/// Maximum amount of errors before benchmark exits.
/// </summary>
public int MaxErrors { get; set; }
/// <summary>
/// Debug flag.
/// </summary>
public bool Debug { get; set; }
/// <summary>
/// Result writer type.
/// </summary>
public string ResultWriter { get; set; }
/// <summary>
/// Result output folder.
/// </summary>
public string ResultFolder { get; set; }
/// <summary>
/// Percentile result buckets count.
/// </summary>
public int ResultBucketCount { get; set; }
/// <summary>
/// Percnetile result bucket interval in microseconds.
/// </summary>
public long ResultBucketInterval { get; set; }
/// <summary>
/// Benchmark worker task.
/// </summary>
private class BenchmarkTask
/** Benchmark. */
private readonly BenchmarkBase _benchmark;
/** Descriptors. */
private readonly BenchmarkOperationDescriptor[] _descs;
/** Results. */
private readonly IDictionary<string, Result> _results;
/** Stop watch. */
private readonly Stopwatch _watch = new Stopwatch();
/** Benchmark state. */
private readonly BenchmarkState _state;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="benchmark">Benchmark.</param>
/// <param name="descList">Descriptor list.</param>
public BenchmarkTask(BenchmarkBase benchmark,
ICollection<BenchmarkOperationDescriptor> descList)
_benchmark = benchmark;
_state = new BenchmarkState();
_results = new Dictionary<string, Result>(descList.Count);
var totalWeight = 0;
var ticksPerSlot = benchmark.ResultBucketInterval*Stopwatch.Frequency/1000000;
if (ticksPerSlot == 0)
throw new Exception("Too low bucket interval: " + benchmark.ResultBucketInterval);
foreach (var desc in descList)
_results[desc.Name] = new Result(benchmark.ResultBucketCount, ticksPerSlot);
totalWeight += desc.Weight;
_descs = new BenchmarkOperationDescriptor[totalWeight];
var idx = 0;
foreach (var desc in descList)
for (var i = 0; i < desc.Weight; i++)
_descs[idx++] = desc;
/// <summary>
/// Task routine.
/// </summary>
public void Run()
_benchmark.PrintDebug("Worker thread ready.");
while (!_benchmark._start)
_benchmark.PrintDebug("Worker thread started benchmark execution.");
var warmupIteration = true;
long maxDur = 0;
long maxOps = _benchmark.Operations;
while (!_benchmark._stop)
if (warmupIteration && !_benchmark._warmup)
warmupIteration = false;
if (!warmupIteration)
if (maxOps > 0 && Interlocked.Read(ref _benchmark._curOps) > maxOps)
var desc = _descs.Length == 1
? _descs[0]
: _descs[BenchmarkUtils.GetRandomInt(_descs.Length)];
var res = true;
for (var i = 0; i < _benchmark.BatchSize; i++)
if (!warmupIteration)
Interlocked.Add(ref _benchmark._curOps, _benchmark.BatchSize);
catch (Exception e)
Console.WriteLine("Exception: " + e);
res = false;
if (_benchmark.MaxErrors > 0 &&
Interlocked.Increment(ref _benchmark._errorCount) > _benchmark.MaxErrors)
lock (_benchmark)
Console.WriteLine("Benchmark is stopped due to too much errors: " +
var curDur = _watch.ElapsedTicks;
if (res)
res = _benchmark.OnBatchFinished(_state, curDur);
if (curDur > maxDur)
maxDur = curDur;
_benchmark.PrintDebug("The longest execution [warmup=" + warmupIteration +
", dur(nsec)=" + maxDur*1000000000/Stopwatch.Frequency + ']');
if (!warmupIteration && res)
_benchmark.PrintDebug("Worker thread stopped.");
/// <summary>
/// Collect throughput for the current task.
/// </summary>
/// <param name="total">Total result.</param>
public void CollectThroughput(IDictionary<string, Tuple<long, long>> total)
foreach (var result in _results)
var old = total[result.Key];
total[result.Key] = new Tuple<long, long>(old.Item1 + result.Value.Duration,
old.Item2 + result.Value.OpCount);
/// <summary>
/// Collect percnetiles for the current task.
/// </summary>
/// <param name="total"></param>
public void CollectPercentiles(IDictionary<string, long[]> total)
foreach (var result in _results)
var arr = total[result.Key];
for (var i = 0; i < arr.Length; i++)
arr[i] += result.Value.Slots[i];
/// <summary>
/// Timeout task to stop execution.
/// </summary>
private class TimeoutTask
/** Benchmark. */
private readonly BenchmarkBase _benchmark;
/** Duration. */
private readonly long _dur;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="benchmark">Benchmark.</param>
/// <param name="dur">Duration.</param>
public TimeoutTask(BenchmarkBase benchmark, long dur)
_benchmark = benchmark;
_dur = dur;
/// <summary>
/// Task routine.
/// </summary>
public void Run()
_benchmark._stop = true;
/// <summary>
/// Warmup task to clear warmup flag.
/// </summary>
private class WarmupTask
/** Benchmark. */
private readonly BenchmarkBase _benchmark;
/** Duration. */
private readonly long _dur;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="benchmark">Benchmark.</param>
/// <param name="dur">Duration.</param>
public WarmupTask(BenchmarkBase benchmark, long dur)
_benchmark = benchmark;
_dur = dur;
/// <summary>
/// Task routine.
/// </summary>
public void Run()
_benchmark._warmup = false;
/// <summary>
/// Throughput write task.
/// </summary>
private class ThroughputTask
/** Benchmark. */
private readonly BenchmarkBase _benchmark;
/** Last recorded result. */
private IDictionary<string, Tuple<long, long>> _lastResults;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="benchmark">Benchmark.</param>
public ThroughputTask(BenchmarkBase benchmark)
_benchmark = benchmark;
public void Run()
while (!_benchmark._stop)
if (_benchmark._start && !_benchmark._warmup)
var results = _benchmark.GetCurrentThroughput();
if (_benchmark._finishedThreads > 0)
return; // Threads are stopping, do not collect any more.
foreach (var pair in results)
Tuple<long, long> old;
if (_lastResults != null && _lastResults.TryGetValue(pair.Key, out old))
_benchmark._writer.WriteThroughput(pair.Key, pair.Value.Item1 - old.Item1,
pair.Value.Item2 - old.Item2);
_benchmark._writer.WriteThroughput(pair.Key, pair.Value.Item1,
_lastResults = results;
/// <summary>
/// Benchmark result. Specific for each operation.
/// </summary>
private class Result
/** Slots. */
public readonly long[] Slots;
/** Slot duration in ticks. */
private readonly long _slotDuration;
/** Total operations count. */
public long OpCount;
/** Total duration. */
public long Duration;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="slotCnt">Slot count.</param>
/// <param name="slotDuration">Slot duration in ticks.</param>
public Result(long slotCnt, long slotDuration)
Slots = new long[slotCnt];
_slotDuration = slotDuration;
/// <summary>
/// Add result.
/// </summary>
/// <param name="curDur">Current duration in ticks.</param>
public void Add(long curDur)
var idx = (int) (curDur/_slotDuration);
if (idx >= Slots.Length)
idx = Slots.Length - 1;
Slots[idx] += 1;
Duration += curDur;