| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.db.transform; |
| |
| import java.net.InetAddress; |
| import java.nio.ByteBuffer; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| |
| import com.google.common.collect.Iterators; |
| import org.junit.*; |
| |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.cql3.CQLTester; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.db.partitions.*; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.net.*; |
| import org.apache.cassandra.utils.DiagnosticSnapshotService; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| import static org.apache.cassandra.utils.ByteBufferUtil.bytes; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| public class DuplicateRowCheckerTest extends CQLTester |
| { |
| ColumnFamilyStore cfs; |
| CFMetaData metadata; |
| static HashMap<InetAddress, MessageOut> sentMessages; |
| |
| @BeforeClass |
| public static void setupMessaging() |
| { |
| sentMessages = new HashMap<>(); |
| IMessageSink sink = new IMessageSink() |
| { |
| public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) |
| { |
| sentMessages.put(to, message); |
| return false; |
| } |
| |
| public boolean allowIncomingMessage(MessageIn message, int id) |
| { |
| return false; |
| } |
| }; |
| MessagingService.instance().addMessageSink(sink); |
| } |
| |
| @Before |
| public void setup() throws Throwable |
| { |
| DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true); |
| System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0"); |
| // Create a table and insert some data. The actual rows read in the test will be synthetic |
| // but this creates an sstable on disk to be snapshotted. |
| createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| |
| metadata = getCurrentColumnFamilyStore().metadata; |
| cfs = getCurrentColumnFamilyStore(); |
| sentMessages.clear(); |
| } |
| |
| @Test |
| public void noDuplicates() |
| { |
| // no duplicates |
| iterate(iter(metadata, |
| false, |
| makeRow(metadata, 0, 0), |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 2))); |
| assertCommandIssued(sentMessages, false); |
| } |
| |
| @Test |
| public void singleDuplicateForward() |
| { |
| |
| iterate(iter(metadata, |
| false, |
| makeRow(metadata, 0, 0), |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 1))); |
| assertCommandIssued(sentMessages, true); |
| } |
| |
| @Test |
| public void singleDuplicateReverse() |
| { |
| iterate(iter(metadata, |
| true, |
| makeRow(metadata, 0, 0), |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 1))); |
| assertCommandIssued(sentMessages, true); |
| } |
| |
| @Test |
| public void multipleContiguousForward() |
| { |
| iterate(iter(metadata, |
| false, |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 1))); |
| assertCommandIssued(sentMessages, true); |
| } |
| |
| @Test |
| public void multipleContiguousReverse() |
| { |
| iterate(iter(metadata, |
| true, |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 1))); |
| assertCommandIssued(sentMessages, true); |
| } |
| |
| @Test |
| public void multipleDisjointForward() |
| { |
| iterate(iter(metadata, |
| false, |
| makeRow(metadata, 0, 0), |
| makeRow(metadata, 0, 0), |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 2), |
| makeRow(metadata, 0, 2))); |
| assertCommandIssued(sentMessages, true); |
| } |
| |
| @Test |
| public void multipleDisjointReverse() |
| { |
| iterate(iter(metadata, |
| true, |
| makeRow(metadata, 0, 0), |
| makeRow(metadata, 0, 0), |
| makeRow(metadata, 0, 1), |
| makeRow(metadata, 0, 2), |
| makeRow(metadata, 0, 2))); |
| assertCommandIssued(sentMessages, true); |
| } |
| |
| public static void assertCommandIssued(HashMap<InetAddress, MessageOut> sent, boolean isExpected) |
| { |
| assertEquals(isExpected, !sent.isEmpty()); |
| if (isExpected) |
| { |
| assertEquals(1, sent.size()); |
| assertTrue(sent.containsKey(FBUtilities.getBroadcastAddress())); |
| SnapshotCommand command = (SnapshotCommand) sent.get(FBUtilities.getBroadcastAddress()).payload; |
| assertTrue(command.snapshot_name.startsWith(DiagnosticSnapshotService.DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX)); |
| } |
| } |
| |
| private void iterate(UnfilteredPartitionIterator iter) |
| { |
| try (PartitionIterator partitions = applyChecker(iter)) |
| { |
| while (partitions.hasNext()) |
| { |
| try (RowIterator partition = partitions.next()) |
| { |
| partition.forEachRemaining(u -> {}); |
| } |
| } |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static <T> ByteBuffer decompose(AbstractType<?> type, T value) |
| { |
| return ((AbstractType<T>) type).decompose(value); |
| } |
| |
| public static Row makeRow(CFMetaData metadata, Object... clusteringValues) |
| { |
| ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; |
| for (int i = 0; i < clusteringValues.length; i++) |
| clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); |
| |
| return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, 0, 0)); |
| } |
| |
| private static PartitionIterator applyChecker(UnfilteredPartitionIterator unfiltered) |
| { |
| int nowInSecs = 0; |
| return DuplicateRowChecker.duringRead(FilteredPartitions.filter(unfiltered, nowInSecs), |
| Collections.singletonList(FBUtilities.getBroadcastAddress())); |
| } |
| |
| public static UnfilteredPartitionIterator iter(CFMetaData metadata, boolean isReversedOrder, Unfiltered... unfiltereds) |
| { |
| DecoratedKey key = metadata.partitioner.decorateKey(bytes("key")); |
| Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds); |
| |
| UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(metadata, |
| key, |
| DeletionTime.LIVE, |
| metadata.partitionColumns(), |
| Rows.EMPTY_STATIC_ROW, |
| isReversedOrder, |
| EncodingStats.NO_STATS) |
| { |
| protected Unfiltered computeNext() |
| { |
| return iterator.hasNext() ? iterator.next() : endOfData(); |
| } |
| }; |
| |
| return new SingletonUnfilteredPartitionIterator(rowIter, false); |
| } |
| } |