| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.index; |
| |
| import java.io.FileNotFoundException; |
| import java.net.SocketException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import com.google.common.collect.Sets; |
| import org.junit.After; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.cql3.CQLTester; |
| import org.apache.cassandra.db.ColumnFamilyStore; |
| import org.apache.cassandra.db.SystemKeyspace; |
| import org.apache.cassandra.db.compaction.CompactionInfo; |
| import org.apache.cassandra.db.lifecycle.SSTableSet; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.notifications.SSTableAddedNotification; |
| import org.apache.cassandra.schema.IndexMetadata; |
| import org.apache.cassandra.utils.JVMStabilityInspector; |
| import org.apache.cassandra.utils.KillerForTests; |
| import org.apache.cassandra.utils.concurrent.Refs; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| public class SecondaryIndexManagerTest extends CQLTester |
| { |
| |
| @After |
| public void after() |
| { |
| TestingIndex.clear(); |
| } |
| |
| @Test |
| public void creatingIndexMarksTheIndexAsBuilt() throws Throwable |
| { |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String indexName = createIndex("CREATE INDEX ON %s(c)"); |
| |
| waitForIndex(KEYSPACE, tableName, indexName); |
| assertMarkedAsBuilt(indexName); |
| } |
| |
| @Test |
| public void rebuilOrRecoveringIndexMarksTheIndexAsBuilt() throws Throwable |
| { |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String indexName = createIndex("CREATE INDEX ON %s(c)"); |
| |
| waitForIndex(KEYSPACE, tableName, indexName); |
| assertMarkedAsBuilt(indexName); |
| |
| assertTrue(tryRebuild(indexName, false)); |
| assertMarkedAsBuilt(indexName); |
| } |
| |
| @Test |
| public void recreatingIndexMarksTheIndexAsBuilt() throws Throwable |
| { |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String indexName = createIndex("CREATE INDEX ON %s(c)"); |
| |
| waitForIndex(KEYSPACE, tableName, indexName); |
| assertMarkedAsBuilt(indexName); |
| |
| // drop the index and verify that it has been removed from the built indexes table |
| dropIndex("DROP INDEX %s." + indexName); |
| assertNotMarkedAsBuilt(indexName); |
| |
| // create the index again and verify that it's added to the built indexes table |
| createIndex(String.format("CREATE INDEX %s ON %%s(c)", indexName)); |
| waitForIndex(KEYSPACE, tableName, indexName); |
| assertMarkedAsBuilt(indexName); |
| } |
| |
| @Test |
| public void addingSSTablesMarksTheIndexAsBuilt() throws Throwable |
| { |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String indexName = createIndex("CREATE INDEX ON %s(c)"); |
| |
| waitForIndex(KEYSPACE, tableName, indexName); |
| assertMarkedAsBuilt(indexName); |
| |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| cfs.indexManager.markAllIndexesRemoved(); |
| assertNotMarkedAsBuilt(indexName); |
| |
| try (Refs<SSTableReader> sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL))) |
| { |
| cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker()); |
| assertMarkedAsBuilt(indexName); |
| } |
| } |
| |
| @Test |
| public void cannotRebuildRecoverWhileInitializationIsInProgress() throws Throwable |
| { |
| // create an index which blocks on creation |
| TestingIndex.blockCreate(); |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); |
| String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); |
| |
| // try to rebuild/recover the index before the index creation task has finished |
| assertFalse(tryRebuild(defaultIndexName, false)); |
| assertFalse(tryRebuild(readOnlyIndexName, false)); |
| assertFalse(tryRebuild(writeOnlyIndexName, false)); |
| assertNotMarkedAsBuilt(defaultIndexName); |
| assertNotMarkedAsBuilt(readOnlyIndexName); |
| assertNotMarkedAsBuilt(writeOnlyIndexName); |
| |
| // check that the index is marked as built when the creation finishes |
| TestingIndex.unblockCreate(); |
| waitForIndex(KEYSPACE, tableName, defaultIndexName); |
| waitForIndex(KEYSPACE, tableName, readOnlyIndexName); |
| waitForIndex(KEYSPACE, tableName, writeOnlyIndexName); |
| assertMarkedAsBuilt(defaultIndexName); |
| assertMarkedAsBuilt(readOnlyIndexName); |
| assertMarkedAsBuilt(writeOnlyIndexName); |
| |
| // now verify you can rebuild/recover |
| assertTrue(tryRebuild(defaultIndexName, false)); |
| assertTrue(tryRebuild(readOnlyIndexName, false)); |
| assertTrue(tryRebuild(readOnlyIndexName, false)); |
| assertMarkedAsBuilt(defaultIndexName); |
| assertMarkedAsBuilt(readOnlyIndexName); |
| assertMarkedAsBuilt(writeOnlyIndexName); |
| } |
| |
| @Test |
| public void cannotRebuildOrRecoverWhileAnotherRebuildIsInProgress() throws Throwable |
| { |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); |
| String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); |
| final AtomicBoolean error = new AtomicBoolean(); |
| |
| // wait for index initialization and verify it's built: |
| waitForIndex(KEYSPACE, tableName, defaultIndexName); |
| waitForIndex(KEYSPACE, tableName, readOnlyIndexName); |
| waitForIndex(KEYSPACE, tableName, writeOnlyIndexName); |
| assertMarkedAsBuilt(defaultIndexName); |
| assertMarkedAsBuilt(readOnlyIndexName); |
| assertMarkedAsBuilt(writeOnlyIndexName); |
| |
| // rebuild the index in another thread, but make it block: |
| TestingIndex.blockBuild(); |
| Thread asyncBuild = new Thread(() -> { |
| try |
| { |
| tryRebuild(defaultIndexName, false); |
| } |
| catch (Throwable ex) |
| { |
| error.set(true); |
| } |
| }); |
| asyncBuild.start(); |
| |
| // wait for the rebuild to block, so that we can proceed unblocking all further operations: |
| TestingIndex.waitBlockedOnBuild(); |
| |
| // do not block further builds: |
| TestingIndex.shouldBlockBuild = false; |
| |
| // verify rebuilding the index before the previous index build task has finished fails |
| assertFalse(tryRebuild(defaultIndexName, false)); |
| assertNotMarkedAsBuilt(defaultIndexName); |
| |
| // check that the index is marked as built when the build finishes |
| TestingIndex.unblockBuild(); |
| asyncBuild.join(); |
| assertMarkedAsBuilt(defaultIndexName); |
| assertMarkedAsBuilt(readOnlyIndexName); |
| assertMarkedAsBuilt(writeOnlyIndexName); |
| |
| // now verify you can rebuild |
| assertTrue(tryRebuild(defaultIndexName, false)); |
| assertTrue(tryRebuild(readOnlyIndexName, false)); |
| assertTrue(tryRebuild(writeOnlyIndexName, false)); |
| assertMarkedAsBuilt(defaultIndexName); |
| assertMarkedAsBuilt(readOnlyIndexName); |
| assertMarkedAsBuilt(writeOnlyIndexName); |
| } |
| |
| @Test |
| public void cannotRebuildWhileAnSSTableBuildIsInProgress() throws Throwable |
| { |
| final String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| final String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| final AtomicBoolean error = new AtomicBoolean(); |
| |
| // wait for index initialization and verify it's built: |
| waitForIndex(KEYSPACE, tableName, indexName); |
| assertMarkedAsBuilt(indexName); |
| |
| // add sstables in another thread, but make it block: |
| TestingIndex.blockBuild(); |
| Thread asyncBuild = new Thread(() -> { |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| try (Refs<SSTableReader> sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL))) |
| { |
| cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker()); |
| } |
| catch (Throwable ex) |
| { |
| error.set(true); |
| } |
| }); |
| asyncBuild.start(); |
| |
| // wait for the build to block, so that we can proceed unblocking all further operations: |
| TestingIndex.waitBlockedOnBuild(); |
| |
| // do not block further builds: |
| TestingIndex.shouldBlockBuild = false; |
| |
| // verify rebuilding the index before the previous index build task has finished fails |
| assertFalse(tryRebuild(indexName, false)); |
| assertNotMarkedAsBuilt(indexName); |
| |
| // check that the index is marked as built when the build finishes |
| TestingIndex.unblockBuild(); |
| asyncBuild.join(); |
| assertMarkedAsBuilt(indexName); |
| |
| // now verify you can rebuild |
| assertTrue(tryRebuild(indexName, false)); |
| assertMarkedAsBuilt(indexName); |
| } |
| |
| @Test |
| public void addingSSTableWhileRebuildIsInProgress() throws Throwable |
| { |
| final String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| final String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| final AtomicBoolean error = new AtomicBoolean(); |
| |
| // wait for index initialization and verify it's built: |
| waitForIndex(KEYSPACE, tableName, indexName); |
| assertMarkedAsBuilt(indexName); |
| |
| // rebuild the index in another thread, but make it block: |
| TestingIndex.blockBuild(); |
| Thread asyncBuild = new Thread(() -> { |
| try |
| { |
| tryRebuild(indexName, false); |
| } |
| catch (Throwable ex) |
| { |
| error.set(true); |
| } |
| }); |
| asyncBuild.start(); |
| |
| // wait for the rebuild to block, so that we can proceed unblocking all further operations: |
| TestingIndex.waitBlockedOnBuild(); |
| |
| // do not block further builds: |
| TestingIndex.shouldBlockBuild = false; |
| |
| // try adding sstables and verify they are built but the index is not marked as built because of the pending build: |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| try (Refs<SSTableReader> sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL))) |
| { |
| cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker()); |
| assertNotMarkedAsBuilt(indexName); |
| } |
| |
| // unblock the pending build: |
| TestingIndex.unblockBuild(); |
| asyncBuild.join(); |
| |
| // verify the index is now built: |
| assertMarkedAsBuilt(indexName); |
| assertFalse(error.get()); |
| } |
| |
| @Test |
| public void addingSSTableWithBuildFailureWhileRebuildIsInProgress() throws Throwable |
| { |
| final String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| final String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| final AtomicBoolean error = new AtomicBoolean(); |
| |
| // wait for index initialization and verify it's built: |
| waitForIndex(KEYSPACE, tableName, indexName); |
| assertMarkedAsBuilt(indexName); |
| |
| // rebuild the index in another thread, but make it block: |
| TestingIndex.blockBuild(); |
| Thread asyncBuild = new Thread(() -> { |
| try |
| { |
| tryRebuild(indexName, false); |
| } |
| catch (Throwable ex) |
| { |
| error.set(true); |
| } |
| }); |
| asyncBuild.start(); |
| |
| // wait for the rebuild to block, so that we can proceed unblocking all further operations: |
| TestingIndex.waitBlockedOnBuild(); |
| |
| // do not block further builds: |
| TestingIndex.shouldBlockBuild = false; |
| |
| // try adding sstables but make the build fail: |
| TestingIndex.shouldFailBuild = true; |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| try (Refs<SSTableReader> sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL))) |
| { |
| cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker()); |
| fail("Should have failed!"); |
| } |
| catch (Throwable ex) |
| { |
| assertTrue(ex.getMessage().contains("configured to fail")); |
| } |
| |
| // disable failures: |
| TestingIndex.shouldFailBuild = false; |
| |
| // unblock the pending build: |
| TestingIndex.unblockBuild(); |
| asyncBuild.join(); |
| |
| // verify the index is *not* built due to the failing sstable build: |
| assertNotMarkedAsBuilt(indexName); |
| assertFalse(error.get()); |
| } |
| |
| @Test |
| public void rebuildWithFailure() throws Throwable |
| { |
| final String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| final String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| waitForIndex(KEYSPACE, tableName, indexName); |
| |
| // Rebuild the index with failure and verify it is not marked as built |
| TestingIndex.shouldFailBuild = true; |
| try |
| { |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| cfs.indexManager.rebuildIndexesBlocking(Collections.singleton(indexName)); |
| fail("Should have failed!"); |
| } |
| catch (Throwable ex) |
| { |
| assertTrue(ex.getMessage().contains("configured to fail")); |
| } |
| assertNotMarkedAsBuilt(indexName); |
| } |
| |
| @Test |
| public void initializingIndexNotQueryableButMaybeWritable() throws Throwable |
| { |
| TestingIndex.blockCreate(); |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); |
| String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); |
| |
| // the index shouldn't be queryable while the initialization hasn't finished |
| assertFalse(isQueryable(defaultIndexName)); |
| assertFalse(isQueryable(readOnlyIndexName)); |
| assertFalse(isQueryable(writeOnlyIndexName)); |
| assertTrue(isWritable(defaultIndexName)); |
| assertTrue(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| |
| // the index should be queryable once the initialization has finished |
| TestingIndex.unblockCreate(); |
| waitForIndex(KEYSPACE, tableName, defaultIndexName); |
| waitForIndex(KEYSPACE, tableName, readOnlyIndexName); |
| waitForIndex(KEYSPACE, tableName, writeOnlyIndexName); |
| assertTrue(isQueryable(defaultIndexName)); |
| assertTrue(isQueryable(readOnlyIndexName)); |
| assertTrue(isQueryable(writeOnlyIndexName)); |
| assertTrue(isWritable(defaultIndexName)); |
| assertTrue(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| } |
| |
| @Test |
| public void initializingIndexNotQueryableButMaybeNotWritableAfterPartialRebuild() throws Throwable |
| { |
| TestingIndex.blockCreate(); |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); |
| String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); |
| |
| // the index should never be queryable while the initialization hasn't finished |
| assertFalse(isQueryable(defaultIndexName)); |
| assertFalse(isQueryable(readOnlyIndexName)); |
| assertFalse(isQueryable(writeOnlyIndexName)); |
| |
| // the index should always we writable while the initialization hasn't finished |
| assertTrue(isWritable(defaultIndexName)); |
| assertTrue(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| |
| // a failing partial build doesn't set the index as queryable, but might set it as not writable |
| TestingIndex.shouldFailBuild = true; |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| try |
| { |
| cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this); |
| fail("Should have failed!"); |
| } |
| catch (Throwable ex) |
| { |
| assertTrue(ex.getMessage().contains("configured to fail")); |
| } |
| assertFalse(isQueryable(defaultIndexName)); |
| assertFalse(isQueryable(readOnlyIndexName)); |
| assertFalse(isQueryable(writeOnlyIndexName)); |
| assertTrue(isWritable(defaultIndexName)); |
| assertFalse(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| |
| // a successful partial build doesn't set the index as queryable nor writable |
| TestingIndex.shouldFailBuild = false; |
| cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this); |
| assertFalse(isQueryable(defaultIndexName)); |
| assertFalse(isQueryable(readOnlyIndexName)); |
| assertFalse(isQueryable(writeOnlyIndexName)); |
| assertTrue(isWritable(defaultIndexName)); |
| assertFalse(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| |
| // the index should be queryable once the initialization has finished |
| TestingIndex.unblockCreate(); |
| waitForIndex(KEYSPACE, tableName, defaultIndexName); |
| assertTrue(isQueryable(defaultIndexName)); |
| assertTrue(isQueryable(readOnlyIndexName)); |
| assertTrue(isQueryable(writeOnlyIndexName)); |
| assertTrue(isWritable(defaultIndexName)); |
| assertTrue(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| } |
| |
| @Test |
| public void indexWithFailedInitializationIsQueryableAndWritableAfterFullRebuild() throws Throwable |
| { |
| TestingIndex.shouldFailCreate = true; |
| createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); |
| String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); |
| assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName)); |
| assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName)); |
| assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName)); |
| |
| tryRebuild(defaultIndexName, true); |
| tryRebuild(readOnlyIndexName, true); |
| tryRebuild(writeOnlyIndexName, true); |
| TestingIndex.shouldFailCreate = false; |
| |
| // a successfull full rebuild should set the index as queryable/writable |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(defaultIndexName, readOnlyIndexName, writeOnlyIndexName)); |
| assertTrue(isQueryable(defaultIndexName)); |
| assertTrue(isQueryable(readOnlyIndexName)); |
| assertTrue(isQueryable(writeOnlyIndexName)); |
| assertTrue(isWritable(defaultIndexName)); |
| assertTrue(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| } |
| |
| @Test |
| public void indexWithFailedInitializationDoesNotChangeQueryabilityNorWritabilityAfterPartialRebuild() throws Throwable |
| { |
| TestingIndex.shouldFailCreate = true; |
| createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); |
| String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); |
| assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName)); |
| assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName)); |
| assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName)); |
| TestingIndex.shouldFailCreate = false; |
| |
| // the index should never be queryable, but it could be writable after the failed initialization |
| assertFalse(isQueryable(defaultIndexName)); |
| assertFalse(isQueryable(readOnlyIndexName)); |
| assertFalse(isQueryable(writeOnlyIndexName)); |
| assertTrue(isWritable(defaultIndexName)); |
| assertFalse(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| |
| // a successful partial build doesn't set the index as queryable nor writable |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this); |
| assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName)); |
| assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName)); |
| assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName)); |
| assertFalse(isQueryable(defaultIndexName)); |
| assertFalse(isQueryable(readOnlyIndexName)); |
| assertFalse(isQueryable(writeOnlyIndexName)); |
| assertTrue(isWritable(defaultIndexName)); |
| assertFalse(isWritable(readOnlyIndexName)); |
| assertTrue(isWritable(writeOnlyIndexName)); |
| } |
| |
| @Test |
| public void handleJVMStablityOnFailedCreate() |
| { |
| handleJVMStablityOnFailedCreate(new SocketException("Should not fail"), false); |
| handleJVMStablityOnFailedCreate(new FileNotFoundException("Should not fail"), false); |
| handleJVMStablityOnFailedCreate(new SocketException("Too many open files"), true); |
| handleJVMStablityOnFailedCreate(new FileNotFoundException("Too many open files"), true); |
| handleJVMStablityOnFailedCreate(new RuntimeException("Should not fail"), false); |
| } |
| |
| private void handleJVMStablityOnFailedCreate(Throwable throwable, boolean shouldKillJVM) |
| { |
| KillerForTests killerForTests = new KillerForTests(); |
| JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); |
| |
| try |
| { |
| TestingIndex.shouldFailCreate = true; |
| TestingIndex.failedCreateThrowable = throwable; |
| |
| createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| tryRebuild(indexName, true); |
| fail("Should have failed!"); |
| } |
| catch (Throwable t) |
| { |
| assertEquals(shouldKillJVM, killerForTests.wasKilled()); |
| } |
| finally |
| { |
| JVMStabilityInspector.replaceKiller(originalKiller); |
| TestingIndex.shouldFailCreate = false; |
| TestingIndex.failedCreateThrowable = null; |
| } |
| } |
| |
| @Test |
| public void handleJVMStablityOnFailedRebuild() throws Throwable |
| { |
| handleJVMStablityOnFailedRebuild(new SocketException("Should not fail"), false); |
| handleJVMStablityOnFailedRebuild(new FileNotFoundException("Should not fail"), false); |
| handleJVMStablityOnFailedRebuild(new SocketException("Too many open files"), true); |
| handleJVMStablityOnFailedRebuild(new FileNotFoundException("Too many open files"), true); |
| handleJVMStablityOnFailedRebuild(new RuntimeException("Should not fail"), false); |
| } |
| |
| private void handleJVMStablityOnFailedRebuild(Throwable throwable, boolean shouldKillJVM) throws Throwable |
| { |
| String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); |
| String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); |
| waitForIndex(KEYSPACE, tableName, indexName); |
| |
| KillerForTests killerForTests = new KillerForTests(); |
| JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); |
| |
| try |
| { |
| TestingIndex.shouldFailBuild = true; |
| TestingIndex.failedBuildTrowable = throwable; |
| |
| getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(Collections.singleton(indexName)); |
| fail("Should have failed!"); |
| } |
| catch (Throwable t) |
| { |
| assertEquals(shouldKillJVM, killerForTests.wasKilled()); |
| } |
| finally |
| { |
| JVMStabilityInspector.replaceKiller(originalKiller); |
| TestingIndex.shouldFailBuild = false; |
| TestingIndex.failedBuildTrowable = null; |
| } |
| } |
| |
| private static void assertMarkedAsBuilt(String indexName) |
| { |
| List<String> indexes = SystemKeyspace.getBuiltIndexes(KEYSPACE, Collections.singleton(indexName)); |
| assertEquals(1, indexes.size()); |
| assertEquals(indexName, indexes.get(0)); |
| } |
| |
| private static void assertNotMarkedAsBuilt(String indexName) |
| { |
| List<String> indexes = SystemKeyspace.getBuiltIndexes(KEYSPACE, Collections.singleton(indexName)); |
| assertTrue(indexes.isEmpty()); |
| } |
| |
| private boolean tryRebuild(String indexName, boolean wait) throws Throwable |
| { |
| ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); |
| boolean done = false; |
| do |
| { |
| try |
| { |
| cfs.indexManager.rebuildIndexesBlocking(Collections.singleton(indexName)); |
| done = true; |
| } |
| catch (IllegalStateException e) |
| { |
| assertTrue(e.getMessage().contains("currently in progress")); |
| } |
| Thread.sleep(500); |
| } |
| while (!done && wait); |
| |
| return done; |
| } |
| |
| private boolean isQueryable(String indexName) |
| { |
| SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager; |
| Index index = manager.getIndexByName(indexName); |
| return manager.isIndexQueryable(index); |
| } |
| |
| private boolean isWritable(String indexName) |
| { |
| SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager; |
| Index index = manager.getIndexByName(indexName); |
| return manager.isIndexWritable(index); |
| } |
| |
| public static class TestingIndex extends StubIndex |
| { |
| private static volatile CountDownLatch createLatch; |
| private static volatile CountDownLatch buildLatch; |
| private static volatile CountDownLatch createWaitLatch; |
| private static volatile CountDownLatch buildWaitLatch; |
| static volatile boolean shouldBlockCreate = false; |
| static volatile boolean shouldBlockBuild = false; |
| static volatile boolean shouldFailCreate = false; |
| static volatile boolean shouldFailBuild = false; |
| static volatile Throwable failedCreateThrowable; |
| static volatile Throwable failedBuildTrowable; |
| |
| @SuppressWarnings("WeakerAccess") |
| public TestingIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) |
| { |
| super(baseCfs, metadata); |
| } |
| |
| static void blockCreate() |
| { |
| shouldBlockCreate = true; |
| createLatch = new CountDownLatch(1); |
| createWaitLatch = new CountDownLatch(1); |
| } |
| |
| static void blockBuild() |
| { |
| shouldBlockBuild = true; |
| buildLatch = new CountDownLatch(1); |
| buildWaitLatch = new CountDownLatch(1); |
| } |
| |
| static void unblockCreate() |
| { |
| createLatch.countDown(); |
| } |
| |
| static void unblockBuild() |
| { |
| buildLatch.countDown(); |
| } |
| |
| static void waitBlockedOnCreate() throws InterruptedException |
| { |
| createWaitLatch.await(); |
| } |
| |
| static void waitBlockedOnBuild() throws InterruptedException |
| { |
| buildWaitLatch.await(); |
| } |
| |
| static void clear() |
| { |
| reset(createLatch); |
| reset(createWaitLatch); |
| reset(buildLatch); |
| reset(buildWaitLatch); |
| createLatch = null; |
| createWaitLatch = null; |
| buildLatch = null; |
| buildWaitLatch = null; |
| shouldBlockCreate = false; |
| shouldBlockBuild = false; |
| shouldFailCreate = false; |
| shouldFailBuild = false; |
| failedCreateThrowable = null; |
| failedBuildTrowable = null; |
| } |
| |
| private static void reset(CountDownLatch latch) |
| { |
| if (latch == null) |
| return; |
| |
| while (0L < latch.getCount()) |
| latch.countDown(); |
| } |
| |
| public Callable<?> getInitializationTask() |
| { |
| return () -> |
| { |
| if (shouldBlockCreate && createLatch != null) |
| { |
| createWaitLatch.countDown(); |
| createLatch.await(); |
| } |
| |
| if (shouldFailCreate) |
| { |
| throw failedCreateThrowable == null |
| ? new IllegalStateException("Index is configured to fail.") |
| : new RuntimeException(failedCreateThrowable); |
| } |
| |
| return null; |
| }; |
| } |
| |
| public IndexBuildingSupport getBuildTaskSupport() |
| { |
| return new CollatedViewIndexBuildingSupport() |
| { |
| public SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables) |
| { |
| try |
| { |
| if (shouldBlockBuild && buildLatch != null) |
| { |
| buildWaitLatch.countDown(); |
| buildLatch.await(); |
| } |
| final SecondaryIndexBuilder builder = super.getIndexBuildTask(cfs, indexes, sstables); |
| return new SecondaryIndexBuilder() |
| { |
| |
| @Override |
| public void build() |
| { |
| if (shouldFailBuild) |
| { |
| throw failedBuildTrowable == null |
| ? new IllegalStateException("Index is configured to fail.") |
| : new RuntimeException(failedBuildTrowable); |
| } |
| builder.build(); |
| } |
| |
| @Override |
| public CompactionInfo getCompactionInfo() |
| { |
| return builder.getCompactionInfo(); |
| } |
| }; |
| } |
| catch (InterruptedException ex) |
| { |
| throw new RuntimeException(ex); |
| } |
| } |
| }; |
| } |
| |
| public boolean shouldBuildBlocking() |
| { |
| return true; |
| } |
| } |
| |
| /** |
| * <code>TestingIndex</code> that only supports reads when initial build or full rebuild has failed. |
| */ |
| public static class ReadOnlyOnFailureIndex extends TestingIndex |
| { |
| public ReadOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) |
| { |
| super(baseCfs, indexDef); |
| } |
| |
| @Override |
| public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild) |
| { |
| return LoadType.READ; |
| } |
| } |
| |
| /** |
| * <code>TestingIndex</code> that only supports writes when initial build or full rebuild has failed. |
| */ |
| public static class WriteOnlyOnFailureIndex extends TestingIndex |
| { |
| public WriteOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) |
| { |
| super(baseCfs, indexDef); |
| } |
| |
| @Override |
| public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild) |
| { |
| return LoadType.WRITE; |
| } |
| } |
| } |