// blob: f192bcfed073aff7cc377a51b5f34f11ce3a0378 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.batchlog;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import org.junit.*;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.Util.PartitionerSwitcher;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.junit.Assert.*;
/**
 * Tests for {@link BatchlogManager}: storing and removing batches, replaying them (including
 * legacy-format batches migrated via {@link LegacyBatchlogMigrator}), honouring truncation
 * records during replay, and replay behaviour when the ring has no peers.
 *
 * <p>{@link #setUp()} runs before every test and truncates both the current and the legacy
 * batchlog tables, so tests measure batch counts relative to values captured at their start.
 */
public class BatchlogManagerTest
{
    private static final String KEYSPACE1 = "BatchlogManagerTest1";
    private static final String CF_STANDARD1 = "Standard1";
    private static final String CF_STANDARD2 = "Standard2";
    private static final String CF_STANDARD3 = "Standard3";
    private static final String CF_STANDARD4 = "Standard4";
    private static final String CF_STANDARD5 = "Standard5";

    /** Handle returned by {@code Util.switchPartitioner}; closed in {@link #cleanup()}. */
    static PartitionerSwitcher sw;

    /**
     * Initializes the daemon configuration, switches to {@link Murmur3Partitioner}, prepares the
     * server and creates {@link #KEYSPACE1} (replication factor 1) with five standard tables.
     */
    @BeforeClass
    public static void defineSchema() throws ConfigurationException
    {
        DatabaseDescriptor.daemonInitialization();
        sw = Util.switchPartitioner(Murmur3Partitioner.instance);
        SchemaLoader.prepareServer();
        SchemaLoader.createKeyspace(KEYSPACE1,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5, 1, BytesType.instance));
    }

    /** Closes the partitioner switch opened in {@link #defineSchema()}. */
    @AfterClass
    public static void cleanup()
    {
        sw.close();
    }

    /**
     * Registers 127.0.0.1 as owning token "A" with a fresh host id, then truncates both batchlog
     * tables so every test starts from an empty batchlog.
     *
     * <p>Re-registering the endpoint also undoes {@link #testReplayWithNoPeers()}, which removes it.
     */
    @Before
    @SuppressWarnings("deprecation")
    public void setUp() throws Exception
    {
        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
        InetAddress localhost = InetAddress.getByName("127.0.0.1");
        metadata.updateNormalToken(Util.token("A"), localhost);
        metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
        Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
        Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).truncateBlocking();
    }

    /** Writes a row, deletes its whole partition, and checks that the partition then reads back empty. */
    @Test
    public void testDelete()
    {
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
        CFMetaData cfm = cfs.metadata;
        new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes("1234"))
                .clustering("c")
                .add("val", "val" + 1234)
                .build()
                .applyUnsafe();

        DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
        ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
        Iterator<Row> iter = results.iterator();
        // JUnit assertion rather than a Java 'assert', which is skipped when the JVM runs without -ea.
        assertTrue(iter.hasNext());

        Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm,
                                                                             dk,
                                                                             FBUtilities.timestampMicros(),
                                                                             FBUtilities.nowInSeconds()));
        mutation.applyUnsafe();

        Util.assertEmpty(Util.cmd(cfs, dk).build());
    }

    @Test
    public void testReplay() throws Exception
    {
        testReplay(false);
    }

    @Test
    public void testLegacyReplay() throws Exception
    {
        testReplay(true);
    }

    /**
     * Stores 100 batches of 10 mutations each. The first 50 are dated in the past and the rest in
     * the future. The test forces a replay and checks that only the first 50 batches were applied
     * to {@link #CF_STANDARD1}.
     *
     * @param legacy when {@code true}, batches go through the legacy batchlog table and are then
     *               migrated with {@link LegacyBatchlogMigrator#migrate()} before replay
     */
    @SuppressWarnings("deprecation")
    private static void testReplay(boolean legacy) throws Exception
    {
        long initialAllBatches = BatchlogManager.instance.countAllBatches();
        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();

        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;

        // Generate 1000 mutations (100 batches of 10 mutations each) and put them all into the batchlog.
        // Half batches (50) ready to be replayed, half not.
        for (int i = 0; i < 100; i++)
        {
            List<Mutation> mutations = new ArrayList<>(10);
            for (int j = 0; j < 10; j++)
            {
                mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
                              .clustering("name" + j)
                              .add("val", "val" + j)
                              .build());
            }

            // Batches 0-49 are dated one batchlog timeout in the past (replayable now);
            // batches 50-99 are dated one timeout in the future (not yet replayable).
            long timestamp = i < 50
                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());

            // 'timestamp' is in milliseconds; the batch creation time is passed in microseconds.
            if (legacy)
                LegacyBatchlogMigrator.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations), MessagingService.current_version);
            else
                BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations));
        }

        if (legacy)
        {
            Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
            LegacyBatchlogMigrator.migrate();
        }

        flushBatchlog();

        assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches);
        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);

        // Force batchlog replay and wait for it to complete.
        BatchlogManager.instance.startBatchlogReplay().get();

        // Ensure that the first half, and only the first half, got replayed.
        assertEquals(50, BatchlogManager.instance.countAllBatches() - initialAllBatches);
        assertEquals(50, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);

        for (int i = 0; i < 100; i++)
        {
            String query = String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD1, i);
            UntypedResultSet result = executeInternal(query);
            assertNotNull(result);
            if (i < 50)
            {
                Iterator<UntypedResultSet.Row> it = result.iterator();
                assertNotNull(it);
                for (int j = 0; j < 10; j++)
                {
                    assertTrue(it.hasNext());
                    UntypedResultSet.Row row = it.next();
                    assertEquals(ByteBufferUtil.bytes(i), row.getBytes("key"));
                    assertEquals("name" + j, row.getString("name"));
                    assertEquals("val" + j, row.getString("val"));
                }
                assertFalse(it.hasNext());
            }
            else
            {
                assertTrue(result.isEmpty());
            }
        }

        // Ensure that no stray mutations got somehow applied.
        UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD1));
        assertNotNull(result);
        assertEquals(500, result.one().getLong("count"));
    }

    /**
     * Stores 1000 replayable batches that each write to {@link #CF_STANDARD2} and {@link #CF_STANDARD3}.
     * A truncation record for Standard2 is saved at the timestamp used for batch 500. After replay,
     * only Standard2 writes from batches 500-999 are expected, along with every Standard3 write.
     */
    @Test
    public void testTruncatedReplay() throws InterruptedException, ExecutionException
    {
        CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
        CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
        // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
        // Each batchlog entry with a mutation for Standard2 and Standard3.
        // In the middle of the process, 'truncate' Standard2.
        for (int i = 0; i < 1000; i++)
        {
            Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
                                 .clustering("name" + i)
                                 .add("val", "val" + i)
                                 .build();
            Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
                                 .clustering("name" + i)
                                 .add("val", "val" + i)
                                 .build();

            List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);

            // Make sure it's ready to be replayed, so adjust the timestamp.
            long timestamp = System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout();

            if (i == 500)
                SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2),
                                                    timestamp,
                                                    CommitLogPosition.NONE);

            // Adjust the timestamp (slightly) to make the test deterministic: batches before 500
            // fall just before the truncation point, batches from 500 onwards just after it.
            if (i >= 500)
                timestamp++;
            else
                timestamp--;

            BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), FBUtilities.timestampMicros(), mutations));
        }

        flushBatchlog();

        // Force batchlog replay and wait for it to complete.
        BatchlogManager.instance.startBatchlogReplay().get();

        // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
        for (int i = 0; i < 1000; i++)
        {
            UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD2,i));
            assertNotNull(result);
            if (i >= 500)
            {
                UntypedResultSet.Row row = result.one();
                assertEquals(ByteBufferUtil.bytes(i), row.getBytes("key"));
                assertEquals("name" + i, row.getString("name"));
                assertEquals("val" + i, row.getString("val"));
            }
            else
            {
                assertTrue(result.isEmpty());
            }
        }

        for (int i = 0; i < 1000; i++)
        {
            UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
            assertNotNull(result);
            UntypedResultSet.Row row = result.one();
            assertEquals(ByteBufferUtil.bytes(i), row.getBytes("key"));
            assertEquals("name" + i, row.getString("name"));
            assertEquals("val" + i, row.getString("val"));
        }
    }

    /**
     * Mixes 1400 legacy (version 2.0) batchlog mutations with 100 current-version batches. Half of
     * each group is dated in the past. The test checks that the legacy entries end up in the current
     * batchlog table, and that {@link BatchlogManager#performInitialReplay()} replays exactly the
     * past-dated 750.
     */
    @Test
    @SuppressWarnings("deprecation")
    public void testConversion() throws Exception
    {
        long initialAllBatches = BatchlogManager.instance.countAllBatches();
        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
        CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);

        // Generate 1400 version 2.0 mutations and put them all into the batchlog.
        // Half ready to be replayed, half not.
        for (int i = 0; i < 1400; i++)
        {
            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
                                .clustering("name" + i)
                                .add("val", "val" + i)
                                .build();

            long timestamp = i < 700
                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());

            Mutation batchMutation = LegacyBatchlogMigrator.getStoreMutation(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i),
                                                                                               TimeUnit.MILLISECONDS.toMicros(timestamp),
                                                                                               Collections.singleton(mutation)),
                                                                             MessagingService.VERSION_20);
            assertTrue(LegacyBatchlogMigrator.isLegacyBatchlogMutation(batchMutation));
            LegacyBatchlogMigrator.handleLegacyMutation(batchMutation);
        }

        // Mix in 100 current version mutations, 50 ready for replay.
        for (int i = 1400; i < 1500; i++)
        {
            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
                                .clustering("name" + i)
                                .add("val", "val" + i)
                                .build();

            long timestamp = i < 1450
                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());

            BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i),
                                                    FBUtilities.timestampMicros(),
                                                    Collections.singleton(mutation)));
        }

        flushBatchlog();

        assertEquals(1500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);

        // All 1500 entries should now sit in the current batchlog table; none in the legacy one.
        UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_BATCHLOG));
        assertNotNull(result);
        assertEquals("Count in blog legacy", 0, result.one().getLong("count"));
        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.BATCHES));
        assertNotNull(result);
        assertEquals("Count in blog", 1500, result.one().getLong("count"));

        // Force batchlog replay and wait for it to complete.
        BatchlogManager.instance.performInitialReplay();

        // Ensure that the first half, and only the first half, got replayed.
        assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
        assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);

        for (int i = 0; i < 1500; i++)
        {
            result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
            assertNotNull(result);
            // Replayed: past-dated legacy batches [0, 700) and past-dated current batches [1400, 1450).
            if (i < 700 || (i >= 1400 && i < 1450))
            {
                UntypedResultSet.Row row = result.one();
                assertEquals(ByteBufferUtil.bytes(i), row.getBytes("key"));
                assertEquals("name" + i, row.getString("name"));
                assertEquals("val" + i, row.getString("val"));
            }
            else
            {
                assertTrue("Present at " + i, result.isEmpty());
            }
        }

        // Ensure that no stray mutations got somehow applied.
        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
        assertNotNull(result);
        assertEquals(750, result.one().getLong("count"));

        // Ensure batchlog is left as expected.
        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.BATCHES));
        assertNotNull(result);
        assertEquals("Count in blog after initial replay", 750, result.one().getLong("count"));
        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_BATCHLOG));
        assertNotNull(result);
        assertEquals("Count in blog legacy after initial replay ", 0, result.one().getLong("count"));
    }

    /** Stores one 10-mutation batch and checks that exactly one row with its id exists in the batchlog. */
    @Test
    public void testAddBatch() throws IOException
    {
        long initialAllBatches = BatchlogManager.instance.countAllBatches();
        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;

        // Creation time in microseconds, two write-RPC timeouts in the past.
        long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
        UUID uuid = UUIDGen.getTimeUUID();

        // Add a batch with 10 mutations
        List<Mutation> mutations = createMutations(cfm, 10);

        BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
        assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches());

        String query = String.format("SELECT count(*) FROM %s.%s where id = %s",
                                     SchemaConstants.SYSTEM_KEYSPACE_NAME,
                                     SystemKeyspace.BATCHES,
                                     uuid);
        UntypedResultSet result = executeInternal(query);
        assertNotNull(result);
        assertEquals(1L, result.one().getLong("count"));
    }

    /** Stores one 10-mutation batch, removes it by id, and checks that no row with that id remains. */
    @Test
    public void testRemoveBatch()
    {
        long initialAllBatches = BatchlogManager.instance.countAllBatches();
        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;

        // Creation time in microseconds, two write-RPC timeouts in the past.
        long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
        UUID uuid = UUIDGen.getTimeUUID();

        // Add a batch with 10 mutations
        List<Mutation> mutations = createMutations(cfm, 10);

        // Store the batch
        BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
        assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches());

        // Remove the batch
        BatchlogManager.remove(uuid);

        assertEquals(initialAllBatches, BatchlogManager.instance.countAllBatches());

        String query = String.format("SELECT count(*) FROM %s.%s where id = %s",
                                     SchemaConstants.SYSTEM_KEYSPACE_NAME,
                                     SystemKeyspace.BATCHES,
                                     uuid);
        UntypedResultSet result = executeInternal(query);
        assertNotNull(result);
        assertEquals(0L, result.one().getLong("count"));
    }

    /**
     * CASSANDRA-9223: with the only endpoint removed from token metadata, a forced replay must leave
     * the stored batch in the batchlog. {@link #setUp()} re-registers the endpoint for later tests.
     */
    @Test
    public void testReplayWithNoPeers() throws Exception
    {
        StorageService.instance.getTokenMetadata().removeEndpoint(InetAddress.getByName("127.0.0.1"));

        long initialAllBatches = BatchlogManager.instance.countAllBatches();
        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();

        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;

        // Creation time in microseconds, two write-RPC timeouts in the past.
        long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
        UUID uuid = UUIDGen.getTimeUUID();

        // Add a batch with 10 mutations
        List<Mutation> mutations = createMutations(cfm, 10);

        BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
        assertEquals(1, BatchlogManager.instance.countAllBatches() - initialAllBatches);

        flushBatchlog();

        assertEquals(1, BatchlogManager.instance.countAllBatches() - initialAllBatches);
        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);

        // Force batchlog replay and wait for it to complete.
        BatchlogManager.instance.startBatchlogReplay().get();

        // Replay should be cancelled as there are no peers in the ring.
        assertEquals(1, BatchlogManager.instance.countAllBatches() - initialAllBatches);
    }

    /** Blocking flush of the current batchlog table to disk (see CASSANDRA-6822). */
    private static void flushBatchlog()
    {
        Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
    }

    /**
     * Builds {@code count} single-row mutations against {@code cfm}. Mutation {@code j} uses partition
     * key {@code bytes(j)}, clustering {@code "name" + j} and value {@code "val" + j}.
     */
    private static List<Mutation> createMutations(CFMetaData cfm, int count)
    {
        List<Mutation> mutations = new ArrayList<>(count);
        for (int j = 0; j < count; j++)
        {
            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j))
                          .clustering("name" + j)
                          .add("val", "val" + j)
                          .build());
        }
        return mutations;
    }
}