// blob: cc97df03b1a51cc38a0780c06bf6c03976d422aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hints;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.*;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.assertTrue;
import static org.apache.cassandra.hints.HintsTestUtil.assertMutationsEqual;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
// TODO: test split into several files
/**
 * Tests for {@link LegacyHintsMigrator}.
 *
 * Two scenarios are covered: migrating an empty legacy hints table yields no hints stores, and
 * hints written to the legacy table are migrated into one hints store per target host, with the
 * creation time, gcgs and mutation of every hint preserved.
 */
@SuppressWarnings("deprecation")
public class LegacyHintsMigratorTest
{
    private static final String KEYSPACE = "legacy_hints_migrator_test";
    private static final String TABLE = "table";

    // number of randomly generated target hosts, and number of hints written for each of them
    private static final int HOST_COUNT = 10;
    private static final int HINTS_PER_HOST = 100;

    /**
     * Prepares the test server and creates the keyspace/table the hinted mutations are written against.
     */
    @BeforeClass
    public static void defineSchema()
    {
        SchemaLoader.prepareServer();
        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
    }

    /**
     * Runs the empty-table scenario against a fresh temporary directory and removes the directory afterwards.
     */
    @Test
    public void testNothingToMigrate() throws IOException
    {
        File directory = Files.createTempDirectory(null).toFile();
        try
        {
            testNothingToMigrate(directory);
        }
        finally
        {
            deleteRecursively(directory);
        }
    }

    /**
     * Migrates an empty legacy hints table and asserts that no hints stores are found in {@code directory}.
     */
    private static void testNothingToMigrate(File directory)
    {
        // truncate system.hints to ensure nothing inside
        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).truncateBlocking();

        new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate();

        HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
        assertEquals(0, catalog.stores().count());
    }

    /**
     * Runs the full-migration scenario against a fresh temporary directory and removes the directory afterwards.
     */
    @Test
    public void testMigrationIsComplete() throws IOException
    {
        File directory = Files.createTempDirectory(null).toFile();
        try
        {
            testMigrationIsComplete(directory);
        }
        finally
        {
            deleteRecursively(directory);
        }
    }

    /**
     * Writes {@link #HINTS_PER_HOST} legacy hints for each of {@link #HOST_COUNT} random host ids, runs the
     * migration into {@code directory}, and checks that the legacy table is empty afterwards and that every
     * host's hints can be read back, in order, with matching creation time, gcgs and mutation.
     */
    private static void testMigrationIsComplete(File directory)
    {
        long timestamp = System.currentTimeMillis();

        // write HINTS_PER_HOST mutations for each of the HOST_COUNT generated endpoints
        Map<UUID, Queue<Mutation>> mutations = new HashMap<>();
        for (int i = 0; i < HOST_COUNT; i++)
        {
            UUID hostId = UUID.randomUUID();
            Queue<Mutation> queue = new LinkedList<>();
            mutations.put(hostId, queue);

            for (int j = 0; j < HINTS_PER_HOST; j++)
            {
                Mutation mutation = createMutation(j, timestamp + j);
                queue.offer(mutation);

                // every legacy hint is written with the same creation timestamp, asserted on below
                Mutation legacyHint = createLegacyHint(mutation, timestamp, hostId);
                legacyHint.applyUnsafe();
            }
        }

        // run the migration
        new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate();

        // validate that the legacy hints table is empty now
        assertTrue(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).isEmpty());

        HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);

        // assert that we've correctly loaded one hints store per host
        assertEquals(HOST_COUNT, catalog.stores().count());

        // for each of the stores, make sure the mutations have been migrated correctly
        for (Map.Entry<UUID, Queue<Mutation>> entry : mutations.entrySet())
        {
            HintsStore store = catalog.get(entry.getKey());
            assertNotNull(store);

            // only one descriptor is polled and read, so this expects all hints of a host to be in that one file
            HintsDescriptor descriptor = store.poll();
            assertNotNull(descriptor);

            // read all the hints
            Queue<Hint> actualHints = new LinkedList<>();
            try (HintsReader reader = HintsReader.open(new File(directory, descriptor.fileName())))
            {
                for (HintsReader.Page page : reader)
                    page.hintsIterator().forEachRemaining(actualHints::offer);
            }

            // assert the size matches
            assertEquals(HINTS_PER_HOST, actualHints.size());

            // compare expected hints to actual hints, in the order they were written
            for (int i = 0; i < HINTS_PER_HOST; i++)
            {
                Hint hint = actualHints.poll();
                Mutation mutation = entry.getValue().poll();
                int ttl = mutation.smallestGCGS();

                assertEquals(timestamp, hint.creationTime);
                assertEquals(ttl, hint.gcgs);
                assertMutationsEqual(mutation, hint.mutation);
            }
        }
    }

    /**
     * Builds a legacy hint for {@code mutation}; copied more or less verbatim from the previous implementation.
     *
     * The result is a single-row update of {@code SystemKeyspace.LegacyHints}: the partition key is the
     * decomposed {@code targetId}, the clustering is made of a freshly generated time UUID and the messaging
     * version, and the single expiring cell holds {@code mutation} serialized at that version.
     *
     * @param mutation the mutation to wrap in a legacy hint
     * @param now      the timestamp given to the hint cell
     * @param targetId the host id the hint is destined for
     * @return the mutation that writes the legacy hint
     */
    private static Mutation createLegacyHint(Mutation mutation, long now, UUID targetId)
    {
        int version = MessagingService.VERSION_21;
        int ttl = mutation.smallestGCGS();
        UUID hintId = UUIDGen.getTimeUUID();

        ByteBuffer key = UUIDType.instance.decompose(targetId);
        Clustering clustering = SystemKeyspace.LegacyHints.comparator.make(hintId, version);
        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, version));
        Cell cell = BufferCell.expiring(SystemKeyspace.LegacyHints.compactValueColumn(),
                                        now,
                                        ttl,
                                        FBUtilities.nowInSeconds(),
                                        value);
        return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.LegacyHints,
                                                            key,
                                                            BTreeRow.singleCellRow(clustering, cell)));
    }

    /**
     * Builds a mutation against the test table that uses {@code index} as partition key, clustering and
     * value of the {@code val} column, written at {@code timestamp}.
     */
    private static Mutation createMutation(int index, long timestamp)
    {
        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
        return new RowUpdateBuilder(table, timestamp, bytes(index))
               .clustering(bytes(index))
               .add("val", bytes(index))
               .build();
    }

    /**
     * Deletes {@code file} and, if it is a directory, everything beneath it.
     *
     * A plain {@code File.delete()} or {@code File.deleteOnExit()} only removes a directory when it is
     * empty, so the directory the hints files were written into has to be emptied first. Cleanup is
     * best-effort: anything that cannot be deleted right away is registered for deletion at JVM exit
     * instead of failing the test.
     */
    private static void deleteRecursively(File file)
    {
        // listFiles() returns null for anything that is not a readable directory
        File[] children = file.listFiles();
        if (children != null)
        {
            for (File child : children)
                deleteRecursively(child);
        }

        if (!file.delete())
            file.deleteOnExit();
    }
}