package org.apache.cassandra.db.commitlog;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.RateLimiter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameters;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.compress.DeflateCompressor;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.security.EncryptionContextGenerator;
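/*
 * Stress test for the commit log: multiple writer threads append mutations concurrently,
 * then the resulting segments are replayed and the recovered cell count and content hash
 * are compared with what the writers produced. The class is abstract and @Ignore'd so
 * JUnit does not run it directly; concrete subclasses choose the commit log sync mode.
 *
 * A minimal subclass sketch (the class name and the setCommitLogSync call are
 * illustrative, not taken from this file):
 *
 *   @RunWith(Parameterized.class)
 *   public class BatchCommitLogStressTest extends CommitLogStressTest
 *   {
 *       public BatchCommitLogStressTest(ParameterizedClass compression, EncryptionContext context)
 *       {
 *           super(compression, context);
 *           DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.batch);
 *       }
 *   }
 */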
@Ignore
public abstract class CommitLogStressTest
{
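    // Initialize the DatabaseDescriptor up front; the static fields below (e.g. location)
    // read commit log settings from it.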
static
{
DatabaseDescriptor.daemonInitialization();
}
public static ByteBuffer dataSource;
public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
public static int numCells = 1;
public static int cellSize = 1024;
public static int rateLimit = 0;
public static int runTimeMs = 10000;
public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress";
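    /**
     * Simple position-sensitive checksum: each byte is shifted by (index * 8) mod 32 before
     * being added, so both content and ordering affect the result. Used to compare written
     * and replayed data without retaining it.
     */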
public static int hash(int hash, ByteBuffer bytes)
{
int shift = 0;
for (int i = 0; i < bytes.limit(); i++)
{
hash += (bytes.get(i) & 0xFF) << shift;
shift = (shift + 8) & 0x1F;
}
return hash;
}
    // Shared across threads so each CommitlogThread gets a distinct name.
    private static final AtomicInteger threadId = new AtomicInteger(0);
    private boolean failed = false;
private volatile boolean stop = false;
private boolean randomSize = false;
private boolean discardedRun = false;
private CommitLogPosition discardedPos;
public CommitLogStressTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
{
DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
DatabaseDescriptor.setEncryptionContext(encryptionContext);
DatabaseDescriptor.setCommitLogSegmentSize(32);
}
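    // Use CHANGES.txt as the mutation payload source: real, compressible text available at
    // the repository root. The default commit log is stopped so each test can run its own
    // instance against the stress directory.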
@BeforeClass
    public static void initialize() throws IOException
{
try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
{
dataSource = ByteBuffer.allocateDirect((int) fis.getChannel().size());
while (dataSource.hasRemaining())
{
fis.getChannel().read(dataSource);
}
dataSource.flip();
}
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
CommitLog.instance.stopUnsafe(true);
}
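    // Every run starts from an empty stress directory.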
@Before
public void cleanDir() throws IOException
{
File dir = new File(location);
if (dir.isDirectory())
{
File[] files = dir.listFiles();
for (File f : files)
if (!f.delete())
Assert.fail("Failed to delete " + f);
}
else
{
dir.mkdir();
}
}
@Parameters()
public static Collection<Object[]> buildParameterizedVariants()
{
return Arrays.asList(new Object[][]{
{null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
{null, EncryptionContextGenerator.createContext(true)}, // Encryption
{ new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
{ new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
{ new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
}
@Test
public void testRandomSize() throws Exception
{
randomSize = true;
discardedRun = false;
testLog();
}
@Test
public void testFixedSize() throws Exception
{
randomSize = false;
discardedRun = false;
testLog();
}
@Test
public void testDiscardedRun() throws Exception
{
randomSize = true;
discardedRun = true;
testLog();
}
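    /**
     * Redirects the commit log to the stress directory, runs the workload on a fresh
     * CommitLog instance and restores the original location afterwards.
     */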
private void testLog() throws IOException, InterruptedException
{
String originalDir = DatabaseDescriptor.getCommitLogLocation();
try
{
DatabaseDescriptor.setCommitLogLocation(location);
CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start();
testLog(commitLog);
            Assert.assertFalse(failed);
}
finally
{
DatabaseDescriptor.setCommitLogLocation(originalDir);
}
}
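    /**
     * Core workload: optionally performs a shortened first run whose segments are then
     * discarded, runs the writer threads for runTimeMs, replays the surviving segments,
     * and compares the replayed cell count and hash against the writers' totals.
     */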
    private void testLog(CommitLog commitLog) throws IOException, InterruptedException
    {
System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n",
mb(DatabaseDescriptor.getCommitLogSegmentSize()),
commitLog.configuration.getCompressorName(),
commitLog.configuration.useEncryption(),
commitLog.executor.getClass().getSimpleName(),
randomSize ? " random size" : "",
discardedRun ? " with discarded run" : "");
final List<CommitlogThread> threads = new ArrayList<>();
ScheduledExecutorService scheduled = startThreads(commitLog, threads);
discardedPos = CommitLogPosition.NONE;
if (discardedRun)
{
            // Make sure data written after the discard point is not deleted, and that the
            // replayer correctly rejects the earlier mutations.
Thread.sleep(runTimeMs / 3);
stop = true;
scheduled.shutdown();
scheduled.awaitTermination(2, TimeUnit.SECONDS);
for (CommitlogThread t: threads)
{
t.join();
if (t.clsp.compareTo(discardedPos) > 0)
discardedPos = t.clsp;
}
verifySizes(commitLog);
commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
CommitLogPosition.NONE, discardedPos);
threads.clear();
System.out.format("Discarded at %s\n", discardedPos);
verifySizes(commitLog);
scheduled = startThreads(commitLog, threads);
}
Thread.sleep(runTimeMs);
stop = true;
scheduled.shutdown();
scheduled.awaitTermination(2, TimeUnit.SECONDS);
int hash = 0;
int cells = 0;
for (CommitlogThread t: threads)
{
t.join();
hash += t.hash;
cells += t.cells;
}
verifySizes(commitLog);
commitLog.shutdownBlocking();
System.out.println("Stopped. Replaying... ");
System.out.flush();
Reader reader = new Reader();
File[] files = new File(location).listFiles();
DummyHandler handler = new DummyHandler();
reader.readAllFiles(handler, files);
for (File f : files)
if (!f.delete())
Assert.fail("Failed to delete " + f);
if (hash == reader.hash && cells == reader.cells)
System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n",
commitLog.configuration.getCompressorName(),
commitLog.configuration.useEncryption(),
reader.discarded, reader.skipped);
else
{
System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d - hash %d expected %d.\n",
commitLog.configuration.getCompressorName(),
commitLog.configuration.useEncryption(),
reader.cells, cells, cells - reader.cells, reader.discarded, reader.skipped,
reader.hash, hash);
failed = true;
}
}
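    /**
     * Checks that the reported active on-disk size matches the files actually present and
     * that every active segment's name, on-disk size and compression ratio are consistent.
     */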
private void verifySizes(CommitLog commitLog)
{
// Complete anything that's still left to write.
commitLog.executor.syncBlocking();
// Wait for any concurrent segment allocations to complete.
commitLog.segmentManager.awaitManagementTasksCompletion();
long combinedSize = 0;
for (File f : new File(commitLog.segmentManager.storageDirectory).listFiles())
combinedSize += f.length();
Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize());
List<String> logFileNames = commitLog.getActiveSegmentNames();
Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios();
Collection<CommitLogSegment> segments = commitLog.segmentManager.getActiveSegments();
for (CommitLogSegment segment : segments)
{
Assert.assertTrue(logFileNames.remove(segment.getName()));
Double ratio = ratios.remove(segment.getName());
Assert.assertEquals(segment.logFile.length(), segment.onDiskSize());
Assert.assertEquals(segment.onDiskSize() * 1.0 / segment.contentSize(), ratio, 0.01);
}
Assert.assertTrue(logFileNames.isEmpty());
Assert.assertTrue(ratios.isEmpty());
}
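    // Launches the writer threads plus a once-per-second reporter of memory use and
    // mutation/transfer rates.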
private ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogThread> threads)
{
stop = false;
        for (int ii = 0; ii < NUM_THREADS; ii++)
        {
final CommitlogThread t = new CommitlogThread(commitLog, new Random(ii));
threads.add(t);
t.start();
}
final long start = System.currentTimeMillis();
Runnable printRunnable = new Runnable()
{
long lastUpdate = 0;
public void run()
{
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
long allocatedMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long temp = 0;
long sz = 0;
for (CommitlogThread clt : threads)
{
temp += clt.counter.get();
sz += clt.dataSize;
}
double time = (System.currentTimeMillis() - start) / 1000.0;
double avg = (temp / time);
System.out.println(
String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb",
((System.currentTimeMillis() - start) / 1000),
mb(maxMemory),
mb(allocatedMemory),
mb(freeMemory),
                                  (temp - lastUpdate),
                                  temp,
avg,
mb(commitLog.getActiveContentSize()),
mb(commitLog.getActiveOnDiskSize()),
mb(sz / time)));
lastUpdate = temp;
}
};
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS);
return scheduled;
}
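    // Byte counts converted to megabytes, for log output only.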
    private static double mb(long bytes)
    {
        return bytes / (1024.0 * 1024);
    }
    private static double mb(double bytes)
    {
        return bytes / (1024 * 1024);
    }
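    // Returns 'quantity' bytes copied from a random offset into the shared data source.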
private static ByteBuffer randomBytes(int quantity, Random tlr)
{
ByteBuffer slice = ByteBuffer.allocate(quantity);
ByteBuffer source = dataSource.duplicate();
source.position(tlr.nextInt(source.capacity() - quantity));
source.limit(source.position() + quantity);
slice.put(source);
slice.flip();
return slice;
}
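    /**
     * Writer thread: builds mutations of random content (and, optionally, random size)
     * against Keyspace1.Standard1, appends them to the commit log and tracks the cell
     * count, content hash, bytes written and the last returned CommitLogPosition.
     */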
public class CommitlogThread extends FastThreadLocalThread
{
final AtomicLong counter = new AtomicLong();
int hash = 0;
int cells = 0;
int dataSize = 0;
final CommitLog commitLog;
final Random random;
volatile CommitLogPosition clsp;
CommitlogThread(CommitLog commitLog, Random rand)
{
this.commitLog = commitLog;
this.random = rand;
}
public void run()
{
            Thread.currentThread().setName("CommitLogThread-" + threadId.getAndIncrement());
RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
final Random rand = random != null ? random : ThreadLocalRandom.current();
while (!stop)
{
if (rl != null)
rl.acquire();
ByteBuffer key = randomBytes(16, rand);
UpdateBuilder builder = UpdateBuilder.create(Schema.instance.getCFMetaData("Keyspace1", "Standard1"), Util.dk(key));
for (int ii = 0; ii < numCells; ii++)
{
int sz = randomSize ? rand.nextInt(cellSize) : cellSize;
ByteBuffer bytes = randomBytes(sz, rand);
builder.newRow("name" + ii).add("val", bytes);
hash = hash(hash, bytes);
++cells;
dataSize += sz;
}
clsp = commitLog.add(new Mutation(builder.build()));
counter.incrementAndGet();
}
}
}
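    /**
     * Replayer that recomputes the cell count and content hash. Mutations from segments
     * older than the discard point are counted as discarded; mutations at or before the
     * discard position within the same segment are counted as skipped.
     */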
class Reader extends CommitLogReader
{
int hash;
int cells;
int discarded;
int skipped;
@Override
protected void readMutation(CommitLogReadHandler handler,
byte[] inputBuffer,
int size,
CommitLogPosition minPosition,
final int entryLocation,
final CommitLogDescriptor desc) throws IOException
{
if (desc.id < discardedPos.segmentId)
{
System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
discarded++;
return;
}
else if (desc.id == discardedPos.segmentId && entryLocation <= discardedPos.position)
{
// Skip over this mutation.
skipped++;
return;
}
DataInputPlus bufIn = new DataInputBuffer(inputBuffer, 0, size);
Mutation mutation;
try
{
mutation = Mutation.serializer.deserialize(bufIn,
desc.getMessagingVersion(),
SerializationHelper.Flag.LOCAL);
}
catch (IOException e)
{
// Test fails.
throw new AssertionError(e);
}
for (PartitionUpdate cf : mutation.getPartitionUpdates())
{
Iterator<Row> rowIterator = cf.iterator();
while (rowIterator.hasNext())
{
Row row = rowIterator.next();
if (!(UTF8Type.instance.compose(row.clustering().get(0)).startsWith("name")))
continue;
for (Cell cell : row.cells())
{
hash = hash(hash, cell.value());
++cells;
}
}
}
}
}
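    // No-op handler: verification happens in Reader.readMutation, which never forwards
    // mutations here.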
static class DummyHandler implements CommitLogReadHandler
{
public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException { return false; }
public void handleUnrecoverableError(CommitLogReadException exception) throws IOException { }
public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) { }
}
}