blob: 4398b3d6e26cca8b0f4bc29ef82282e09a303765 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.monitoring.runtime.instrumentation.AllocationRecorder;
import com.google.monitoring.runtime.instrumentation.Sampler;
import com.sun.management.ThreadMXBean;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.ReadQuery;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
public class CompactionAllocationTest
{
// Class-wide logger; all results of this benchmark-style test are reported through it.
private static final Logger logger = LoggerFactory.getLogger(CompactionAllocationTest.class);
// Platform thread bean, cast to the com.sun.management flavour so MXMeasurement can read per-thread allocated bytes.
private static final ThreadMXBean threadMX = (ThreadMXBean) ManagementFactory.getThreadMXBean();
// true: measure with the allocation instrumenter agent (AgentMeasurement); false: fall back to ThreadMXBean (MXMeasurement).
private static final boolean AGENT_MEASUREMENT = true;
// When true, measure() skips the compaction phase and pauses around the reads so a profiler can be attached.
private static final boolean PROFILING_READS = false;
// When true, measure() skips the read phase and pauses around the compaction so a profiler can be attached.
private static final boolean PROFILING_COMPACTION = false;
// Any profiling mode enabled; maybeInflate() uses this to scale workload sizes up.
private static final boolean PROFILING = PROFILING_READS || PROFILING_COMPACTION;
// Human-readable result lines appended by measure() and printed by afterClass().
private static final List<String> summaries = new ArrayList<>();
/**
 * Result of one measured compaction, plus the partition and row counts needed to express the
 * allocated bytes per partition and per row in the tab-delimited report.
 */
private static class CompactionSummary
{
    /** Column titles, in the same order as the values returned by {@link #cells()}. */
    static final List<String> HEADERS = Lists.newArrayList("bytes", "/p", "/r");
    /** Cells used in the report when a workload has no compaction result. */
    static final List<String> EMPTY = Lists.newArrayList("n/a", "n/a", "n/a");

    final Measurement measurement;
    final int numPartitions;
    final int numRows;

    public CompactionSummary(Measurement measurement, int numPartitions, int numRows)
    {
        this.measurement = measurement;
        this.numPartitions = numPartitions;
        this.numRows = numRows;
    }

    /**
     * @return total bytes, bytes per partition and bytes per row (integer division), as strings
     */
    List<String> cells()
    {
        long totalBytes = measurement.bytes();
        long bytesPerPartition = totalBytes / numPartitions;
        long bytesPerRow = totalBytes / numRows;
        return Lists.newArrayList(String.valueOf(totalBytes),
                                  String.valueOf(bytesPerPartition),
                                  String.valueOf(bytesPerRow));
    }
}
/**
 * Result of one measured batch of reads, plus the number of reads needed to express the
 * allocated bytes per read in the tab-delimited report.
 */
private static class ReadSummary
{
    /** Column titles, in the same order as the values returned by {@link #cells()}. */
    static final List<String> HEADERS = Lists.newArrayList("bytes", "/rd");
    /** Cells used in the report when a workload has no read result. */
    static final List<String> EMPTY = Lists.newArrayList("n/a", "n/a");

    final Measurement measurement;
    final int numReads;

    public ReadSummary(Measurement measurement, int numReads)
    {
        this.measurement = measurement;
        this.numReads = numReads;
    }

    /**
     * @return total bytes and bytes per read (integer division), as strings
     */
    List<String> cells()
    {
        long totalBytes = measurement.bytes();
        long bytesPerRead = totalBytes / numReads;
        return Lists.newArrayList(String.valueOf(totalBytes), String.valueOf(bytesPerRead));
    }
}
// Per-workload results keyed by workload name: written by measure(), read by afterClass() to build the tab-delimited report.
private static final Map<String, CompactionSummary> compactionSummaries = new HashMap<>();
private static final Map<String, ReadSummary> readSummaries = new HashMap<>();
/*
add to jvm args:
-javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar
*/
// Allocation recorded by an empty start()/stop() cycle of an AgentMeasurement; AgentMeasurement
// subtracts these baselines from every reported figure. Both are zero when the ThreadMXBean is used.
private static final long MIN_OBJECTS_ALLOCATED;
private static final long MIN_BYTES_ALLOCATED;
static
{
    long baselineObjects = 0;
    long baselineBytes = 0;
    if (!AGENT_MEASUREMENT)
    {
        logger.warn("{} is using the ThreadMXBean to measure memory usage, this is less accurate than the allocation instrumenter agent", CompactionAllocationTest.class.getSimpleName());
        logger.warn("If you're running this in your IDE, add the following jvm arg: " +
                    "-javaagent:<build.dir>/lib/jars/java-allocation-instrumenter-<allocation-instrumenter.version>.jar " +
                    "(and replace <> with appropriate values from build.xml)");
    }
    else
    {
        // Calibrate: measure a window that contains nothing but the measurement machinery itself
        AgentMeasurement calibration = new AgentMeasurement();
        calibration.start();
        calibration.stop();
        baselineObjects = calibration.objectsAllocated;
        baselineBytes = calibration.bytesAllocated;
    }
    MIN_OBJECTS_ALLOCATED = baselineObjects;
    MIN_BYTES_ALLOCATED = baselineBytes;
}
/**
 * One-time fixture: prepares the server, starts the gossiper and then runs a throw-away
 * workload named "warmup" (9 overlapping sstables) before any test method executes.
 * {@code measure} leaves the "warmup" workload out of the summary lines.
 */
@BeforeClass
public static void setupClass() throws Throwable
{
    SchemaLoader.prepareServer();
    SchemaLoader.startGossiper();

    final int warmupSSTables = 9;
    final int warmupPartitions = maybeInflate(300);
    testTinyPartitions("warmup", warmupSSTables, warmupPartitions, true);
}
/**
 * Logs everything the test methods collected: first the free-form summary lines, then, for each
 * family of workloads (tiny / medium / wide), three tab-delimited lines: the workload names,
 * the column headers, and the cells of every workload in the family concatenated in order.
 * Workloads that did not run are reported with "n/a" cells.
 */
@AfterClass
public static void afterClass()
{
    logger.info("SUMMARIES:");
    for (String line : summaries)
        logger.info(line);

    List<String> tinyWorkloads = Lists.newArrayList("tinyNonOverlapping3",
                                                    "tinyNonOverlapping9",
                                                    "tinyOverlapping3",
                                                    "tinyOverlapping9");
    List<String> mediumWorkloads = Lists.newArrayList("mediumNonOverlappingPartitions3",
                                                      "mediumNonOverlappingPartitions9",
                                                      "mediumOverlappingPartitions3",
                                                      "mediumOverlappingPartitions9",
                                                      "mediumPartitionsOverlappingRows3",
                                                      "mediumPartitionsOverlappingRows9");
    List<String> wideWorkloads = Lists.newArrayList("wideNonOverlappingPartitions3",
                                                    "wideNonOverlappingPartitions9",
                                                    "wideOverlappingPartitions3",
                                                    "wideOverlappingPartitions9",
                                                    "widePartitionsOverlappingRows9",
                                                    "widePartitionsOverlappingRows3");
    List<List<String>> groups = new ArrayList<>();
    groups.add(tinyWorkloads);
    groups.add(mediumWorkloads);
    groups.add(wideWorkloads);

    // One row of cells per workload: compaction cells followed by read cells
    Map<String, List<String>> fullRows = new HashMap<>();
    for (List<String> group : groups)
    {
        for (String workload : group)
        {
            CompactionSummary compaction = compactionSummaries.get(workload);
            ReadSummary read = readSummaries.get(workload);
            List<String> row = new ArrayList<>();
            row.addAll(compaction == null ? CompactionSummary.EMPTY : compaction.cells());
            row.addAll(read == null ? ReadSummary.EMPTY : read.cells());
            fullRows.put(workload, row);
        }
    }

    logger.info("");
    logger.info("TAB DELIMITED:");
    Joiner tabs = Joiner.on('\t');
    String header = tabs.join(Iterables.concat(CompactionSummary.HEADERS, ReadSummary.HEADERS));
    for (List<String> group : groups)
    {
        logger.info(tabs.join(group));
        logger.info(header);
        List<String> values = new ArrayList<>();
        for (String workload : group)
            values.addAll(fullRows.getOrDefault(workload, Collections.emptyList()));
        logger.info(tabs.join(values));
    }
}
/**
 * Scales a workload size up while profiling so the profiled section runs for longer.
 *
 * @param base    size to use for a normal run
 * @param inflate multiplier applied when {@code PROFILING} is enabled
 * @return {@code base * inflate} when profiling, otherwise {@code base}
 */
private static int maybeInflate(int base, int inflate)
{
    if (!PROFILING)
        return base;
    return base * inflate;
}

/**
 * Same as {@link #maybeInflate(int, int)} with a multiplier of 3.
 */
private static int maybeInflate(int base)
{
    final int defaultFactor = 3;
    return maybeInflate(base, defaultFactor);
}
// A data set plus the reads to run against it; measure() drives one Workload through a read phase and a compaction phase.
private interface Workload
{
// Writes and flushes the data; measure() calls this before either measured phase.
void setup();
// The table whose live sstables measure() compacts and finally truncates.
ColumnFamilyStore getCfs();
// Name used as the key for results and in log output; "warmup" is treated specially by measure().
String name();
// The reads executed, in list order, during the measured read phase.
List<Runnable> getReads();
}
/**
 * @return a fresh measurement of the kind selected by {@code AGENT_MEASUREMENT}: agent-based when
 *         it is true, ThreadMXBean-based otherwise
 */
private static Measurement createMeasurement()
{
    if (AGENT_MEASUREMENT)
        return new AgentMeasurement();
    return new MXMeasurement();
}
// Common contract of the two measurement strategies (AgentMeasurement and MXMeasurement).
private interface Measurement
{
// Opens the measured window.
void start();
// Closes the measured window; the accessors below are read after this.
void stop();
// CPU time attributed to the window; implementations that cannot measure it return 0.
long cpu();
// Bytes allocated during the window.
long bytes();
// Objects allocated during the window; implementations that cannot count them return 0.
long objects();
// bytes() rendered in human-readable form via FBUtilities.prettyPrintMemory.
default String prettyBytes()
{
return FBUtilities.prettyPrintMemory(bytes());
}
}
/**
 * Measurement backed by the allocation instrumenter agent. While started, the instance is
 * registered as a {@link Sampler} with {@link AllocationRecorder} and tallies every allocation
 * reported for the thread that constructed it; allocations on other threads are ignored.
 * Reported figures have the class-level {@code MIN_*} baselines subtracted. CPU time is not measured.
 */
public static class AgentMeasurement implements Measurement, Sampler
{
    // Only allocations made by the constructing thread are counted
    private final long threadID = Thread.currentThread().getId();
    long objectsAllocated = 0;
    long bytesAllocated = 0;

    public void start()
    {
        AllocationRecorder.addSampler(this);
    }

    public void stop()
    {
        AllocationRecorder.removeSampler(this);
        boolean nothingRecorded = bytesAllocated == 0;
        if (nothingRecorded)
            logger.warn("no allocations recorded, make sure junit is run with -javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar");
    }

    /** Sampler callback: adds one object and {@code bytes} bytes if called on the owning thread. */
    public void sampleAllocation(int count, String desc, Object newObj, long bytes)
    {
        boolean onOwningThread = Thread.currentThread().getId() == threadID;
        if (onOwningThread)
        {
            objectsAllocated += 1;
            bytesAllocated += bytes;
        }
    }

    public long cpu()
    {
        return 0;
    }

    public long bytes()
    {
        return bytesAllocated - MIN_BYTES_ALLOCATED;
    }

    public long objects()
    {
        return objectsAllocated - MIN_OBJECTS_ALLOCATED;
    }
}
/**
 * Measurement backed by the JVM's {@code ThreadMXBean}: snapshots the allocated-bytes and CPU-time
 * counters of the constructing thread at start and stop, and reports the differences.
 * Object counts are not available from this source, so {@link #objects()} is always 0.
 */
public static class MXMeasurement implements Measurement
{
    // The thread whose counters are sampled: the one that created this measurement
    private final Thread thread = Thread.currentThread();
    private final Point start = new Point();
    private final Point stop = new Point();

    /** A snapshot of the sampled thread's counters. */
    private class Point
    {
        long bytes;
        long cpu;

        void capture()
        {
            long id = thread.getId();
            bytes = threadMX.getThreadAllocatedBytes(id);
            cpu = threadMX.getThreadCpuTime(id);
        }
    }

    public void start()
    {
        start.capture();
    }

    public void stop()
    {
        stop.capture();
    }

    public long bytes()
    {
        long allocatedDuringWindow = stop.bytes - start.bytes;
        return allocatedDuringWindow;
    }

    public long cpu()
    {
        long cpuDuringWindow = stop.cpu - start.cpu;
        return cpuDuringWindow;
    }

    public long objects()
    {
        return 0;
    }
}
/**
 * Sanity check of the measuring machinery itself: allocates a known number of {@code Integer}
 * objects inside a measured window and logs the measured bytes next to the expected figure
 * (measured size of one Integer times the number allocated). Nothing is asserted.
 */
@Test
public void allocMeasuring()
{
    final int numAlloc = 1000;
    long size = ObjectSizes.measure(5);
    Measurement measurement = createMeasurement();

    measurement.start();
    int i = 0;
    while (i < numAlloc)
    {
        // the constructor is used on purpose: every iteration must allocate a new object
        new Integer(i);
        i++;
    }
    measurement.stop();

    logger.info(" ** {}", measurement.prettyBytes());
    logger.info(" ** expected {}", size * numAlloc);
}
/**
 * Runs one workload end to end and records what it allocates.
 *
 * Sequence: workload setup, measured read phase (skipped when PROFILING_COMPACTION is set), measured
 * user-defined compaction of all live sstables (skipped when PROFILING_READS is set), truncate, log.
 * Results are stored in readSummaries / compactionSummaries under the workload name; the textual
 * summary lines are added to summaries for every workload except the one named "warmup".
 *
 * Note that the per-read, per-partition and per-row figures use integer division by the number of
 * reads, partitions and rows, so a workload producing zero of any of them would fail here.
 *
 * @param workload the data set and reads to measure
 * @throws Throwable anything thrown by the workload, the compaction tasks or the assertions
 */
private static void measure(Workload workload) throws Throwable
{
workload.setup();
// Both samplers are created up front, on the calling thread, even if one phase ends up being skipped.
Measurement readSampler = createMeasurement();
Measurement compactionSampler = createMeasurement();
String readSummary = "SKIPPED";
if (!PROFILING_COMPACTION)
{
List<Runnable> reads = workload.getReads();
readSampler.start();
// When profiling reads, pause so a profiler can be started by hand before the work begins (not for warmup).
if (PROFILING_READS && !workload.name().equals("warmup"))
{
logger.info(">>> Start profiling");
Thread.sleep(10000);
}
for (int i=0; i<reads.size(); i++)
reads.get(i).run();
// This sleep is inside the measured window; presumably it lets trailing work finish before stop() - TODO confirm.
Thread.sleep(1000);
if (PROFILING_READS && !workload.name().equals("warmup"))
{
logger.info(">>> Stop profiling");
Thread.sleep(10000);
}
readSampler.stop();
readSummary = String.format("%s bytes, %s /read, %s cpu", readSampler.bytes(), readSampler.bytes()/reads.size(), readSampler.cpu());
readSummaries.put(workload.name(), new ReadSummary(readSampler, reads.size()));
}
ColumnFamilyStore cfs = workload.getCfs();
ActiveCompactions active = new ActiveCompactions();
// Compact every live sstable of the table through user-defined compaction tasks.
Set<SSTableReader> sstables = cfs.getLiveSSTables();
CompactionTasks tasks = cfs.getCompactionStrategyManager()
.getUserDefinedTasks(sstables, FBUtilities.nowInSeconds());
Assert.assertFalse(tasks.isEmpty());
String compactionSummary = "SKIPPED";
if (!PROFILING_READS)
{
compactionSampler.start();
// Same manual-profiler pauses as in the read phase, this time around the compaction.
if (PROFILING_COMPACTION && !workload.name().equals("warmup"))
{
logger.info(">>> Start profiling");
Thread.sleep(10000);
}
for (AbstractCompactionTask task : tasks)
task.execute(active);
Thread.sleep(1000);
if (PROFILING_COMPACTION && !workload.name().equals("warmup"))
{
logger.info(">>> Stop profiling");
Thread.sleep(10000);
}
compactionSampler.stop();
// The compaction is expected to leave exactly one sstable; its metadata supplies the partition and row counts.
Assert.assertEquals(1, cfs.getLiveSSTables().size());
int numPartitions = Ints.checkedCast(Iterables.getOnlyElement(cfs.getLiveSSTables()).getSSTableMetadata().estimatedPartitionSize.count());
int numRows = Ints.checkedCast(Iterables.getOnlyElement(cfs.getLiveSSTables()).getSSTableMetadata().totalRows);
compactionSummary = String.format("%s bytes, %s /partition, %s /row, %s cpu", compactionSampler.bytes(), compactionSampler.bytes()/numPartitions, compactionSampler.bytes()/numRows, compactionSampler.cpu());
compactionSummaries.put(workload.name(), new CompactionSummary(compactionSampler, numPartitions, numRows));
}
// Drop the data written by this workload before reporting.
cfs.truncateBlocking();
logger.info("***");
logger.info("*** {} reads summary", workload.name());
logger.info(readSummary);
logger.info("*** {} compaction summary", workload.name());
logger.info(compactionSummary);
// The warmup run is logged above but kept out of the end-of-class summary lines.
if (!workload.name().equals("warmup"))
{
summaries.add(workload.name() + " reads summary: " + readSummary);
summaries.add(workload.name() + " compaction summary: " + compactionSummary);
}
Thread.sleep(1000); // avoid losing report when running in IDE
}
// Output sink that discards everything written to it; runQuery() serializes read results into it
// so that the read path is exercised without keeping the serialized bytes.
private static final DataOutputPlus NOOP_OUT = new UnbufferedDataOutputStreamPlus()
{
    public void write(int ignoredByte) throws IOException
    {
        // intentionally empty: the byte is dropped
    }

    public void write(byte[] ignoredBuffer, int ignoredOffset, int ignoredCount) throws IOException
    {
        // intentionally empty: the bytes are dropped
    }
};
/**
 * Executes a read locally and serializes the resulting partitions, with all columns of the table,
 * into the discarding {@code NOOP_OUT} sink using the intra-node serializer at the current
 * messaging version.
 *
 * @param query    the read to execute
 * @param metadata metadata of the table being read, used to build the all-columns filter
 * @throws AssertionError wrapping any {@link IOException} raised while executing or serializing
 */
private static void runQuery(ReadQuery query, TableMetadata metadata)
{
    try (ReadExecutionController controller = query.executionController();
         UnfilteredPartitionIterator results = query.executeLocally(controller))
    {
        UnfilteredPartitionIterators.serializerForIntraNode()
                                    .serialize(results,
                                               ColumnFilter.all(metadata),
                                               NOOP_OUT,
                                               MessagingService.current_version);
    }
    catch (IOException ioe)
    {
        throw new AssertionError(ioe);
    }
}
/**
 * Builds and measures a workload of single-row partitions (table {@code tbl (k INT PRIMARY KEY, v INT)}).
 *
 * A keyspace named {@code "ks_" + name.toLowerCase()} is created, and the workload handed to
 * {@code measure} writes {@code numSSTable} sstables of {@code sstablePartitions} partitions each,
 * flushing after every batch. One single-partition read is registered for every distinct key written.
 *
 * @param name              workload name, also used to derive the keyspace name
 * @param numSSTable        number of sstables to write
 * @param sstablePartitions number of partitions written to each sstable
 * @param overlap           if true every sstable contains the same keys {@code 0..sstablePartitions-1};
 *                          if false each sstable gets its own disjoint range of keys
 * @throws Throwable anything thrown while creating the schema or measuring the workload
 */
private static void testTinyPartitions(String name, int numSSTable, int sstablePartitions, boolean overlap) throws Throwable
{
    String ksname = "ks_" + name.toLowerCase();
    SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
                                CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", ksname).build());
    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
    Assert.assertNotNull(cfs);
    cfs.disableAutoCompaction();
    // One read per distinct key: overlapping sstables share the same sstablePartitions keys
    List<Runnable> reads = new ArrayList<>(overlap ? sstablePartitions : numSSTable * sstablePartitions);
    measure(new Workload()
    {
        public void setup()
        {
            cfs.disableAutoCompaction();
            String insert = String.format("INSERT INTO %s.%s (k, v) VALUES (?,?)", ksname, "tbl");
            String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
            SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
            QueryState queryState = QueryState.forInternalCalls();
            for (int f = 0; f < numSSTable; f++)
            {
                for (int p = 0; p < sstablePartitions; p++)
                {
                    int key = overlap ? p : (f * sstablePartitions) + p;
                    QueryProcessor.executeInternal(insert, key, key);
                    // With overlapping sstables the keys repeat, so reads are registered on the first pass only
                    if (!overlap || f == 0)
                    {
                        // Bind the partition key just written (not the sstable index), as the medium and
                        // wide workloads do, so that every distinct partition is read exactly once
                        QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{ key });
                        ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
                        reads.add(() -> runQuery(query, cfs.metadata.get()));
                    }
                }
                // one sstable per outer iteration
                cfs.forceBlockingFlush();
            }
            Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
        }

        public List<Runnable> getReads()
        {
            return reads;
        }

        public ColumnFamilyStore getCfs()
        {
            return cfs;
        }

        public String name()
        {
            return name;
        }
    });
}
// 3 sstables x 900 single-row partitions (x6 when profiling), disjoint key ranges per sstable.
@Test
public void tinyNonOverlapping3() throws Throwable
{
testTinyPartitions("tinyNonOverlapping3", 3, maybeInflate(900, 6), false);
}
// 9 sstables x 300 single-row partitions (x6 when profiling), disjoint key ranges per sstable.
@Test
public void tinyNonOverlapping9() throws Throwable
{
testTinyPartitions("tinyNonOverlapping9", 9, maybeInflate(300, 6), false);
}
// 3 sstables x 900 single-row partitions (x6 when profiling), every sstable holding the same keys.
@Test
public void tinyOverlapping3() throws Throwable
{
testTinyPartitions("tinyOverlapping3", 3, maybeInflate(900, 6), true);
}
// 9 sstables x 300 single-row partitions (x6 when profiling), every sstable holding the same keys.
@Test
public void tinyOverlapping9() throws Throwable
{
testTinyPartitions("tinyOverlapping9", 9, maybeInflate(300, 6), true);
}
// Source for unseeded strings; never reseeded by this code
private static final Random globalRandom = new Random();
// Source for seeded strings; reseeded on every seeded call so equal seeds yield equal strings
private static final Random localRandom = new Random();

/**
 * @param length number of characters to generate
 * @return a string of {@code length} random lower-case letters ('a'..'z') drawn from the shared unseeded source
 */
public static String makeRandomString(int length)
{
    return makeRandomString(length, -1);
}

/**
 * Generates a string of random lower-case letters ('a'..'z').
 *
 * @param length number of characters to generate
 * @param seed   a non-negative value makes the result reproducible (the same seed always yields the
 *               same sequence of letters); a negative value draws from the shared unseeded source
 * @return the generated string
 */
public static String makeRandomString(int length, int seed)
{
    final int alphabetSize = 'z' - 'a' + 1;
    Random source = seed < 0 ? globalRandom : localRandom;
    if (seed >= 0)
        source.setSeed(seed);

    char[] letters = new char[length];
    int filled = 0;
    while (filled < length)
    {
        letters[filled] = (char) ('a' + source.nextInt(alphabetSize));
        filled++;
    }
    return String.valueOf(letters);
}
/**
 * Builds and measures a workload of medium-sized partitions: 200 rows per partition, each row a
 * 6-letter clustering key plus four 8-letter text values.
 *
 * A keyspace named {@code "ks_" + name.toLowerCase()} is created, and the workload handed to
 * {@code measure} writes {@code numSSTable} sstables of {@code sstablePartitions} partitions each,
 * flushing after every batch. One single-partition read is registered for every distinct key written.
 *
 * @param name              workload name, also used to derive the keyspace name
 * @param numSSTable        number of sstables to write
 * @param sstablePartitions number of partitions written to each sstable
 * @param overlap           if true every sstable contains the same partition keys; if false each
 *                          sstable gets its own disjoint range of keys
 * @param overlapCK         if true clustering keys are generated from the row index as seed, so the
 *                          same clustering keys recur in every partition; if false they are unseeded random
 * @throws Throwable anything thrown while creating the schema or measuring the workload
 */
private static void testMediumPartitions(String name, int numSSTable, int sstablePartitions, boolean overlap, boolean overlapCK) throws Throwable
{
    String ksname = "ks_" + name.toLowerCase();
    SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
                                CreateTableStatement.parse("CREATE TABLE tbl (k text, c text, v1 text, v2 text, v3 text, v4 text, PRIMARY KEY (k, c))", ksname).build());
    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
    Assert.assertNotNull(cfs);
    cfs.disableAutoCompaction();
    int rowsPerPartition = 200;
    // One read per distinct key: overlapping sstables share the same sstablePartitions keys
    List<Runnable> reads = new ArrayList<>(overlap ? sstablePartitions : numSSTable * sstablePartitions);
    measure(new Workload()
    {
        public void setup()
        {
            cfs.disableAutoCompaction();
            String insert = String.format("INSERT INTO %s.%s (k, c, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?)", ksname, "tbl");
            String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
            SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
            QueryState queryState = QueryState.forInternalCalls();
            for (int f = 0; f < numSSTable; f++)
            {
                for (int p = 0; p < sstablePartitions; p++)
                {
                    String key = String.format("%08d", overlap ? p : (f * sstablePartitions) + p);
                    for (int r = 0; r < rowsPerPartition; r++)
                    {
                        // a seeded clustering key repeats for row r of every partition; -1 means unseeded
                        QueryProcessor.executeInternal(insert, key, makeRandomString(6, overlapCK ? r : -1),
                                                       makeRandomString(8), makeRandomString(8),
                                                       makeRandomString(8), makeRandomString(8));
                    }
                    // With overlapping sstables the keys repeat, so reads are registered on the first pass only
                    if (!overlap || f == 0)
                    {
                        QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{ key });
                        ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
                        reads.add(() -> runQuery(query, cfs.metadata.get()));
                    }
                }
                // one sstable per outer iteration
                cfs.forceBlockingFlush();
            }
            Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
        }

        public ColumnFamilyStore getCfs()
        {
            return cfs;
        }

        public List<Runnable> getReads()
        {
            return reads;
        }

        public String name()
        {
            return name;
        }
    });
}
// 3 sstables x 60 partitions (x3 when profiling), disjoint partition keys, unseeded clustering keys.
@Test
public void mediumNonOverlappingPartitions3() throws Throwable
{
testMediumPartitions("mediumNonOverlappingPartitions3", 3, maybeInflate(60), false, false);
}
// 9 sstables x 20 partitions (x3 when profiling), disjoint partition keys, unseeded clustering keys.
@Test
public void mediumNonOverlappingPartitions9() throws Throwable
{
testMediumPartitions("mediumNonOverlappingPartitions9", 9, maybeInflate(20), false, false);
}
// 3 sstables x 60 partitions (x3 when profiling), same partition keys in every sstable, unseeded clustering keys.
@Test
public void mediumOverlappingPartitions3() throws Throwable
{
testMediumPartitions("mediumOverlappingPartitions3", 3, maybeInflate(60), true, false);
}
// 9 sstables x 20 partitions (x3 when profiling), same partition keys in every sstable, unseeded clustering keys.
@Test
public void mediumOverlappingPartitions9() throws Throwable
{
testMediumPartitions("mediumOverlappingPartitions9", 9, maybeInflate(20), true, false);
}
// 3 sstables x 60 partitions (x3 when profiling), same partition keys and same (seeded) clustering keys in every sstable.
@Test
public void mediumPartitionsOverlappingRows3() throws Throwable
{
testMediumPartitions("mediumPartitionsOverlappingRows3", 3, maybeInflate(60), true, true);
}
// 9 sstables x 20 partitions (x3 when profiling), same partition keys and same (seeded) clustering keys in every sstable.
@Test
public void mediumPartitionsOverlappingRows9() throws Throwable
{
testMediumPartitions("mediumPartitionsOverlappingRows9", 9, maybeInflate(20), true, true);
}
/**
 * Builds and measures a workload of wide partitions: 1000 rows per partition, each row a 6-letter
 * clustering key plus four text values of {@code rowWidth / 4} (25) letters.
 *
 * A keyspace named {@code "ks_" + name.toLowerCase()} is created, and the workload handed to
 * {@code measure} writes {@code numSSTable} sstables of {@code sstablePartitions} partitions each,
 * flushing after every batch. One single-partition read is registered for every distinct key written.
 *
 * @param name              workload name, also used to derive the keyspace name
 * @param numSSTable        number of sstables to write
 * @param sstablePartitions number of partitions written to each sstable
 * @param overlap           if true every sstable contains the same partition keys; if false each
 *                          sstable gets its own disjoint range of keys
 * @param overlapCK         if true clustering keys are generated from the row index as seed, so the
 *                          same clustering keys recur in every partition; if false they are unseeded random
 * @throws Throwable anything thrown while creating the schema or measuring the workload
 */
private static void testWidePartitions(String name, int numSSTable, int sstablePartitions, boolean overlap, boolean overlapCK) throws Throwable
{
    String ksname = "ks_" + name.toLowerCase();
    SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
                                CreateTableStatement.parse("CREATE TABLE tbl (k text, c text, v1 text, v2 text, v3 text, v4 text, PRIMARY KEY (k, c))", ksname).build());
    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
    Assert.assertNotNull(cfs);
    cfs.disableAutoCompaction();
    int rowWidth = 100;
    // the row width is split evenly over the four value columns
    int valueWidth = rowWidth >> 2;
    int rowsPerPartition = 1000;
    // One read per distinct key: overlapping sstables share the same sstablePartitions keys
    List<Runnable> reads = new ArrayList<>(overlap ? sstablePartitions : numSSTable * sstablePartitions);
    measure(new Workload()
    {
        public void setup()
        {
            cfs.disableAutoCompaction();
            String insert = String.format("INSERT INTO %s.%s (k, c, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?)", ksname, "tbl");
            String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
            SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
            QueryState queryState = QueryState.forInternalCalls();
            for (int f = 0; f < numSSTable; f++)
            {
                for (int p = 0; p < sstablePartitions; p++)
                {
                    String key = String.format("%08d", overlap ? p : (f * sstablePartitions) + p);
                    for (int r = 0; r < rowsPerPartition; r++)
                    {
                        // a seeded clustering key repeats for row r of every partition; -1 means unseeded
                        QueryProcessor.executeInternal(insert, key, makeRandomString(6, overlapCK ? r : -1),
                                                       makeRandomString(valueWidth), makeRandomString(valueWidth),
                                                       makeRandomString(valueWidth), makeRandomString(valueWidth));
                    }
                    // With overlapping sstables the keys repeat, so reads are registered on the first pass only
                    if (!overlap || f == 0)
                    {
                        QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{ key });
                        ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
                        reads.add(() -> runQuery(query, cfs.metadata.get()));
                    }
                }
                // one sstable per outer iteration
                cfs.forceBlockingFlush();
            }
            Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
        }

        public ColumnFamilyStore getCfs()
        {
            return cfs;
        }

        public List<Runnable> getReads()
        {
            return reads;
        }

        public String name()
        {
            return name;
        }
    });
}
// 3 sstables x 24 partitions (x3 when profiling), disjoint partition keys, unseeded clustering keys.
@Test
public void wideNonOverlappingPartitions3() throws Throwable
{
testWidePartitions("wideNonOverlappingPartitions3", 3, maybeInflate(24), false, false);
}
// 9 sstables x 8 partitions (x3 when profiling), disjoint partition keys, unseeded clustering keys.
@Test
public void wideNonOverlappingPartitions9() throws Throwable
{
testWidePartitions("wideNonOverlappingPartitions9", 9, maybeInflate(8), false, false);
}
// 3 sstables x 24 partitions (x3 when profiling), same partition keys in every sstable, unseeded clustering keys.
@Test
public void wideOverlappingPartitions3() throws Throwable
{
testWidePartitions("wideOverlappingPartitions3", 3, maybeInflate(24), true, false);
}
// 9 sstables x 8 partitions (x3 when profiling), same partition keys in every sstable, unseeded clustering keys.
@Test
public void wideOverlappingPartitions9() throws Throwable
{
testWidePartitions("wideOverlappingPartitions9", 9, maybeInflate(8), true, false);
}
// 9 sstables x 8 partitions (x3 when profiling), same partition keys and same (seeded) clustering keys in every sstable.
@Test
public void widePartitionsOverlappingRows9() throws Throwable
{
testWidePartitions("widePartitionsOverlappingRows9", 9, maybeInflate(8), true, true);
}
// 3 sstables x 24 partitions (x3 when profiling), same partition keys and same (seeded) clustering keys in every sstable.
@Test
public void widePartitionsOverlappingRows3() throws Throwable
{
testWidePartitions("widePartitionsOverlappingRows3", 3, maybeInflate(24), true, true);
}
}