| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.service; |
| |
| import java.nio.ByteBuffer; |
| |
| import com.google.common.collect.Iterables; |
| |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.service.paxos.Ballot; |
| import org.apache.cassandra.service.paxos.v1.PrepareVerbHandler; |
| import org.apache.cassandra.service.paxos.v1.ProposeVerbHandler; |
| |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Supplier; |
| |
| import org.apache.cassandra.dht.Murmur3Partitioner; |
| import org.apache.cassandra.exceptions.ReadTimeoutException; |
| import org.apache.cassandra.service.paxos.PaxosOperationLock; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.db.partitions.PartitionUpdate; |
| import org.apache.cassandra.gms.Gossiper; |
| import org.apache.cassandra.service.paxos.Commit; |
| import org.apache.cassandra.service.paxos.PaxosState; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| import static org.apache.cassandra.service.paxos.Ballot.Flag.NONE; |
| import static org.apache.cassandra.service.paxos.BallotGenerator.Global.atUnixMicros; |
| import static org.apache.cassandra.utils.Clock.Global.nanoTime; |
| import static org.junit.Assert.*; |
| |
| public class PaxosStateTest |
| { |
| @BeforeClass |
| public static void setUpClass() throws Throwable |
| { |
| SchemaLoader.loadSchema(); |
| SchemaLoader.schemaDefinition("PaxosStateTest"); |
| } |
| |
| @AfterClass |
| public static void stopGossiper() |
| { |
| Gossiper.instance.stop(); |
| } |
| |
| @Test |
| public void testCommittingAfterTruncation() throws Exception |
| { |
| ColumnFamilyStore cfs = Keyspace.open("PaxosStateTestKeyspace1").getColumnFamilyStore("Standard1"); |
| String key = "key" + nanoTime(); |
| ByteBuffer value = ByteBufferUtil.bytes(0); |
| RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), FBUtilities.timestampMicros(), key); |
| builder.clustering("a").add("val", value); |
| PartitionUpdate update = Iterables.getOnlyElement(builder.build().getPartitionUpdates()); |
| |
| // CFS should be empty initially |
| assertNoDataPresent(cfs, Util.dk(key)); |
| |
| // Commit the proposal & verify the data is present |
| Commit beforeTruncate = newProposal(0, update); |
| PaxosState.commitDirect(beforeTruncate); |
| assertDataPresent(cfs, Util.dk(key), "val", value); |
| |
| // Truncate then attempt to commit again, mutation should |
| // be ignored as the proposal predates the truncation |
| cfs.truncateBlocking(); |
| PaxosState.commitDirect(beforeTruncate); |
| assertNoDataPresent(cfs, Util.dk(key)); |
| |
| // Now try again with a ballot created after the truncation |
| long timestamp = SystemKeyspace.getTruncatedAt(update.metadata().id) + 1; |
| Commit afterTruncate = newProposal(timestamp, update); |
| PaxosState.commitDirect(afterTruncate); |
| assertDataPresent(cfs, Util.dk(key), "val", value); |
| } |
| |
| private Commit newProposal(long ballotMicros, PartitionUpdate update) |
| { |
| return Commit.newProposal(atUnixMicros(ballotMicros, NONE), update); |
| } |
| |
| private void assertDataPresent(ColumnFamilyStore cfs, DecoratedKey key, String name, ByteBuffer value) |
| { |
| Row row = Util.getOnlyRowUnfiltered(Util.cmd(cfs, key).build()); |
| assertEquals(0, ByteBufferUtil.compareUnsigned(value, |
| row.getCell(cfs.metadata().getColumn(ByteBufferUtil.bytes(name))).buffer())); |
| } |
| |
| private void assertNoDataPresent(ColumnFamilyStore cfs, DecoratedKey key) |
| { |
| Util.assertEmpty(Util.cmd(cfs, key).build()); |
| } |
| |
| @Test |
| public void testPrepareProposePaxos() throws Throwable |
| { |
| ColumnFamilyStore cfs = Keyspace.open("PaxosStateTestKeyspace1").getColumnFamilyStore("Standard1"); |
| String key = "key" + nanoTime(); |
| ByteBuffer value = ByteBufferUtil.bytes(0); |
| RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), FBUtilities.timestampMicros(), key); |
| builder.clustering("a").add("val", value); |
| PartitionUpdate update = Iterables.getOnlyElement(builder.build().getPartitionUpdates()); |
| |
| // CFS should be empty initially |
| assertNoDataPresent(cfs, Util.dk(key)); |
| |
| Ballot ballot = atUnixMicros(1000 * System.currentTimeMillis(), NONE); |
| |
| Commit commit = Commit.newPrepare(Util.dk(key), cfs.metadata(), ballot); |
| |
| assertTrue("paxos prepare stage failed", PrepareVerbHandler.doPrepare(commit).promised); |
| assertTrue("paxos propose stage failed", ProposeVerbHandler.doPropose(commit)); |
| } |
| |
| public void testPaxosLock() throws ExecutionException, InterruptedException, ExecutionException |
| { |
| DecoratedKey key = new BufferDecoratedKey(Murmur3Partitioner.MINIMUM, ByteBufferUtil.EMPTY_BYTE_BUFFER); |
| TableMetadata metadata = Keyspace.open("PaxosStateTestKeyspace1").getColumnFamilyStore("Standard1").metadata.get(); |
| Supplier<PaxosOperationLock> locker = () -> PaxosState.lock(key, metadata, System.nanoTime() + TimeUnit.SECONDS.toNanos(1L), ConsistencyLevel.SERIAL, false); |
| ExecutorService executor = Executors.newFixedThreadPool(1); |
| Future<?> future; |
| try (PaxosOperationLock lock = locker.get()) |
| { |
| try |
| { |
| try (PaxosOperationLock lock2 = locker.get()) |
| { |
| Assert.fail(); |
| } |
| } |
| catch (ReadTimeoutException rte) |
| { |
| } |
| |
| future = executor.submit(() -> { |
| try (PaxosOperationLock lock2 = locker.get()) |
| { |
| } |
| }); |
| } |
| finally |
| { |
| executor.shutdown(); |
| } |
| future.get(); |
| } |
| } |