blob: 1fb37352daa285824d9978244a2809703a7840e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.batchlog;
import java.util.*;
import java.util.concurrent.ExecutionException;
import com.google.common.collect.Lists;
import org.junit.*;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.Util.PartitionerSwitcher;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.TimeUUID;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillis;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
import static org.junit.Assert.*;
public class BatchlogManagerTest
{
// Keyspace that defineSchema() creates once for the whole class.
private static final String KEYSPACE1 = "BatchlogManagerTest1";
// Tables created inside KEYSPACE1 by defineSchema().
// Standard1 is the target of testDelete, testReplay and testReplayWithNoPeers.
private static final String CF_STANDARD1 = "Standard1";
// Standard2 and Standard3 are the two tables written by testTruncatedReplay.
private static final String CF_STANDARD2 = "Standard2";
private static final String CF_STANDARD3 = "Standard3";
// Standard4 is created by defineSchema() but not referenced by any test method in this class.
private static final String CF_STANDARD4 = "Standard4";
// Standard5 is the target of testAddBatch and testRemoveBatch.
private static final String CF_STANDARD5 = "Standard5";
// Handle returned by Util.switchPartitioner in defineSchema(); closed in cleanup().
// NOTE(review): presumably closing it restores the previous partitioner - confirm against Util.
static PartitionerSwitcher sw;
/**
 * One-time class setup: runs {@code DatabaseDescriptor.daemonInitialization()}, switches to
 * {@link Murmur3Partitioner} (keeping the switcher handle in {@code sw}), prepares the server
 * through {@link SchemaLoader} and creates {@code KEYSPACE1} with the five standard tables.
 *
 * @throws ConfigurationException declared for the schema/config calls made here
 */
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
    DatabaseDescriptor.daemonInitialization();
    sw = Util.switchPartitioner(Murmur3Partitioner.instance);
    SchemaLoader.prepareServer();

    // Simple replication with a factor of 1 for the test keyspace.
    KeyspaceParams replication = KeyspaceParams.simple(1);
    SchemaLoader.createKeyspace(KEYSPACE1,
                                replication,
                                SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
                                SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
                                SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
                                SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance),
                                SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5, 1, BytesType.instance));
}
/**
 * One-time class teardown: closes the partitioner switcher that {@code defineSchema()} stored
 * in {@code sw}.
 */
@AfterClass
public static void cleanup()
{
    // JUnit still runs @AfterClass methods when @BeforeClass threw. In that case sw may never
    // have been assigned, and an NPE here would pile a misleading second failure on top of the
    // real one.
    if (sw != null)
        sw.close();
}
/**
 * Per-test setup: (re-)registers 127.0.0.1 in the token metadata with token "A" and a fresh
 * random host id, then truncates the system batches table so each test starts from an empty
 * batchlog.
 *
 * @throws Exception if the local address cannot be resolved or the truncation fails
 */
@Before
public void setUp() throws Exception
{
    TokenMetadata ring = StorageService.instance.getTokenMetadata();
    InetAddressAndPort self = InetAddressAndPort.getByName("127.0.0.1");
    ring.updateNormalToken(Util.token("A"), self);
    ring.updateHostId(UUID.randomUUID(), self);

    ColumnFamilyStore batches = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME)
                                        .getColumnFamilyStore(SystemKeyspace.BATCHES);
    batches.truncateBlocking();
}
/**
 * Writes a single row under partition key "1234" in Standard1, checks that the partition is
 * readable, applies a full-partition delete for the same key and checks that a read of that
 * key then comes back empty.
 */
@Test
public void testDelete()
{
    ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
    TableMetadata cfm = cfs.metadata();
    new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes("1234"))
        .clustering("c")
        .add("val", "val" + 1234)
        .build()
        .applyUnsafe();

    DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
    ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
    Iterator<Row> iter = results.iterator();
    // Use a JUnit assertion rather than the `assert` keyword: the keyword is skipped entirely
    // when the JVM runs without -ea, which would let this precondition go unchecked.
    assertTrue(iter.hasNext());

    Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm,
                                                                         dk,
                                                                         FBUtilities.timestampMicros(),
                                                                         FBUtilities.nowInSeconds()));
    mutation.applyUnsafe();

    Util.assertEmpty(Util.cmd(cfs, dk).build());
}
/**
 * Stores 100 batches of 10 mutations each against Standard1. The first 50 are stamped one
 * batchlog timeout in the past, the other 50 one timeout in the future. After a forced replay
 * the test expects exactly the first 50 batches to have been replayed and removed, their rows
 * to be readable, and no rows to exist for the remaining keys.
 *
 * @throws Exception if the flush or the replay future fails
 */
@Test
public void testReplay() throws Exception
{
    final int totalBatches = 100;
    final int replayableBatches = 50;
    final int mutationsPerBatch = 10;

    long batchesBefore = BatchlogManager.instance.countAllBatches();
    long replayedBefore = BatchlogManager.instance.getTotalBatchesReplayed();

    TableMetadata table = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata();

    // Generate 1000 mutations (100 batches of 10 mutations each) and put them all into the batchlog.
    // Half of the batches (50) are ready to be replayed, the other half are not.
    for (int batch = 0; batch < totalBatches; batch++)
    {
        List<Mutation> batchMutations = new ArrayList<>(mutationsPerBatch);
        for (int row = 0; row < mutationsPerBatch; row++)
        {
            Mutation mutation = new RowUpdateBuilder(table, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(batch))
                                .clustering("name" + row)
                                .add("val", "val" + row)
                                .build();
            batchMutations.add(mutation);
        }

        long writtenAt;
        if (batch < replayableBatches)
            writtenAt = currentTimeMillis() - BatchlogManager.getBatchlogTimeout();
        else
            writtenAt = currentTimeMillis() + BatchlogManager.getBatchlogTimeout();

        BatchlogManager.store(Batch.createLocal(atUnixMillis(writtenAt, batch), writtenAt * 1000, batchMutations));
    }

    // Flush the batchlog to disk (see CASSANDRA-6822).
    ColumnFamilyStore batchlogStore = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME)
                                              .getColumnFamilyStore(SystemKeyspace.BATCHES);
    batchlogStore.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);

    assertEquals(totalBatches, BatchlogManager.instance.countAllBatches() - batchesBefore);
    assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - replayedBefore);

    // Force batchlog replay and wait for it to complete.
    BatchlogManager.instance.startBatchlogReplay().get();

    // Ensure that the first half, and only the first half, got replayed.
    assertEquals(replayableBatches, BatchlogManager.instance.countAllBatches() - batchesBefore);
    assertEquals(replayableBatches, BatchlogManager.instance.getTotalBatchesReplayed() - replayedBefore);

    for (int batch = 0; batch < totalBatches; batch++)
    {
        String select = String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD1, batch);
        UntypedResultSet rows = executeInternal(select);
        assertNotNull(rows);

        // Batches that were not due for replay must not have produced any rows.
        if (batch >= replayableBatches)
        {
            assertTrue(rows.isEmpty());
            continue;
        }

        Iterator<UntypedResultSet.Row> cursor = rows.iterator();
        assertNotNull(cursor);
        for (int row = 0; row < mutationsPerBatch; row++)
        {
            assertTrue(cursor.hasNext());
            UntypedResultSet.Row current = cursor.next();
            assertEquals(ByteBufferUtil.bytes(batch), current.getBytes("key"));
            assertEquals("name" + row, current.getString("name"));
            assertEquals("val" + row, current.getString("val"));
        }
        assertFalse(cursor.hasNext());
    }

    // Ensure that no stray mutations got somehow applied.
    UntypedResultSet total = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD1));
    assertNotNull(total);
    assertEquals(replayableBatches * mutationsPerBatch, total.one().getLong("count"));
}
/**
 * Stores 1000 replay-ready batches, each holding one mutation for Standard2 and one for
 * Standard3, and records a truncation of Standard2 while storing batch 500. After a forced
 * replay the test expects only the Standard2 mutations from batch 500 onwards to be present,
 * and every Standard3 mutation to be present.
 *
 * @throws InterruptedException if waiting for the replay is interrupted
 * @throws ExecutionException   if the replay task fails
 */
@Test
public void testTruncatedReplay() throws InterruptedException, ExecutionException
{
    final int batchCount = 1000;
    final int truncationIndex = 500;
    final String selectByKey = "SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)";

    TableMetadata truncatedTable = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD2);
    TableMetadata untouchedTable = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD3);

    // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
    // Each batchlog entry carries one mutation for Standard2 and one for Standard3.
    // In the middle of the process, 'truncate' Standard2.
    for (int n = 0; n < batchCount; n++)
    {
        Mutation forStandard2 = new RowUpdateBuilder(truncatedTable, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(n))
                                .clustering("name" + n)
                                .add("val", "val" + n)
                                .build();
        Mutation forStandard3 = new RowUpdateBuilder(untouchedTable, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(n))
                                .clustering("name" + n)
                                .add("val", "val" + n)
                                .build();
        List<Mutation> pair = Lists.newArrayList(forStandard2, forStandard3);

        // Make sure it's ready to be replayed, so adjust the timestamp.
        long batchTime = currentTimeMillis() - BatchlogManager.getBatchlogTimeout();

        if (n == truncationIndex)
        {
            ColumnFamilyStore standard2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2);
            SystemKeyspace.saveTruncationRecord(standard2, batchTime, CommitLogPosition.NONE);
        }

        // Adjust the timestamp (slightly) to make the test deterministic:
        // later for batches from the truncation point onwards, earlier for those before it.
        batchTime += (n >= truncationIndex) ? 1 : -1;

        BatchlogManager.store(Batch.createLocal(atUnixMillis(batchTime, n), FBUtilities.timestampMicros(), pair));
    }

    // Flush the batchlog to disk (see CASSANDRA-6822).
    ColumnFamilyStore batchlogStore = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME)
                                              .getColumnFamilyStore(SystemKeyspace.BATCHES);
    batchlogStore.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);

    // Force batchlog replay and wait for it to complete.
    BatchlogManager.instance.startBatchlogReplay().get();

    // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
    for (int n = 0; n < batchCount; n++)
    {
        UntypedResultSet found = executeInternal(String.format(selectByKey, KEYSPACE1, CF_STANDARD2, n));
        assertNotNull(found);

        if (n < truncationIndex)
        {
            assertTrue(found.isEmpty());
            continue;
        }

        UntypedResultSet.Row row = found.one();
        assertEquals(ByteBufferUtil.bytes(n), row.getBytes("key"));
        assertEquals("name" + n, row.getString("name"));
        assertEquals("val" + n, row.getString("val"));
    }

    for (int n = 0; n < batchCount; n++)
    {
        UntypedResultSet found = executeInternal(String.format(selectByKey, KEYSPACE1, CF_STANDARD3, n));
        assertNotNull(found);

        UntypedResultSet.Row row = found.one();
        assertEquals(ByteBufferUtil.bytes(n), row.getBytes("key"));
        assertEquals("name" + n, row.getString("name"));
        assertEquals("val" + n, row.getString("val"));
    }
}
/**
 * Stores one batch of 10 mutations against Standard5 and checks that the total batch count
 * grew by one and that the system batches table holds exactly one entry with the batch's id.
 */
@Test
public void testAddBatch()
{
    final int mutationCount = 10;

    long batchesBefore = BatchlogManager.instance.countAllBatches();
    TableMetadata table = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata();

    long writtenAtMicros = (currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout(MILLISECONDS) * 2) * 1000;
    TimeUUID batchId = nextTimeUUID();

    // Add a batch with 10 mutations
    List<Mutation> batchMutations = new ArrayList<>(mutationCount);
    for (int row = 0; row < mutationCount; row++)
    {
        Mutation mutation = new RowUpdateBuilder(table, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(row))
                            .clustering("name" + row)
                            .add("val", "val" + row)
                            .build();
        batchMutations.add(mutation);
    }

    BatchlogManager.store(Batch.createLocal(batchId, writtenAtMicros, batchMutations));
    assertEquals(batchesBefore + 1, BatchlogManager.instance.countAllBatches());

    String countById = String.format("SELECT count(*) FROM %s.%s where id = %s",
                                     SchemaConstants.SYSTEM_KEYSPACE_NAME,
                                     SystemKeyspace.BATCHES,
                                     batchId);
    UntypedResultSet counted = executeInternal(countById);
    assertNotNull(counted);
    assertEquals(1L, counted.one().getLong("count"));
}
/**
 * Stores one batch of 10 mutations against Standard5, removes it again by id, and checks that
 * the total batch count returns to its starting value and that the system batches table no
 * longer holds an entry with the batch's id.
 */
@Test
public void testRemoveBatch()
{
    final int mutationCount = 10;

    long batchesBefore = BatchlogManager.instance.countAllBatches();
    TableMetadata table = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata();

    long writtenAtMicros = (currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout(MILLISECONDS) * 2) * 1000;
    TimeUUID batchId = nextTimeUUID();

    // Add a batch with 10 mutations
    List<Mutation> batchMutations = new ArrayList<>(mutationCount);
    for (int row = 0; row < mutationCount; row++)
    {
        Mutation mutation = new RowUpdateBuilder(table, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(row))
                            .clustering("name" + row)
                            .add("val", "val" + row)
                            .build();
        batchMutations.add(mutation);
    }

    // Store the batch
    BatchlogManager.store(Batch.createLocal(batchId, writtenAtMicros, batchMutations));
    assertEquals(batchesBefore + 1, BatchlogManager.instance.countAllBatches());

    // Remove the batch
    BatchlogManager.remove(batchId);
    assertEquals(batchesBefore, BatchlogManager.instance.countAllBatches());

    String countById = String.format("SELECT count(*) FROM %s.%s where id = %s",
                                     SchemaConstants.SYSTEM_KEYSPACE_NAME,
                                     SystemKeyspace.BATCHES,
                                     batchId);
    UntypedResultSet counted = executeInternal(countById);
    assertNotNull(counted);
    assertEquals(0L, counted.one().getLong("count"));
}
// CASSANDRA-9223
/**
 * Removes 127.0.0.1 from the token metadata, stores one batch of 10 mutations against
 * Standard1, flushes the batchlog and forces a replay. The test expects the stored batch to
 * still be counted afterwards, i.e. the replay did not consume it.
 *
 * @throws Exception if address resolution, the flush or the replay future fails
 */
@Test
public void testReplayWithNoPeers() throws Exception
{
    final int mutationCount = 10;

    TokenMetadata ring = StorageService.instance.getTokenMetadata();
    ring.removeEndpoint(InetAddressAndPort.getByName("127.0.0.1"));

    long batchesBefore = BatchlogManager.instance.countAllBatches();
    long replayedBefore = BatchlogManager.instance.getTotalBatchesReplayed();

    TableMetadata table = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata();

    long writtenAtMicros = (currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout(MILLISECONDS) * 2) * 1000;
    TimeUUID batchId = nextTimeUUID();

    // Add a batch with 10 mutations
    List<Mutation> batchMutations = new ArrayList<>(mutationCount);
    for (int row = 0; row < mutationCount; row++)
    {
        Mutation mutation = new RowUpdateBuilder(table, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(row))
                            .clustering("name" + row)
                            .add("val", "val" + row)
                            .build();
        batchMutations.add(mutation);
    }
    BatchlogManager.store(Batch.createLocal(batchId, writtenAtMicros, batchMutations));
    assertEquals(1, BatchlogManager.instance.countAllBatches() - batchesBefore);

    // Flush the batchlog to disk (see CASSANDRA-6822).
    ColumnFamilyStore batchlogStore = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME)
                                              .getColumnFamilyStore(SystemKeyspace.BATCHES);
    batchlogStore.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);

    assertEquals(1, BatchlogManager.instance.countAllBatches() - batchesBefore);
    assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - replayedBefore);

    // Force batchlog replay and wait for it to complete.
    BatchlogManager.instance.startBatchlogReplay().get();

    // Replay should be cancelled as there are no peers in the ring.
    assertEquals(1, BatchlogManager.instance.countAllBatches() - batchesBefore);
}
}