/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.test.microbench;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamWriter;
import org.apache.cassandra.db.streaming.CassandraStreamHeader;
import org.apache.cassandra.db.streaming.CassandraStreamReader;
import org.apache.cassandra.db.streaming.CassandraStreamWriter;
import org.apache.cassandra.db.streaming.ComponentContext;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.AsyncStreamingInputPlus;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.streaming.DefaultConnectionFactory;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamCoordinator;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamSummary;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

/**
* Please ensure that this benchmark is run with stream_throughput_outbound_megabits_per_sec set to a
* very high value; otherwise, throttling will kick in and the results will not be meaningful.
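* <p>
* With the standard Cassandra build, the benchmark can typically be launched through the JMH
* harness, e.g. {@code ant microbench -Dbenchmark.name=ZeroCopyStreamingBenchmark}
* (assuming the {@code microbench} ant target is present in this tree).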
*/
@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@Threads(1)
public class ZeroCopyStreamingBenchmark
{
static final int STREAM_SIZE = 50 * 1024 * 1024;

@State(Scope.Thread)
public static class BenchmarkState
{
public static final String KEYSPACE = "ZeroCopyStreamingBenchmark";
public static final String CF_STANDARD = "Standard1";
public static final String CF_INDEXED = "Indexed1";
public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
private static SSTableReader sstable;
private static ColumnFamilyStore store;
private StreamSession session;
private CassandraEntireSSTableStreamWriter blockStreamWriter;
private ComponentContext context;
private ByteBuf serializedBlockStream;
private InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
private CassandraEntireSSTableStreamReader blockStreamReader;
private CassandraStreamWriter partialStreamWriter;
private CassandraStreamReader partialStreamReader;
private ByteBuf serializedPartialStream;
@Setup
public void setupBenchmark() throws IOException
{
Keyspace keyspace = setupSchemaAndKeySpace();
store = keyspace.getColumnFamilyStore(CF_STANDARD);
generateData();
sstable = store.getLiveSSTables().iterator().next();
session = setupStreamingSessionForTest();
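// Entire-SSTable (zero-copy) path: serialize the sstable once into a capturing
// channel so the reader benchmark can replay the exact same bytes.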
context = ComponentContext.create(sstable.descriptor);
blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, context);
CapturingNettyChannel blockStreamCaptureChannel = new CapturingNettyChannel(STREAM_SIZE);
AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(blockStreamCaptureChannel);
blockStreamWriter.write(out);
serializedBlockStream = blockStreamCaptureChannel.getSerializedStream();
out.close();
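// Header for the entire-SSTable reader; sections are empty because whole
// components are transferred rather than ranges within the data file.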
session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, serializedBlockStream.readableBytes()));
CassandraStreamHeader entireSSTableStreamHeader =
CassandraStreamHeader.builder()
.withSSTableFormat(sstable.descriptor.formatType)
.withSSTableVersion(sstable.descriptor.version)
.withSSTableLevel(0)
.withEstimatedKeys(sstable.estimatedKeys())
.withSections(Collections.emptyList())
.withSerializationHeader(sstable.header.toComponent())
.withComponentManifest(context.manifest())
.isEntireSSTable(true)
.withFirstKey(sstable.first)
.withTableId(sstable.metadata().id)
.build();
blockStreamReader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id,
peer, session.planId(), false,
0, 0, 0,
null), entireSSTableStreamHeader, session);
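// Partial (legacy) path: stream only the sections of the data file covering the
// requested token ranges instead of whole components.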
List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(sstable.first.minValue().getToken(), sstable.last.getToken()));
CassandraStreamHeader partialSSTableStreamHeader =
CassandraStreamHeader.builder()
.withSSTableFormat(sstable.descriptor.formatType)
.withSSTableVersion(sstable.descriptor.version)
.withSSTableLevel(0)
.withEstimatedKeys(sstable.estimatedKeys())
.withSections(sstable.getPositionsForRanges(requestedRanges))
.withSerializationHeader(sstable.header.toComponent())
.withTableId(sstable.metadata().id)
.build();
partialStreamWriter = new CassandraStreamWriter(sstable, partialSSTableStreamHeader, session);
CapturingNettyChannel partialStreamChannel = new CapturingNettyChannel(STREAM_SIZE);
partialStreamWriter.write(new AsyncStreamingOutputPlus(partialStreamChannel));
serializedPartialStream = partialStreamChannel.getSerializedStream();
partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id,
peer, session.planId(), false,
0, 0, 0,
null),
partialSSTableStreamHeader, session);
}
private Keyspace setupSchemaAndKeySpace()
{
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
.minIndexInterval(8)
.maxIndexInterval(256)
.caching(CachingParams.CACHE_NOTHING));
return Keyspace.open(KEYSPACE);
}
private void generateData()
{
// insert data and compact to a single sstable
CompactionManager.instance.disableAutoCompaction();
for (int j = 0; j < 1_000_000; j++)
{
new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
.clustering("0")
.add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
.build()
.applyUnsafe();
}
store.forceBlockingFlush();
CompactionManager.instance.performMaximal(store, false);
}
@TearDown
public void tearDown() throws IOException
{
context.close();
SchemaLoader.cleanupAndLeaveDirs();
CommitLog.instance.stopUnsafe(true);
}
private StreamSession setupStreamingSessionForTest()
{
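// The peer is the local broadcast address, so the session stays entirely local;
// the benchmarks exercise serialization, not the network.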
StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, false, null, PreviewKind.NONE);
StreamResultFuture future = StreamResultFuture.createInitiator(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
session.init(future);
return session;
}
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void blockStreamWriter(BenchmarkState state) throws Exception
{
EmbeddedChannel channel = createMockNettyChannel();
AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
state.blockStreamWriter.write(out);
out.close();
channel.finishAndReleaseAll();
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void blockStreamReader(BenchmarkState state) throws Exception
{
EmbeddedChannel channel = createMockNettyChannel();
AsyncStreamingInputPlus in = new AsyncStreamingInputPlus(channel);
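// Replay the stream captured during setup; retainedDuplicate() provides
// independent reader/writer indices without consuming the shared buffer.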
in.append(state.serializedBlockStream.retainedDuplicate());
SSTableMultiWriter sstableWriter = state.blockStreamReader.read(in);
Collection<SSTableReader> newSstables = sstableWriter.finished();
in.close();
channel.finishAndReleaseAll();
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void partialStreamWriter(BenchmarkState state) throws Exception
{
EmbeddedChannel channel = createMockNettyChannel();
AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
state.partialStreamWriter.write(out);
out.close();
channel.finishAndReleaseAll();
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void partialStreamReader(BenchmarkState state) throws Exception
{
EmbeddedChannel channel = createMockNettyChannel();
AsyncStreamingInputPlus in = new AsyncStreamingInputPlus(channel);
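// Replay the captured partial stream (see blockStreamReader above).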
in.append(state.serializedPartialStream.retainedDuplicate());
SSTableMultiWriter sstableWriter = state.partialStreamReader.read(in);
Collection<SSTableReader> newSstables = sstableWriter.finished();
in.close();
channel.finishAndReleaseAll();
}
private EmbeddedChannel createMockNettyChannel()
{
EmbeddedChannel channel = new EmbeddedChannel();
channel.config().setWriteBufferHighWaterMark(STREAM_SIZE); // avoid blocking
return channel;
}
private static class CapturingNettyChannel extends EmbeddedChannel
{
private final ByteBuf serializedStream;
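// Drains zero-copy DefaultFileRegion transfers into serializedStream.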
private final WritableByteChannel proxyWBC = new WritableByteChannel()
{
public int write(ByteBuffer src) throws IOException
{
int rem = src.remaining();
serializedStream.writeBytes(src);
return rem;
}
public boolean isOpen()
{
return true;
}
public void close() throws IOException
{
}
};
public CapturingNettyChannel(int capacity)
{
this.serializedStream = alloc().buffer(capacity);
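// Capture every outbound message, whether it arrives as a ByteBuf, a raw
// ByteBuffer, or a zero-copy DefaultFileRegion.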
this.pipeline().addLast(new ChannelOutboundHandlerAdapter()
{
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
{
if (msg instanceof ByteBuf)
serializedStream.writeBytes((ByteBuf) msg);
else if (msg instanceof ByteBuffer)
serializedStream.writeBytes((ByteBuffer) msg);
else if (msg instanceof DefaultFileRegion)
((DefaultFileRegion) msg).transferTo(proxyWBC, 0);
}
});
config().setWriteBufferHighWaterMark(capacity);
}
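// Return an independent copy so callers can read it without mutating the
// channel's captured buffer.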
public ByteBuf getSerializedStream()
{
return serializedStream.copy();
}
}
}