| package org.apache.cassandra.db.commitlog; |
| |
| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import junit.framework.Assert; |
| |
| import com.google.common.util.concurrent.RateLimiter; |
| |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.UpdateBuilder; |
| import org.apache.cassandra.config.Config.CommitLogSync; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.config.ParameterizedClass; |
| import org.apache.cassandra.config.Schema; |
| import org.apache.cassandra.db.Mutation; |
| import org.apache.cassandra.db.rows.Cell; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.db.rows.SerializationHelper; |
| import org.apache.cassandra.db.partitions.PartitionUpdate; |
| import org.apache.cassandra.db.marshal.UTF8Type; |
| import org.apache.cassandra.io.util.DataInputBuffer; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| |
| public class CommitLogStressTest |
| { |
| public static ByteBuffer dataSource; |
| |
| public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1; |
| public static int numCells = 1; |
| public static int cellSize = 1024; |
| public static int rateLimit = 0; |
| public static int runTimeMs = 10000; |
| |
| public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress"; |
| |
| public static int hash(int hash, ByteBuffer bytes) |
| { |
| int shift = 0; |
| for (int i = 0; i < bytes.limit(); i++) |
| { |
| hash += (bytes.get(i) & 0xFF) << shift; |
| shift = (shift + 8) & 0x1F; |
| } |
| return hash; |
| } |
| |
| public static void main(String[] args) throws Exception |
| { |
| try |
| { |
| if (args.length >= 1) |
| { |
| NUM_THREADS = Integer.parseInt(args[0]); |
| System.out.println("Setting num threads to: " + NUM_THREADS); |
| } |
| |
| if (args.length >= 2) |
| { |
| numCells = Integer.parseInt(args[1]); |
| System.out.println("Setting num cells to: " + numCells); |
| } |
| |
| if (args.length >= 3) |
| { |
| cellSize = Integer.parseInt(args[1]); |
| System.out.println("Setting cell size to: " + cellSize + " be aware the source corpus may be small"); |
| } |
| |
| if (args.length >= 4) |
| { |
| rateLimit = Integer.parseInt(args[1]); |
| System.out.println("Setting per thread rate limit to: " + rateLimit); |
| } |
| initialize(); |
| |
| CommitLogStressTest tester = new CommitLogStressTest(); |
| tester.testFixedSize(); |
| } |
| catch (Throwable e) |
| { |
| e.printStackTrace(System.err); |
| } |
| finally |
| { |
| System.exit(0); |
| } |
| } |
| |
| boolean failed = false; |
| volatile boolean stop = false; |
| boolean randomSize = false; |
| boolean discardedRun = false; |
| ReplayPosition discardedPos; |
| |
| @BeforeClass |
| static public void initialize() throws IOException |
| { |
| try (FileInputStream fis = new FileInputStream("CHANGES.txt")) |
| { |
| dataSource = ByteBuffer.allocateDirect((int) fis.getChannel().size()); |
| while (dataSource.hasRemaining()) |
| { |
| fis.getChannel().read(dataSource); |
| } |
| dataSource.flip(); |
| } |
| |
| SchemaLoader.loadSchema(); |
| SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour |
| } |
| |
| @Before |
| public void cleanDir() |
| { |
| File dir = new File(location); |
| if (dir.isDirectory()) |
| { |
| File[] files = dir.listFiles(); |
| |
| for (File f : files) |
| if (!f.delete()) |
| Assert.fail("Failed to delete " + f); |
| } |
| else |
| { |
| dir.mkdir(); |
| } |
| } |
| |
| @Test |
| public void testRandomSize() throws Exception |
| { |
| randomSize = true; |
| discardedRun = false; |
| testAllLogConfigs(); |
| } |
| |
| @Test |
| public void testFixedSize() throws Exception |
| { |
| randomSize = false; |
| discardedRun = false; |
| |
| testAllLogConfigs(); |
| } |
| |
| @Test |
| public void testDiscardedRun() throws Exception |
| { |
| discardedRun = true; |
| randomSize = true; |
| |
| testAllLogConfigs(); |
| } |
| |
| public void testAllLogConfigs() throws IOException, InterruptedException |
| { |
| failed = false; |
| DatabaseDescriptor.setCommitLogSyncBatchWindow(1); |
| DatabaseDescriptor.setCommitLogSyncPeriod(30); |
| DatabaseDescriptor.setCommitLogSegmentSize(32); |
| for (ParameterizedClass compressor : new ParameterizedClass[] { |
| null, |
| new ParameterizedClass("LZ4Compressor", null), |
| new ParameterizedClass("SnappyCompressor", null), |
| new ParameterizedClass("DeflateCompressor", null) }) |
| { |
| DatabaseDescriptor.setCommitLogCompression(compressor); |
| for (CommitLogSync sync : CommitLogSync.values()) |
| { |
| DatabaseDescriptor.setCommitLogSync(sync); |
| CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start(); |
| testLog(commitLog); |
| } |
| } |
| assert !failed; |
| } |
| |
| public void testLog(CommitLog commitLog) throws IOException, InterruptedException |
| { |
| System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n", |
| mb(DatabaseDescriptor.getCommitLogSegmentSize()), |
| commitLog.configuration.getCompressorName(), |
| commitLog.executor.getClass().getSimpleName(), |
| randomSize ? " random size" : "", |
| discardedRun ? " with discarded run" : ""); |
| commitLog.allocator.enableReserveSegmentCreation(); |
| |
| final List<CommitlogExecutor> threads = new ArrayList<>(); |
| ScheduledExecutorService scheduled = startThreads(commitLog, threads); |
| |
| discardedPos = ReplayPosition.NONE; |
| if (discardedRun) |
| { |
| // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations. |
| Thread.sleep(runTimeMs / 3); |
| stop = true; |
| scheduled.shutdown(); |
| scheduled.awaitTermination(2, TimeUnit.SECONDS); |
| |
| for (CommitlogExecutor t : threads) |
| { |
| t.join(); |
| if (t.rp.compareTo(discardedPos) > 0) |
| discardedPos = t.rp; |
| } |
| verifySizes(commitLog); |
| |
| commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, |
| ReplayPosition.NONE, discardedPos); |
| threads.clear(); |
| System.out.format("Discarded at %s\n", discardedPos); |
| verifySizes(commitLog); |
| |
| scheduled = startThreads(commitLog, threads); |
| } |
| |
| Thread.sleep(runTimeMs); |
| stop = true; |
| scheduled.shutdown(); |
| scheduled.awaitTermination(2, TimeUnit.SECONDS); |
| |
| int hash = 0; |
| int cells = 0; |
| for (CommitlogExecutor t : threads) |
| { |
| t.join(); |
| hash += t.hash; |
| cells += t.cells; |
| } |
| verifySizes(commitLog); |
| |
| commitLog.shutdownBlocking(); |
| |
| System.out.print("Stopped. Replaying... "); |
| System.out.flush(); |
| Replayer repl = new Replayer(commitLog); |
| File[] files = new File(location).listFiles(); |
| repl.recover(files); |
| |
| for (File f : files) |
| if (!f.delete()) |
| Assert.fail("Failed to delete " + f); |
| |
| if (hash == repl.hash && cells == repl.cells) |
| System.out.println("Test success."); |
| else |
| { |
| System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", |
| repl.cells, |
| cells, |
| repl.hash, |
| hash); |
| failed = true; |
| } |
| } |
| |
| private void verifySizes(CommitLog commitLog) |
| { |
| // Complete anything that's still left to write. |
| commitLog.executor.requestExtraSync().awaitUninterruptibly(); |
| // One await() does not suffice as we may be signalled when an ongoing sync finished. Request another |
| // (which shouldn't write anything) to make sure the first we triggered completes. |
| // FIXME: The executor should give us a chance to await completion of the sync we requested. |
| commitLog.executor.requestExtraSync().awaitUninterruptibly(); |
| // Wait for any pending deletes or segment allocations to complete. |
| commitLog.allocator.awaitManagementTasksCompletion(); |
| |
| long combinedSize = 0; |
| for (File f : new File(commitLog.location).listFiles()) |
| combinedSize += f.length(); |
| Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize()); |
| |
| List<String> logFileNames = commitLog.getActiveSegmentNames(); |
| Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios(); |
| Collection<CommitLogSegment> segments = commitLog.allocator.getActiveSegments(); |
| |
| for (CommitLogSegment segment : segments) |
| { |
| Assert.assertTrue(logFileNames.remove(segment.getName())); |
| Double ratio = ratios.remove(segment.getName()); |
| |
| Assert.assertEquals(segment.logFile.length(), segment.onDiskSize()); |
| Assert.assertEquals(segment.onDiskSize() * 1.0 / segment.contentSize(), ratio, 0.01); |
| } |
| Assert.assertTrue(logFileNames.isEmpty()); |
| Assert.assertTrue(ratios.isEmpty()); |
| } |
| |
| public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogExecutor> threads) |
| { |
| stop = false; |
| for (int ii = 0; ii < NUM_THREADS; ii++) |
| { |
| final CommitlogExecutor t = new CommitlogExecutor(commitLog, new Random(ii)); |
| threads.add(t); |
| t.start(); |
| } |
| |
| final long start = System.currentTimeMillis(); |
| Runnable printRunnable = new Runnable() |
| { |
| long lastUpdate = 0; |
| |
| public void run() |
| { |
| Runtime runtime = Runtime.getRuntime(); |
| long maxMemory = runtime.maxMemory(); |
| long allocatedMemory = runtime.totalMemory(); |
| long freeMemory = runtime.freeMemory(); |
| long temp = 0; |
| long sz = 0; |
| for (CommitlogExecutor cle : threads) |
| { |
| temp += cle.counter.get(); |
| sz += cle.dataSize; |
| } |
| double time = (System.currentTimeMillis() - start) / 1000.0; |
| double avg = (temp / time); |
| System.out.println( |
| String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb", |
| ((System.currentTimeMillis() - start) / 1000), |
| mb(maxMemory), |
| mb(allocatedMemory), |
| mb(freeMemory), |
| (temp - lastUpdate), |
| lastUpdate, |
| avg, |
| mb(commitLog.getActiveContentSize()), |
| mb(commitLog.getActiveOnDiskSize()), |
| mb(sz / time))); |
| lastUpdate = temp; |
| } |
| }; |
| ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); |
| scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS); |
| return scheduled; |
| } |
| |
| private static double mb(long maxMemory) |
| { |
| return maxMemory / (1024.0 * 1024); |
| } |
| |
| private static double mb(double maxMemory) |
| { |
| return maxMemory / (1024 * 1024); |
| } |
| |
| public static ByteBuffer randomBytes(int quantity, Random tlr) |
| { |
| ByteBuffer slice = ByteBuffer.allocate(quantity); |
| ByteBuffer source = dataSource.duplicate(); |
| source.position(tlr.nextInt(source.capacity() - quantity)); |
| source.limit(source.position() + quantity); |
| slice.put(source); |
| slice.flip(); |
| return slice; |
| } |
| |
| public class CommitlogExecutor extends Thread |
| { |
| final AtomicLong counter = new AtomicLong(); |
| int hash = 0; |
| int cells = 0; |
| int dataSize = 0; |
| final CommitLog commitLog; |
| final Random random; |
| |
| volatile ReplayPosition rp; |
| |
| public CommitlogExecutor(CommitLog commitLog, Random rand) |
| { |
| this.commitLog = commitLog; |
| this.random = rand; |
| } |
| |
| public void run() |
| { |
| RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null; |
| final Random rand = random != null ? random : ThreadLocalRandom.current(); |
| while (!stop) |
| { |
| if (rl != null) |
| rl.acquire(); |
| ByteBuffer key = randomBytes(16, rand); |
| |
| UpdateBuilder builder = UpdateBuilder.create(Schema.instance.getCFMetaData("Keyspace1", "Standard1"), Util.dk(key)); |
| for (int ii = 0; ii < numCells; ii++) |
| { |
| int sz = randomSize ? rand.nextInt(cellSize) : cellSize; |
| ByteBuffer bytes = randomBytes(sz, rand); |
| builder.newRow("name" + ii).add("val", bytes); |
| hash = hash(hash, bytes); |
| ++cells; |
| dataSize += sz; |
| } |
| |
| rp = commitLog.add(new Mutation(builder.build())); |
| counter.incrementAndGet(); |
| } |
| } |
| } |
| |
| class Replayer extends CommitLogReplayer |
| { |
| Replayer(CommitLog log) |
| { |
| super(log, discardedPos, null, ReplayFilter.create()); |
| } |
| |
| int hash = 0; |
| int cells = 0; |
| |
| @Override |
| void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) |
| { |
| if (desc.id < discardedPos.segment) |
| { |
| System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation); |
| return; |
| } |
| else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position) |
| // Skip over this mutation. |
| return; |
| |
| DataInputPlus bufIn = new DataInputBuffer(inputBuffer, 0, size); |
| Mutation mutation; |
| try |
| { |
| mutation = Mutation.serializer.deserialize(bufIn, |
| desc.getMessagingVersion(), |
| SerializationHelper.Flag.LOCAL); |
| } |
| catch (IOException e) |
| { |
| // Test fails. |
| throw new AssertionError(e); |
| } |
| |
| for (PartitionUpdate cf : mutation.getPartitionUpdates()) |
| { |
| |
| Iterator<Row> rowIterator = cf.iterator(); |
| |
| while (rowIterator.hasNext()) |
| { |
| Row row = rowIterator.next(); |
| if (!(UTF8Type.instance.compose(row.clustering().get(0)).startsWith("name"))) |
| continue; |
| |
| for (Cell cell : row.cells()) |
| { |
| hash = hash(hash, cell.value()); |
| ++cells; |
| } |
| } |
| } |
| } |
| } |
| } |