/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.transport;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.datastax.driver.core.*;
import io.netty.buffer.ByteBuf;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.service.NativeTransportService;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.AssertUtil;
import static org.apache.cassandra.config.EncryptionOptions.TlsEncryptionPolicy.UNENCRYPTED;
import static org.apache.cassandra.transport.BurnTestUtil.SizeCaps;
import static org.apache.cassandra.transport.BurnTestUtil.generateQueryMessage;
import static org.apache.cassandra.transport.BurnTestUtil.generateQueryStatement;
import static org.apache.cassandra.transport.BurnTestUtil.generateRows;
import static org.assertj.core.api.Assertions.assertThat;
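/**
 * Burn/stress test that exercises the native protocol end to end through the Java driver:
 * the QUERY codec is swapped for one that answers numeric queries with generated rows, and
 * multiple client threads verify the round-tripped payloads while an allocation observer
 * checks that the server releases every byte it allocates. The measure* tests reuse the
 * same machinery to print rough latency statistics per protocol version, payload size and
 * compression setting.
 */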
public class DriverBurnTest extends CQLTester
{
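// Records buffer allocations and releases at both the endpoint and global level so the
// test can assert at the end that they balance out.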
private final CQLConnectionTest.AllocationObserver allocationObserver = new CQLConnectionTest.AllocationObserver();
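// Start the test cluster with a pipeline whose resource provider is wrapped by the observer above.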
@Before
public void setup()
{
PipelineConfigurator configurator = new PipelineConfigurator(NativeTransportService.useEpoll(), false, false, UNENCRYPTED)
{
protected ClientResourceLimits.ResourceProvider resourceProvider(ClientResourceLimits.Allocator allocator)
{
return BurnTestUtil.observableResourceProvider(allocationObserver).apply(allocator);
}
};
requireNetwork((builder) -> builder.withPipelineConfigurator(configurator));
}
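// Drives several client configurations (protocol V4 and beta, with and without LZ4 compression)
// concurrently for two minutes, comparing every returned row against the locally generated
// expectation, then checks that server-side buffer allocations and releases balance.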
@Test
public void test() throws Throwable
{
final SizeCaps smallMessageCap = new SizeCaps(10, 20, 5, 10);
final SizeCaps largeMessageCap = new SizeCaps(1000, 2000, 5, 150);
int largeMessageFrequency = 1000;
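// Replace the server-side QUERY codec: queries whose text is a number are answered with
// generated rows (every largeMessageFrequency-th descriptor gets the large payload), while
// anything else (queries the driver issues internally) falls through to normal execution.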
Message.Type.QUERY.unsafeSetCodec(new Message.Codec<QueryMessage>() {
public QueryMessage decode(ByteBuf body, ProtocolVersion version)
{
QueryMessage queryMessage = QueryMessage.codec.decode(body, version);
return new QueryMessage(queryMessage.query, queryMessage.options) {
protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
{
try
{
int idx = Integer.parseInt(queryMessage.query);
SizeCaps caps = idx % largeMessageFrequency == 0 ? largeMessageCap : smallMessageCap;
return generateRows(idx, caps);
}
catch (NumberFormatException e)
{
// for the requests driver issues under the hood
return super.execute(state, queryStartNanoTime, traceRequest);
}
}
};
}
public void encode(QueryMessage queryMessage, ByteBuf dest, ProtocolVersion version)
{
QueryMessage.codec.encode(queryMessage, dest, version);
}
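// encodedSize is a stub: in this test the server only decodes QUERY messages, it never re-encodes them.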
public int encodedSize(QueryMessage queryMessage, ProtocolVersion version)
{
return 0;
}
});
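// Four driver configurations to cycle through: V4 or beta protocol, each with and without LZ4 compression.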
List<AssertUtil.ThrowingSupplier<Cluster.Builder>> suppliers =
Arrays.asList(() -> Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V4)
.withPort(nativePort),
() -> Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.allowBetaProtocolVersion()
.withPort(nativePort),
() -> Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withCompression(ProtocolOptions.Compression.LZ4)
.allowBetaProtocolVersion()
.withPort(nativePort),
() -> Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withCompression(ProtocolOptions.Compression.LZ4)
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V4)
.withPort(nativePort)
);
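// 10 worker threads, each bound to one of the configurations above in round-robin fashion;
// the latch trips on the first failure.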
int threads = 10;
ExecutorService executor = Executors.newFixedThreadPool(threads);
AtomicReference<Throwable> error = new AtomicReference<>();
CountDownLatch signal = new CountDownLatch(1);
for (int t = 0; t < threads; t++)
{
int threadId = t;
executor.execute(() -> {
try (Cluster driver = suppliers.get(threadId % suppliers.size()).get().build();
Session session = driver.connect())
{
int counter = 0;
while (!Thread.interrupted())
{
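// Issue a batch of 10 async queries, then check every returned row against the locally
// generated expectation for the same descriptor.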
Map<Integer, ResultSetFuture> futures = new HashMap<>();
for (int j = 0; j < 10; j++)
{
int descriptor = counter + j * 100 + threadId * 10000;
SizeCaps caps = descriptor % largeMessageFrequency == 0 ? largeMessageCap : smallMessageCap;
futures.put(j, session.executeAsync(generateQueryStatement(descriptor, caps)));
}
for (Map.Entry<Integer, ResultSetFuture> e : futures.entrySet())
{
final int j = e.getKey().intValue();
final int descriptor = counter + j * 100 + threadId * 10000;
SizeCaps caps = descriptor % largeMessageFrequency == 0 ? largeMessageCap : smallMessageCap;
ResultMessage.Rows expectedRS = generateRows(descriptor, caps);
List<Row> actualRS = e.getValue().get().all();
for (int i = 0; i < actualRS.size(); i++)
{
List<ByteBuffer> expected = expectedRS.result.rows.get(i);
Row actual = actualRS.get(i);
for (int col = 0; col < expected.size(); col++)
Assert.assertEquals(expected.get(col), actual.getBytes(col));
}
}
counter++;
}
}
catch (Throwable e)
{
e.printStackTrace();
error.set(e);
signal.countDown();
}
});
}
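// The latch only counts down when a worker fails, so a timed-out await means two minutes passed without errors.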
Assert.assertFalse(signal.await(120, TimeUnit.SECONDS));
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
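// Every byte the server allocated for these connections must have been released, both per endpoint and globally.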
assertThat(allocationObserver.endpointAllocationTotal()).isEqualTo(allocationObserver.endpointReleaseTotal());
assertThat(allocationObserver.globalAllocationTotal()).isEqualTo(allocationObserver.globalReleaseTotal());
}
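// The measure* tests below run the same latency harness (perfTest) across a matrix of
// payload size (small/large), protocol version (V4/V5/V6) and LZ4 compression.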
@Test
public void measureSmallV6() throws Throwable
{
perfTest(new SizeCaps(10, 20, 5, 10),
new SizeCaps(10, 20, 5, 10),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.allowBetaProtocolVersion()
.withPort(nativePort),
ProtocolVersion.V6);
}
@Test
public void measureSmallV5() throws Throwable
{
perfTest(new SizeCaps(10, 20, 5, 10),
new SizeCaps(10, 20, 5, 10),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V5)
.withPort(nativePort),
ProtocolVersion.V5);
}
@Test
public void measureSmallV4() throws Throwable
{
perfTest(new SizeCaps(10, 20, 5, 10),
new SizeCaps(10, 20, 5, 10),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V4)
.withPort(nativePort),
ProtocolVersion.V4);
}
@Test
public void measureLargeV6() throws Throwable
{
perfTest(new SizeCaps(1000, 2000, 5, 150),
new SizeCaps(1000, 2000, 5, 150),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.allowBetaProtocolVersion()
.withPort(nativePort),
ProtocolVersion.V6);
}
@Test
public void measureLargeV5() throws Throwable
{
perfTest(new SizeCaps(1000, 2000, 5, 150),
new SizeCaps(1000, 2000, 5, 150),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V5)
.withPort(nativePort),
ProtocolVersion.V5);
}
@Test
public void measureLargeV4() throws Throwable
{
perfTest(new SizeCaps(1000, 2000, 5, 150),
new SizeCaps(1000, 2000, 5, 150),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V4)
.withPort(nativePort),
ProtocolVersion.V4);
}
@Test
public void measureSmallV6WithCompression() throws Throwable
{
perfTest(new SizeCaps(10, 20, 5, 10),
new SizeCaps(10, 20, 5, 10),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.allowBetaProtocolVersion()
.withCompression(ProtocolOptions.Compression.LZ4)
.withPort(nativePort),
ProtocolVersion.V6);
}
@Test
public void measureSmallV5WithCompression() throws Throwable
{
perfTest(new SizeCaps(10, 20, 5, 10),
new SizeCaps(10, 20, 5, 10),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V5)
.withCompression(ProtocolOptions.Compression.LZ4)
.withPort(nativePort),
ProtocolVersion.V5);
}
@Test
public void measureSmallV4WithCompression() throws Throwable
{
perfTest(new SizeCaps(10, 20, 5, 10),
new SizeCaps(10, 20, 5, 10),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V4)
.withCompression(ProtocolOptions.Compression.LZ4)
.withPort(nativePort),
ProtocolVersion.V4);
}
@Test
public void measureLargeV6WithCompression() throws Throwable
{
perfTest(new SizeCaps(1000, 2000, 5, 150),
new SizeCaps(1000, 2000, 5, 150),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.allowBetaProtocolVersion()
.withCompression(ProtocolOptions.Compression.LZ4)
.withPort(nativePort),
ProtocolVersion.V6);
}
@Test
public void measureLargeV5WithCompression() throws Throwable
{
perfTest(new SizeCaps(1000, 2000, 5, 150),
new SizeCaps(1000, 2000, 5, 150),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V5)
.withCompression(ProtocolOptions.Compression.LZ4)
.withPort(nativePort),
ProtocolVersion.V5);
}
@Test
public void measureLargeV4WithCompression() throws Throwable
{
perfTest(new SizeCaps(1000, 2000, 5, 150),
new SizeCaps(1000, 2000, 5, 150),
Cluster.builder().addContactPoint(nativeAddr.getHostAddress())
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.V4)
.withCompression(ProtocolOptions.Compression.LZ4)
.withPort(nativePort),
ProtocolVersion.V4);
}
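/**
 * Shared latency harness for the measure* tests: installs a codec that answers generated
 * queries with rows of a fixed size, runs 10 client threads issuing batches of async
 * requests, then prints the encoded request/response sizes and latency statistics gathered
 * after a warm-up period.
 */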
public void perfTest(SizeCaps requestCaps, SizeCaps responseCaps, Cluster.Builder builder, ProtocolVersion version) throws Throwable
{
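// Pre-compute the encoded request and response sizes for the report printed at the end.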
SimpleStatement request = generateQueryStatement(0, requestCaps);
ResultMessage.Rows response = generateRows(0, responseCaps);
QueryMessage requestMessage = generateQueryMessage(0, requestCaps, version);
Envelope message = requestMessage.encode(version);
int requestSize = message.body.readableBytes();
message.release();
message = response.encode(version);
int responseSize = message.body.readableBytes();
message.release();
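// Same codec substitution as in test(), but every numeric query gets a response of the same (responseCaps) size.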
Message.Type.QUERY.unsafeSetCodec(new Message.Codec<QueryMessage>() {
public QueryMessage decode(ByteBuf body, ProtocolVersion version)
{
QueryMessage queryMessage = QueryMessage.codec.decode(body, version);
return new QueryMessage(queryMessage.query, queryMessage.options) {
protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
{
try
{
int idx = Integer.parseInt(queryMessage.query); // only the generated test queries have a numeric body
return generateRows(idx, responseCaps);
}
catch (NumberFormatException e)
{
// for the requests driver issues under the hood
return super.execute(state, queryStartNanoTime, traceRequest);
}
}
};
}
public void encode(QueryMessage queryMessage, ByteBuf dest, ProtocolVersion version)
{
QueryMessage.codec.encode(queryMessage, dest, version);
}
public int encodedSize(QueryMessage queryMessage, ProtocolVersion version)
{
return QueryMessage.codec.encodedSize(queryMessage, version);
}
});
int threads = 10;
int perThread = 30;
ExecutorService executor = Executors.newFixedThreadPool(threads + 10);
AtomicReference<Throwable> error = new AtomicReference<>();
CountDownLatch signal = new CountDownLatch(1);
AtomicBoolean measure = new AtomicBoolean(false);
DescriptiveStatistics stats = new DescriptiveStatistics();
Lock lock = new ReentrantLock();
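// Each worker issues batches of perThread async queries and records per-request latency into
// the shared statistics, but only once the warm-up phase has finished (measure flag set).
// The listeners run on the same executor, hence its extra headroom of 10 threads.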
for (int t = 0; t < threads; t++)
{
executor.execute(() -> {
try (Cluster driver = builder.build();
Session session = driver.connect())
{
while (!executor.isShutdown() && error.get() == null)
{
Map<Integer, ResultSetFuture> futures = new HashMap<>();
for (int j = 0; j < perThread; j++)
{
long startNanos = System.nanoTime();
ResultSetFuture future = session.executeAsync(request);
future.addListener(() -> {
long diff = System.nanoTime() - startNanos;
if (measure.get())
{
lock.lock();
try
{
stats.addValue(TimeUnit.NANOSECONDS.toMicros(diff)); // diff is in nanoseconds; record latency in microseconds
}
finally
{
lock.unlock();
}
}
}, executor);
futures.put(j, future);
}
for (Map.Entry<Integer, ResultSetFuture> e : futures.entrySet())
{
Assert.assertEquals(response.result.size(),
e.getValue().get().all().size());
}
}
}
catch (Throwable e)
{
e.printStackTrace();
error.set(e);
signal.countDown();
}
});
}
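// 30 seconds of warm-up, then 60 seconds of measured traffic; the latch only counts down on a
// worker failure, so both awaits are expected to time out.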
Assert.assertFalse(signal.await(30, TimeUnit.SECONDS));
measure.set(true);
Assert.assertFalse(signal.await(60, TimeUnit.SECONDS));
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("requestSize = " + requestSize);
System.out.println("responseSize = " + responseSize);
System.out.println("Mean: " + stats.getMean());
System.out.println("Variance: " + stats.getVariance());
System.out.println("Median: " + stats.getPercentile(0.5));
System.out.println("90p: " + stats.getPercentile(0.90));
System.out.println("95p: " + stats.getPercentile(0.95));
System.out.println("99p: " + stats.getPercentile(0.99));
}
}
// TODO: test disconnecting and reconnecting constantly