blob: 439ed7379df95472ba00cc61349012311794f17d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.metrics;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.EmbeddedCassandraService;

import static org.apache.cassandra.cql3.statements.BatchStatement.metrics;
import static org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir.*;
import static org.junit.Assert.assertEquals;
import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.Generate.intArrays;
import static org.quicktheories.generators.SourceDSL.integers;
/**
 * Checks that the partitions-per-batch histograms published by
 * {@code org.apache.cassandra.cql3.statements.BatchStatement.metrics} are updated when LOGGED,
 * UNLOGGED and COUNTER batches are executed through a driver session connected to an embedded
 * Cassandra service.
 */
public class BatchMetricsTest extends SchemaLoader
{
    private static final int MAX_ROUNDS_TO_PERFORM = 3;
    private static final int MAX_DISTINCT_PARTITIONS = 128;
    private static final int MAX_STATEMENTS_PER_ROUND = 32;

    private static final String KEYSPACE = "junit";
    private static final String LOGGER_TABLE = "loggerbatchmetricstest";
    private static final String COUNTER_TABLE = "counterbatchmetricstest";

    private static EmbeddedCassandraService cassandra;
    private static Cluster cluster;
    private static Session session;

    private static PreparedStatement psLogger;
    private static PreparedStatement psCounter;

    /**
     * Clears the schema, starts the embedded Cassandra service, sets the write RPC timeout to
     * 10 seconds, connects a driver session to 127.0.0.1 on the configured native transport port
     * and creates the keyspace, the two tables and the prepared statements used by the tests.
     *
     * @throws ConfigurationException declared by the original signature
     * @throws IOException declared by the original signature
     */
    @BeforeClass
    public static void setup() throws ConfigurationException, IOException
    {
        Schema.instance.clear();

        cassandra = new EmbeddedCassandraService();
        cassandra.start();

        DatabaseDescriptor.setWriteRpcTimeout(TimeUnit.SECONDS.toMillis(10));

        cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
        session = cluster.connect();

        session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
        session.execute("USE " + KEYSPACE);
        session.execute("CREATE TABLE IF NOT EXISTS " + LOGGER_TABLE + " (id int PRIMARY KEY, val text);");
        session.execute("CREATE TABLE IF NOT EXISTS " + COUNTER_TABLE + " (id int PRIMARY KEY, val counter);");

        psLogger = session.prepare("INSERT INTO " + KEYSPACE + '.' + LOGGER_TABLE + " (id, val) VALUES (?, ?);");
        psCounter = session.prepare("UPDATE " + KEYSPACE + '.' + COUNTER_TABLE + " SET val = val + 1 WHERE id = ?;");
    }

    /**
     * Releases the driver resources opened by {@link #setup()}.
     * The null check covers the case where setup failed before the cluster was built.
     */
    @AfterClass
    public static void tearDown()
    {
        // NOTE(review): closing the Cluster is expected to close the Session created from it as
        // well - confirm against the driver version in use.
        if (cluster != null)
            cluster.close();
    }

    /**
     * Builds a single batch of the given type and executes it on the shared session.
     * LOGGED and UNLOGGED batches are filled with inserts into the logger table, COUNTER batches
     * with increments on the counter table.
     *
     * @param batchStatementType     type of the batch to build
     * @param distinctPartitions     number of distinct partition keys (0 .. distinctPartitions - 1)
     * @param statementsPerPartition number of statements added for every partition key
     * @throws IllegalStateException if the batch type is not one of the handled types
     */
    private void executeLoggerBatch(BatchStatement.Type batchStatementType, int distinctPartitions, int statementsPerPartition)
    {
        // the statement kind only depends on the batch type, so resolve it once up front
        final boolean counterBatch;
        switch (batchStatementType)
        {
            case LOGGED:
            case UNLOGGED:
                counterBatch = false;
                break;
            case COUNTER:
                counterBatch = true;
                break;
            default:
                throw new IllegalStateException("There is no case for BatchStatement.Type." + batchStatementType.name());
        }

        BatchStatement batch = new BatchStatement(batchStatementType);
        for (int partition = 0; partition < distinctPartitions; partition++)
        {
            for (int statement = 0; statement < statementsPerPartition; statement++)
                batch.add(counterBatch ? psCounter.bind(partition) : psLogger.bind(partition, "aaaaaaaa"));
        }
        session.execute(batch);
    }

    @Test
    public void testLoggedPartitionsPerBatch()
    {
        checkPartitionsPerBatch(BatchStatement.Type.LOGGED, 25);
    }

    @Test
    public void testUnloggedPartitionsPerBatch()
    {
        checkPartitionsPerBatch(BatchStatement.Type.UNLOGGED, 25);
    }

    @Test
    public void testCounterPartitionsPerBatch()
    {
        checkPartitionsPerBatch(BatchStatement.Type.COUNTER, 10);
    }

    /**
     * Runs {@link #assertMetrics} against generated inputs: an array of
     * 1..MAX_ROUNDS_TO_PERFORM rounds, each holding 1..MAX_STATEMENTS_PER_ROUND statements per
     * partition, combined with 1..MAX_DISTINCT_PARTITIONS distinct partitions.
     *
     * @param batchType type of batch under test
     * @param examples  number of generated examples to check
     */
    private void checkPartitionsPerBatch(BatchStatement.Type batchType, int examples)
    {
        qt().withExamples(examples)
            .forAll(intArrays(integers().between(1, MAX_ROUNDS_TO_PERFORM),
                              integers().between(1, MAX_STATEMENTS_PER_ROUND)),
                    integers().between(1, MAX_DISTINCT_PARTITIONS))
            .checkAssert((rounds, distinctPartitions) ->
                         assertMetrics(batchType, rounds, distinctPartitions));
    }

    /**
     * Executes one batch per entry of {@code rounds} and, after each batch, verifies the count
     * and the min/max of all three partitions-per-batch histograms.
     *
     * @param batchTypeTested    type of the batches that are executed
     * @param rounds             one entry per batch; each value is the number of statements per partition
     * @param distinctPartitions number of distinct partitions touched by every batch
     */
    private void assertMetrics(BatchStatement.Type batchTypeTested, int[] rounds, int distinctPartitions)
    {
        // reset the histograms between runs
        clearHistogram();

        final boolean logged = batchTypeTested == BatchStatement.Type.LOGGED;
        final boolean unlogged = batchTypeTested == BatchStatement.Type.UNLOGGED;
        final boolean counter = batchTypeTested == BatchStatement.Type.COUNTER;

        // the length of rounds is the number of batches to execute; each value is the number of
        // statements to execute per partition in that batch
        for (int statementsPerPartition : rounds)
        {
            // only the histogram matching the tested batch type is expected to gain one sample
            long expectedLoggedCount = metrics.partitionsPerLoggedBatch.getCount() + (logged ? 1 : 0);
            long expectedUnloggedCount = metrics.partitionsPerUnloggedBatch.getCount() + (unlogged ? 1 : 0);
            long expectedCounterCount = metrics.partitionsPerCounterBatch.getCount() + (counter ? 1 : 0);

            executeLoggerBatch(batchTypeTested, distinctPartitions, statementsPerPartition);

            assertEquals("partitionsPerUnloggedBatch count", expectedUnloggedCount, metrics.partitionsPerUnloggedBatch.getCount());
            assertEquals("partitionsPerLoggedBatch count", expectedLoggedCount, metrics.partitionsPerLoggedBatch.getCount());
            assertEquals("partitionsPerCounterBatch count", expectedCounterCount, metrics.partitionsPerCounterBatch.getCount());

            EstimatedHistogramReservoirSnapshot loggedSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.partitionsPerLoggedBatch.getSnapshot();
            EstimatedHistogramReservoirSnapshot unloggedSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.partitionsPerUnloggedBatch.getSnapshot();
            EstimatedHistogramReservoirSnapshot counterSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.partitionsPerCounterBatch.getSnapshot();

            // BatchMetrics uses DecayingEstimatedHistogramReservoir which notes that the return of getMax()
            // may be more than the actual max value recorded in the reservoir, with similar but reverse
            // properties for getMin(). getBucketingRangeForValue() on the snapshot is used to obtain the
            // expected bounds. Since distinctPartitions doesn't change per test round these values
            // shouldn't change.
            assertEquals("partitionsPerLoggedBatch min/max",
                         expectedMinMax(logged, loggedSnapshot, distinctPartitions),
                         new Range(loggedSnapshot.getMin(), loggedSnapshot.getMax()));
            assertEquals("partitionsPerUnloggedBatch min/max",
                         expectedMinMax(unlogged, unloggedSnapshot, distinctPartitions),
                         new Range(unloggedSnapshot.getMin(), unloggedSnapshot.getMax()));
            assertEquals("partitionsPerCounterBatch min/max",
                         expectedMinMax(counter, counterSnapshot, distinctPartitions),
                         new Range(counterSnapshot.getMin(), counterSnapshot.getMax()));
        }
    }

    /**
     * Returns the min/max range a histogram is expected to report: the bucketing range of
     * {@code distinctPartitions} when the histogram belongs to the tested batch type, otherwise
     * the (0, 0) range expected for a histogram that was cleared and not updated.
     */
    private Range expectedMinMax(boolean tested, EstimatedHistogramReservoirSnapshot snapshot, int distinctPartitions)
    {
        return tested ? determineExpectedMinMax(snapshot, distinctPartitions) : new Range(0L, 0L);
    }

    /** Clears the three partitions-per-batch histograms so each example starts from empty data. */
    private void clearHistogram()
    {
        ((ClearableHistogram) metrics.partitionsPerLoggedBatch).clear();
        ((ClearableHistogram) metrics.partitionsPerUnloggedBatch).clear();
        ((ClearableHistogram) metrics.partitionsPerCounterBatch).clear();
    }

    /**
     * Looks up the bucketing range the snapshot reports for {@code value}.
     *
     * @param snapshot histogram snapshot to query
     * @param value    recorded value whose bucketing range is wanted
     * @return the result of {@code snapshot.getBucketingRangeForValue(value)}
     */
    private Range determineExpectedMinMax(EstimatedHistogramReservoirSnapshot snapshot, long value)
    {
        return snapshot.getBucketingRangeForValue(value);
    }
}