| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.db.compaction; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.cache.AutoSavingCache; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.cql3.CQLTester; |
| import org.apache.cassandra.db.lifecycle.LifecycleTransaction; |
| import org.apache.cassandra.db.view.View; |
| import org.apache.cassandra.db.view.ViewBuilderTask; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.index.Index; |
| import org.apache.cassandra.index.SecondaryIndexBuilder; |
| import org.apache.cassandra.io.sstable.IndexSummaryRedistribution; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.schema.TableId; |
| import org.apache.cassandra.service.CacheService; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| public class ActiveCompactionsTest extends CQLTester |
| { |
| @Test |
| public void testActiveCompactionTrackingRaceWithIndexBuilder() throws Throwable |
| { |
| createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); |
| String idxName = createIndex("CREATE INDEX on %s(a)"); |
| getCurrentColumnFamilyStore().disableAutoCompaction(); |
| for (int i = 0; i < 5; i++) |
| { |
| execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)"); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| } |
| |
| Index idx = getCurrentColumnFamilyStore().indexManager.getIndexByName(idxName); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| |
| ExecutorService es = Executors.newFixedThreadPool(2); |
| |
| final int loopCount = 5000; |
| for (int ii = 0; ii < loopCount; ii++) |
| { |
| CountDownLatch trigger = new CountDownLatch(1); |
| SecondaryIndexBuilder builder = idx.getBuildTaskSupport().getIndexBuildTask(getCurrentColumnFamilyStore(), Collections.singleton(idx), sstables); |
| Future<?> f1 = es.submit(() -> { |
| Uninterruptibles.awaitUninterruptibly(trigger); |
| try |
| { |
| CompactionManager.instance.submitIndexBuild(builder).get(); |
| } |
| catch (Exception e) |
| { |
| throw new RuntimeException(e); |
| } |
| }); |
| Future<?> f2 = es.submit(() -> { |
| Uninterruptibles.awaitUninterruptibly(trigger); |
| CompactionManager.instance.active.getCompactionsForSSTable(null, null); |
| }); |
| trigger.countDown(); |
| FBUtilities.waitOnFutures(Arrays.asList(f1, f2)); |
| } |
| es.shutdown(); |
| es.awaitTermination(1, TimeUnit.MINUTES); |
| } |
| |
| @Test |
| public void testSecondaryIndexTracking() throws Throwable |
| { |
| createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); |
| String idxName = createIndex("CREATE INDEX on %s(a)"); |
| getCurrentColumnFamilyStore().disableAutoCompaction(); |
| for (int i = 0; i < 5; i++) |
| { |
| execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)"); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| } |
| |
| Index idx = getCurrentColumnFamilyStore().indexManager.getIndexByName(idxName); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| SecondaryIndexBuilder builder = idx.getBuildTaskSupport().getIndexBuildTask(getCurrentColumnFamilyStore(), Collections.singleton(idx), sstables); |
| |
| MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); |
| CompactionManager.instance.submitIndexBuild(builder, mockActiveCompactions).get(); |
| |
| assertTrue(mockActiveCompactions.finished); |
| assertNotNull(mockActiveCompactions.holder); |
| assertEquals(sstables, mockActiveCompactions.holder.getCompactionInfo().getSSTables()); |
| } |
| |
| @Test |
| public void testIndexSummaryRedistributionTracking() throws Throwable |
| { |
| createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); |
| getCurrentColumnFamilyStore().disableAutoCompaction(); |
| for (int i = 0; i < 5; i++) |
| { |
| execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)"); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| } |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| try (LifecycleTransaction txn = getCurrentColumnFamilyStore().getTracker().tryModify(sstables, OperationType.INDEX_SUMMARY)) |
| { |
| Map<TableId, LifecycleTransaction> transactions = ImmutableMap.<TableId, LifecycleTransaction>builder().put(getCurrentColumnFamilyStore().metadata().id, txn).build(); |
| IndexSummaryRedistribution isr = new IndexSummaryRedistribution(transactions, 0, 1000); |
| MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); |
| CompactionManager.instance.runIndexSummaryRedistribution(isr, mockActiveCompactions); |
| assertTrue(mockActiveCompactions.finished); |
| assertNotNull(mockActiveCompactions.holder); |
| // index redistribution operates over all keyspaces/tables, we always cancel them |
| assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty()); |
| assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((sstable) -> false)); |
| } |
| } |
| |
| @Test |
| public void testViewBuildTracking() throws Throwable |
| { |
| createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1, c1))"); |
| getCurrentColumnFamilyStore().disableAutoCompaction(); |
| for (int i = 0; i < 5; i++) |
| { |
| execute("INSERT INTO %s (k1, c1, val) VALUES (" + i + ", 2, 3)"); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| } |
| execute(String.format("CREATE MATERIALIZED VIEW %s.view1 AS SELECT k1, c1, val FROM %s.%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY (val, k1, c1)", keyspace(), keyspace(), currentTable())); |
| View view = Iterables.getOnlyElement(getCurrentColumnFamilyStore().viewManager); |
| |
| Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); |
| ViewBuilderTask vbt = new ViewBuilderTask(getCurrentColumnFamilyStore(), view, new Range<>(token, token), token, 0); |
| |
| MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); |
| CompactionManager.instance.submitViewBuilder(vbt, mockActiveCompactions).get(); |
| assertTrue(mockActiveCompactions.finished); |
| assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty()); |
| // this should stop for all compactions, even if it doesn't pick any sstables; |
| assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((sstable) -> false)); |
| } |
| |
| @Test |
| public void testScrubOne() throws Throwable |
| { |
| createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); |
| getCurrentColumnFamilyStore().disableAutoCompaction(); |
| for (int i = 0; i < 5; i++) |
| { |
| execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)"); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| } |
| |
| SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null); |
| try (LifecycleTransaction txn = getCurrentColumnFamilyStore().getTracker().tryModify(sstable, OperationType.SCRUB)) |
| { |
| MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); |
| CompactionManager.instance.scrubOne(getCurrentColumnFamilyStore(), txn, true, false, false, mockActiveCompactions); |
| |
| assertTrue(mockActiveCompactions.finished); |
| assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable)); |
| assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false)); |
| assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true)); |
| } |
| |
| } |
| |
| @Test |
| public void testVerifyOne() throws Throwable |
| { |
| createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); |
| getCurrentColumnFamilyStore().disableAutoCompaction(); |
| for (int i = 0; i < 5; i++) |
| { |
| execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)"); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| } |
| |
| SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null); |
| MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); |
| CompactionManager.instance.verifyOne(getCurrentColumnFamilyStore(), sstable, new Verifier.Options.Builder().build(), mockActiveCompactions); |
| assertTrue(mockActiveCompactions.finished); |
| assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable)); |
| assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false)); |
| assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true)); |
| } |
| |
| @Test |
| public void testSubmitCacheWrite() throws ExecutionException, InterruptedException |
| { |
| AutoSavingCache.Writer writer = CacheService.instance.keyCache.getWriter(100); |
| MockActiveCompactions mockActiveCompactions = new MockActiveCompactions(); |
| CompactionManager.instance.submitCacheWrite(writer, mockActiveCompactions).get(); |
| assertTrue(mockActiveCompactions.finished); |
| assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty()); |
| } |
| |
| private static class MockActiveCompactions implements ActiveCompactionsTracker |
| { |
| public CompactionInfo.Holder holder; |
| public boolean finished = false; |
| public void beginCompaction(CompactionInfo.Holder ci) |
| { |
| holder = ci; |
| } |
| |
| public void finishCompaction(CompactionInfo.Holder ci) |
| { |
| finished = true; |
| } |
| } |
| } |