// blob: 657117c5e63cb56b815768648f9b9cebae3cdeda [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.stress;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.stress.operations.OpDistribution;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
import org.apache.cassandra.stress.settings.SettingsCommand;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.TimingInterval;
import org.apache.cassandra.transport.SimpleClient;
public class StressAction implements Runnable
{
private final StressSettings settings;
private final PrintStream output;
/**
 * Creates a stress action bound to the given configuration.
 *
 * @param settings the stress configuration consulted throughout the run
 * @param out      the stream that progress and result messages are printed to
 */
public StressAction(StressSettings settings, PrintStream out)
{
    this.output = out;
    this.settings = settings;
}
/**
 * Executes the configured stress command: creates the schema, optionally warms up and
 * truncates, runs the measured workload and prints {@code END} or {@code FAILURE}.
 * <p>
 * Once the schema step has completed, {@code settings.disconnect()} is invoked on every
 * exit path, including when warm-up, truncation or the measured run throws.
 */
@Override
public void run()
{
    // creating keyspace and column families; kept outside the try below so that a failure
    // here propagates exactly as it always has, without an extra disconnect attempt
    settings.maybeCreateKeyspaces();

    try
    {
        if (settings.command.count == 0)
        {
            output.println("N=0: SCHEMA CREATED, NOTHING ELSE DONE.");
            return;
        }

        output.println("Sleeping 2s...");
        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);

        if (!settings.command.noWarmup)
            warmup(settings.command.getFactory(settings));
        if (settings.command.truncate == SettingsCommand.TruncateWhen.ONCE)
            settings.command.truncateTables(settings);

        // TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution
        RateLimiter rateLimiter = null;
        if (settings.rate.opRateTargetPerSecond > 0)
            rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond);

        boolean success;
        if (settings.rate.minThreads > 0)
            success = runMulti(settings.rate.auto, rateLimiter);
        else
            success = null != run(settings.command.getFactory(settings), settings.rate.threadCount, settings.command.count,
                                  settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);

        output.println(success ? "END" : "FAILURE");
    }
    finally
    {
        // release client connections even if warm-up or the measured run threw
        settings.disconnect();
    }
}
// the factory is passed in (rather than looked up here) so that every constituent command of a
// mixed workload can be warmed up in turn
/**
 * Runs an unmeasured warm-up pass for each distribution returned by {@code operations.each()}.
 * The per-run output is discarded into a no-op stream; only a one-line notice per distribution
 * is printed to this action's output.
 *
 * @param operations factory whose constituent distributions are each warmed up
 */
private void warmup(OpDistributionFactory operations)
{
    // do 25% of iterations as warmup but no more than 50k (by default hotspot compiles methods after 10k invocations)
    final int perNode;
    if (settings.command.count >= 0)
        perNode = Math.min(50000, (int) (settings.command.count * 0.25));
    else
        perNode = 50000;

    final int iterations = perNode * settings.node.nodes.size();
    if (iterations <= 0)
        return;

    // start from 100 threads and clamp to whichever positive limits are configured
    int threads = 100;
    if (settings.rate.maxThreads > 0 && settings.rate.maxThreads < threads)
        threads = settings.rate.maxThreads;
    if (settings.rate.threadCount > 0 && settings.rate.threadCount < threads)
        threads = settings.rate.threadCount;

    // sink that silently drops everything the warm-up run tries to print
    final PrintStream warmupOutput = new PrintStream(new OutputStream()
    {
        @Override
        public void write(int b) throws IOException
        {
        }
    });

    for (OpDistributionFactory single : operations.each())
    {
        // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
        // so warm up all the nodes we're speaking to only.
        String notice = String.format("Warming up %s with %d iterations...", single.desc(), iterations);
        output.println(notice);
        run(single, threads, iterations, 0, null, null, warmupOutput, true);
    }
}
// TODO : permit varying more than just thread count
// TODO : vary thread count based on percentage improvement of previous increment, not by fixed amounts
/**
 * Repeats the measured run with an increasing thread count, starting at
 * {@code settings.rate.minThreads}, and prints a summary of all runs at the end.
 * <p>
 * The thread count doubles while below 16 and grows by 50% afterwards. The loop ends when the
 * next thread count would exceed {@code settings.rate.maxThreads} or, if {@code auto} is set,
 * when the recent runs no longer show the required average improvement.
 *
 * @param auto        whether to stop early once throughput stops improving
 * @param rateLimiter optional limiter handed to every run; may be {@code null}
 * @return {@code false} if any run failed or the thread was interrupted while pausing between
 *         runs (the interrupt flag is restored in that case); {@code true} otherwise
 */
private boolean runMulti(boolean auto, RateLimiter rateLimiter)
{
    if (settings.command.targetUncertainty >= 0)
        output.println("WARNING: uncertainty mode (err<) results in uneven workload between thread runs, so should be used for high level analysis only");

    int prevThreadCount = -1;
    int threadCount = settings.rate.minThreads;
    List<StressMetrics> results = new ArrayList<>();
    List<String> runIds = new ArrayList<>();
    do
    {
        output.println(String.format("Running with %d threadCount", threadCount));

        if (settings.command.truncate == SettingsCommand.TruncateWhen.ALWAYS)
            settings.command.truncateTables(settings);

        StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count,
                                   settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
        if (result == null)
            return false;
        results.add(result);

        // report through the configured stream, like every other message of this action
        if (prevThreadCount > 0)
            output.println(String.format("Improvement over %d threadCount: %.0f%%",
                                         prevThreadCount, 100 * averageImprovement(results, 1)));

        runIds.add(threadCount + " threadCount");
        prevThreadCount = threadCount;
        if (threadCount < 16)
            threadCount *= 2;
        else
            threadCount *= 1.5;

        // at least one result has been recorded by now, so only the thread ceiling needs checking
        if (threadCount > settings.rate.maxThreads)
            break;

        if (settings.command.type.updates)
        {
            // pause an arbitrary period of time to let the commit log flush, etc. shouldn't make much difference
            // as we only increase load, never decrease it
            output.println("Sleeping for 15s");
            try
            {
                Thread.sleep(15 * 1000);
            }
            catch (InterruptedException e)
            {
                // keep the interruption visible to the caller instead of silently clearing it
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // run until we have not improved throughput significantly for previous three runs
    } while (!auto || (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty)));

    // summarise all results
    StressMetrics.summarise(runIds, results, output, settings.samples.historyCount);
    return true;
}
/**
 * Tells whether the last {@code count} runs improved by at least {@code minImprovement} on average.
 *
 * @param results        metrics of all runs so far, oldest first
 * @param count          how many of the most recent runs to average over
 * @param minImprovement minimum average relative improvement required
 * @return {@code true} if there are not yet enough results to compare, or if the average
 *         improvement meets the threshold
 */
private boolean hasAverageImprovement(List<StressMetrics> results, int count, double minImprovement)
{
    // each of the 'count' runs needs a predecessor to compare against
    if (results.size() < count + 1)
        return true;

    double average = averageImprovement(results, count);
    return average >= minImprovement;
}
/**
 * Averages the relative change in op rate between each of the last {@code count} runs and
 * the run immediately before it.
 *
 * @param results metrics of all runs so far, oldest first; must hold more than {@code count} entries
 * @param count   number of most recent runs to include
 * @return the mean of {@code (current - previous) / previous} over those runs
 */
private double averageImprovement(List<StressMetrics> results, int count)
{
    final int end = results.size();
    double total = 0;
    int idx = end - count;
    while (idx < end)
    {
        double before = results.get(idx - 1).getTiming().getHistory().opRate();
        double after = results.get(idx).getTiming().getHistory().opRate();
        total += (after - before) / before;
        idx++;
    }
    return total / count;
}
/**
 * Performs a single run of the given operations on {@code threadCount} consumer threads.
 * The run is bounded by a duration when {@code durationUnits} is non-null, otherwise by
 * {@code opCount} when positive, otherwise by convergence of the metrics.
 *
 * @param operations    factory supplying one operation distribution per consumer
 * @param threadCount   number of consumer threads to start
 * @param opCount       number of operations; a negative value selects the continuous work manager
 * @param duration      length of the run, used only when {@code durationUnits} is non-null
 * @param rateLimiter   optional limiter shared by all consumers; may be {@code null}
 * @param durationUnits unit of {@code duration}, or {@code null} when the run is not time-bound
 * @param output        stream that receives the progress and metrics of this run
 * @param isWarmup      passed through to {@code operations.get(...)}
 * @return the collected metrics, or {@code null} if the metrics were cancelled or any consumer failed
 */
private StressMetrics run(OpDistributionFactory operations,
                          int threadCount,
                          long opCount,
                          long duration,
                          RateLimiter rateLimiter,
                          TimeUnit durationUnits,
                          PrintStream output,
                          boolean isWarmup)
{
    final String what = operations.desc();
    final String extent;
    if (durationUnits != null)
        extent = duration + " " + durationUnits.toString().toLowerCase();
    else if (opCount > 0)
        extent = "for " + opCount + " iteration";
    else
        extent = "until stderr of mean < " + settings.command.targetUncertainty;
    output.println(String.format("Running %s with %d threads %s", what, threadCount, extent));

    final WorkManager workManager = opCount < 0
                                  ? new WorkManager.ContinuousWorkManager()
                                  : new WorkManager.FixedWorkManager(opCount);

    final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings);
    final CountDownLatch done = new CountDownLatch(threadCount);
    final Consumer[] consumers = new Consumer[threadCount];
    final int sampleCount = settings.samples.liveCount / threadCount;

    // build every consumer before any of them is started
    for (int slot = 0; slot < threadCount; slot++)
    {
        OpDistribution distribution = operations.get(metrics.getTiming(), sampleCount, isWarmup);
        consumers[slot] = new Consumer(distribution, done, workManager, metrics, rateLimiter);
    }

    // starting worker threadCount
    for (Consumer consumer : consumers)
        consumer.start();

    metrics.start();

    if (durationUnits != null)
    {
        Uninterruptibles.sleepUninterruptibly(duration, durationUnits);
        workManager.stop();
    }
    else if (opCount <= 0)
    {
        try
        {
            metrics.waitUntilConverges(settings.command.targetUncertainty,
                                       settings.command.minimumUncertaintyMeasurements,
                                       settings.command.maximumUncertaintyMeasurements);
        }
        catch (InterruptedException ignored)
        {
            // an interrupt simply ends the wait; the work manager is stopped below either way
        }
        workManager.stop();
    }

    try
    {
        done.await();
        metrics.stop();
    }
    catch (InterruptedException ignored)
    {
        // fall through to the cancellation / success checks
    }

    if (metrics.wasCancelled())
        return null;

    metrics.summarise();

    // a single failed consumer fails the whole run
    for (Consumer consumer : consumers)
    {
        if (!consumer.success)
            return null;
    }
    return metrics;
}
/**
 * Worker thread that repeatedly takes the next operation from its distribution and runs it
 * against the client matching {@code settings.mode.api}, until the operation reports that it
 * is not ready or a failure occurs.
 * <p>
 * {@link #success} is cleared on any failure, whether an exception from an individual
 * operation or an unchecked exception/error raised anywhere else in the loop (for example
 * while obtaining the client), so that the enclosing run is never reported as successful
 * after a worker died.
 */
private class Consumer extends Thread
{
    private final OpDistribution operations;
    private final StressMetrics metrics;
    private final RateLimiter rateLimiter;
    // read by the coordinating thread after the latch has been released
    private volatile boolean success = true;
    private final WorkManager workManager;
    private final CountDownLatch done;

    /**
     * @param operations  source of the operations this worker executes
     * @param done        latch counted down once when this worker finishes
     * @param workManager passed to each operation's readiness check and stopped on failure
     * @param metrics     metrics collector that is cancelled on failure
     * @param rateLimiter optional limiter passed to each operation's readiness check; may be {@code null}
     */
    public Consumer(OpDistribution operations,
                    CountDownLatch done,
                    WorkManager workManager,
                    StressMetrics metrics,
                    RateLimiter rateLimiter)
    {
        this.done = done;
        this.rateLimiter = rateLimiter;
        this.workManager = workManager;
        this.metrics = metrics;
        this.operations = operations;
    }

    @Override
    public void run()
    {
        operations.initTimers();
        try
        {
            SimpleClient sclient = null;
            ThriftClient tclient = null;
            JavaDriverClient jclient = null;
            switch (settings.mode.api)
            {
                case JAVA_DRIVER_NATIVE:
                    jclient = settings.getJavaDriverClient();
                    break;
                case SIMPLE_NATIVE:
                    sclient = settings.getSimpleNativeClient();
                    break;
                case THRIFT:
                case THRIFT_SMART:
                    tclient = settings.getThriftClient();
                    break;
                default:
                    throw new IllegalStateException();
            }

            while (true)
            {
                Operation op = operations.next();
                if (!op.ready(workManager, rateLimiter))
                    break;

                try
                {
                    switch (settings.mode.api)
                    {
                        case JAVA_DRIVER_NATIVE:
                            op.run(jclient);
                            break;
                        case SIMPLE_NATIVE:
                            op.run(sclient);
                            break;
                        case THRIFT:
                        case THRIFT_SMART:
                        default:
                            op.run(tclient);
                    }
                }
                catch (Exception e)
                {
                    // 'output' is the enclosing StressAction's stream; this class has no stream of its own
                    if (output == null)
                    {
                        System.err.println(e.getMessage());
                        success = false;
                        System.exit(-1);
                    }

                    e.printStackTrace(output);
                    abortRun();
                    return;
                }
            }
        }
        catch (RuntimeException | Error e)
        {
            // anything escaping the per-operation handler (client setup, next(), ready(), or an
            // Error from op.run) used to kill this thread with success still true; record the
            // failure and stop the other workers, then rethrow so it is still reported as uncaught
            abortRun();
            throw e;
        }
        finally
        {
            done.countDown();
            operations.closeTimers();
        }
    }

    // Marks this worker as failed and asks the rest of the run to wind down.
    private void abortRun()
    {
        success = false;
        workManager.stop();
        metrics.cancel();
    }
}
}