Fix CompactionManager regression from CASSANDRA-1916 and add a better
test and more docs to prevent in the future.
Patch by jbellis, reviewed by brandonwilliams for CASSANDRA-1922
git-svn-id: https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.7.0@1053960 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/java/org/apache/cassandra/db/CompactionManager.java b/src/java/org/apache/cassandra/db/CompactionManager.java
index fede852..5af09af 100644
--- a/src/java/org/apache/cassandra/db/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/CompactionManager.java
@@ -798,6 +798,7 @@
public void write(DataOutput out) throws IOException
{
+ out.writeLong(row.dataSize);
row.echoData(out);
}
diff --git a/src/java/org/apache/cassandra/io/AbstractCompactedRow.java b/src/java/org/apache/cassandra/io/AbstractCompactedRow.java
index d51569b..eaaa157 100644
--- a/src/java/org/apache/cassandra/io/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/io/AbstractCompactedRow.java
@@ -41,11 +41,23 @@
this.key = key;
}
+ /**
+ * write the row (size + column index + filter + column data, but NOT row key) to @param out
+ */
public abstract void write(DataOutput out) throws IOException;
-
+
+ /**
+ * update @param digest with the data bytes of the row (not including row key or row size)
+ */
public abstract void update(MessageDigest digest);
+ /**
+ * @return true if there are no columns in the row AND there are no row-level tombstones to be preserved
+ */
public abstract boolean isEmpty();
+ /**
+ * @return the number of columns in the row
+ */
public abstract int columnCount();
}
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index ddf338d..5aa33e2 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -33,6 +33,7 @@
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -49,9 +50,10 @@
public class CleanupTest extends CleanupHelper
{
- public static final int LOOPS = 800;
+ public static final int LOOPS = 200;
public static final String TABLE1 = "Keyspace1";
public static final String CF1 = "Indexed1";
+ public static final String CF2 = "Standard1";
public static final ByteBuffer COLUMN = ByteBuffer.wrap("birthdate".getBytes());
public static final ByteBuffer VALUE = ByteBuffer.allocate(8);
static
@@ -61,34 +63,59 @@
}
@Test
- public void testCleanup() throws IOException, ExecutionException, InterruptedException
+ public void testCleanup() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+ {
+ StorageService.instance.initServer();
+
+ Table table = Table.open(TABLE1);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2);
+
+ List<Row> rows;
+
+ // insert data and verify we get it back w/ range query
+ fillCF(cfs, LOOPS);
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ assertEquals(LOOPS, rows.size());
+
+ // with one token in the ring, owned by the local node, cleanup should be a no-op
+ CompactionManager.instance.performCleanup(cfs);
+
+ // check data is still there
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ assertEquals(LOOPS, rows.size());
+ }
+
+ @Test
+ public void testCleanupWithIndexes() throws IOException, ExecutionException, InterruptedException
{
Table table = Table.open(TABLE1);
-
ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1);
- fillCF(cfs, LOOPS);
-
assertEquals(cfs.getIndexedColumns().iterator().next(), COLUMN);
- ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN);
+ List<Row> rows;
+ // insert data and verify we get it back w/ range query
+ fillCF(cfs, LOOPS);
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ assertEquals(LOOPS, rows.size());
+
+ ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN);
assertTrue(cfi.isIndexBuilt());
+ // verify we get it back w/ index query too
IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ, VALUE);
IndexClause clause = new IndexClause(Arrays.asList(expr), FBUtilities.EMPTY_BYTE_BUFFER, Integer.MAX_VALUE);
IFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
- List<Row> rows = table.getColumnFamilyStore(CF1).scan(clause, range, filter);
-
+ rows = table.getColumnFamilyStore(CF1).scan(clause, range, filter);
assertEquals(LOOPS, rows.size());
+ // nuke our token so cleanup will remove everything
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.clearUnsafe();
+ assert StorageService.instance.getLocalRanges(TABLE1).isEmpty();
- assertNotNull(tmd);
- assertEquals(0, tmd.getTokenToEndpointMap().size());
-
- // Since this test has no ring cleanup will remove all
CompactionManager.instance.performCleanup(cfs);
// row data should be gone
@@ -98,27 +125,25 @@
// not only should it be gone but there should be no data on disk, not even tombstones
assert cfs.getSSTables().isEmpty();
- // 2ary indexes should result in no results, but
+ // 2ary indexes should result in no results, too (although tombstones won't be gone until compacted)
rows = cfs.scan(clause, range, filter);
assertEquals(0, rows.size());
}
- protected void fillCF(ColumnFamilyStore store, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
+ protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
{
CompactionManager.instance.disableAutoCompaction();
for (int i = 0; i < rowsPerSSTable; i++)
{
String key = String.valueOf(i);
-
// create a row and update the birthdate value, test that the index query fetches the new version
RowMutation rm;
rm = new RowMutation(TABLE1, ByteBufferUtil.bytes(key));
- rm.add(new QueryPath(CF1, null, COLUMN), VALUE, System.currentTimeMillis());
- rm.apply();
+ rm.add(new QueryPath(cfs.getColumnFamilyName(), null, COLUMN), VALUE, System.currentTimeMillis());
+ rm.applyUnsafe();
}
- store.forceBlockingFlush();
- store.buildSecondaryIndexes(store.getSSTables(), store.getIndexedColumns());
+ cfs.forceBlockingFlush();
}
}