blob: 1b570258249b9939609dd4e5c24d67d8fc5a6345 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package accord.local;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
import accord.api.TestableConfigurationService;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
import accord.impl.TestAgent;
import accord.impl.TopologyFactory;
import accord.impl.*;
import accord.impl.mock.MockCluster;
import accord.impl.mock.MockConfigurationService;
import accord.impl.mock.MockStore;
import accord.local.Node.Id;
import accord.local.Status.Known;
import accord.primitives.*;
import accord.topology.Topology;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static;
import static accord.Utils.writeTxn;
import static accord.impl.InMemoryCommandStore.inMemory;
/**
 * Tests that drive {@code Command.preaccept} against an in-memory command store and
 * check the resulting status and {@code executeAt} timestamp.
 *
 * NOTE(review): this listing contains no braces or annotations (e.g. {@code @Test});
 * they appear to have been lost when the file was extracted. Confirm against the
 * repository copy before relying on the structure shown here.
 */
public class CommandTest
// Identities of the three nodes that make up the test topology.
private static final Node.Id ID1 = id(1);
private static final Node.Id ID2 = id(2);
private static final Node.Id ID3 = id(3);
private static final List<Node.Id> IDS = Arrays.asList(ID1, ID2, ID3);
// A single range built from the bounds 0 and 100, and the same range wrapped as a Ranges.
private static final Range FULL_RANGE = IntKey.range(0, 100);
private static final Ranges FULL_RANGES = Ranges.single(FULL_RANGE);
// Topology over all three ids and FULL_RANGE.
// NOTE(review): the literal 3 is presumably the replication factor - confirm in TopologyFactory.
private static final Topology TOPOLOGY = TopologyFactory.toTopology(IDS, 3, FULL_RANGE);
// The one key every test transaction writes to, plus its unseekable (routing) form.
private static final IntKey.Raw KEY = IntKey.key(10);
private static final RoutingKey HOME_KEY = KEY.toUnseekable();
// Route containing only HOME_KEY; HOME_KEY is also the argument handed to toRoute.
private static final FullKeyRoute ROUTE = RoutingKeys.of(HOME_KEY).toRoute(HOME_KEY);
/**
 * Per-test holder for the state a node is built from: a mutable reference to the
 * topology (initialised to {@code TOPOLOGY}) and a fresh {@link MockStore}.
 */
private static class CommandStoreSupport
// Read by createNode via local.get(); updated by tests through setTopologyEpoch.
final AtomicReference<Topology> local = new AtomicReference<>(TOPOLOGY);
// NOTE(review): not referenced in any intact line of this listing; presumably what the
// truncated "() ->" argument in createNode supplied - confirm against the original file.
final MockStore data = new MockStore();
/**
 * Helper called by supersedingEpochWitnessTest as {@code setTopologyEpoch(support.local, 2)}.
 *
 * NOTE(review): only the signature is visible in this listing; the body is missing.
 * Judging by the name and parameters it presumably replaces the referenced topology
 * with one carrying {@code epoch} - confirm against the original source.
 *
 * @param topology reference holding the topology to update
 * @param epoch    epoch value to apply
 */
private static void setTopologyEpoch(AtomicReference<Topology> topology, long epoch)
/**
 * Builds a node with id {@code ID1} from {@code storeSupport} (see createNode) and
 * returns the command store that node exposes at index 0.
 *
 * @param storeSupport topology/data holder the node is created from
 * @return the node's command store at index 0
 */
private static CommandStore createStore(CommandStoreSupport storeSupport)
return createNode(ID1, storeSupport).unsafeByIndex(0);
/**
 * {@link ProgressLog} implementation handed to every node created by createNode
 * (via {@code ignore -> ignore2 -> new NoOpProgressLog()}).
 *
 * NOTE(review): no method bodies or {@code @Override} annotations are visible in this
 * listing. The class name suggests every callback is an empty no-op - confirm against
 * the original source.
 */
private static class NoOpProgressLog implements ProgressLog
// Callbacks keyed on a command (or txn id) reaching a given stage.
public void unwitnessed(TxnId txnId, RoutingKey homeKey, ProgressShard shard)
public void preaccepted(Command command, ProgressShard shard)
public void accepted(Command command, ProgressShard shard)
public void committed(Command command, ProgressShard shard)
public void readyToExecute(Command command, ProgressShard shard)
public void executed(Command command, ProgressShard shard)
public void invalidated(Command command, ProgressShard shard)
// Durability notifications; note the two durable(...) overloads with different argument shapes.
public void durableLocal(TxnId txnId)
public void durable(Command command, @Nullable Set<Id> persistedOn)
public void durable(TxnId txnId, @Nullable Unseekables<?, ?> unseekables, ProgressShard shard)
// Notification that something is blocked on txn blockedBy.
public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> blockedOn)
/**
 * Constructs a {@link Node} with the given id, wired entirely from test doubles:
 * a {@link MockConfigurationService} with a do-nothing callback, a
 * {@code MockCluster.Clock(100)}, an even-split shard distributor, a {@link TestAgent},
 * an unseeded {@link Random}, a {@code NoOpProgressLog} factory and synchronized
 * in-memory command stores. Two constructor arguments are passed as {@code null}.
 *
 * The topology is read from {@code storeSupport.local} once, at construction time.
 *
 * @param id           identity of the node to create
 * @param storeSupport holder providing the topology handed to the configuration service
 * @return the newly constructed node
 */
private static Node createNode(Id id, CommandStoreSupport storeSupport)
return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
// NOTE(review): the lambda "() ->" below has no body in this listing (truncated during
// extraction); presumably it supplied storeSupport.data - confirm against the original file.
new MockCluster.Clock(100), () ->, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new Random(), null,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
/**
 * Pre-accepts a single write transaction on KEY in a store where no other command
 * has been created, and expects the command to become PreAccepted with
 * {@code executeAt} equal to its own {@code txnId}.
 *
 * NOTE(review): the {@code @Test} annotation and method braces are not visible in this
 * listing - confirm against the original source.
 */
void noConflictWitnessTest()
CommandStoreSupport support = new CommandStoreSupport();
CommandStore commands = createStore(support);
// A clock local to this test (separate instance from the one createNode gives the node),
// used only to mint the transaction id.
MockCluster.Clock clock = new MockCluster.Clock(100);
TxnId txnId = clock.idForNode(1, 1);
Txn txn = writeTxn(Keys.of(KEY));
Command command = new InMemoryCommand(commands, txnId);
// A freshly constructed command starts out NotWitnessed.
Assertions.assertEquals(Status.NotWitnessed, command.status());
// Pre-accept directly against the in-memory view of the store, with the txn sliced to FULL_RANGES.
command.preaccept(inMemory(commands), txn.slice(FULL_RANGES, true), ROUTE, HOME_KEY);
Assertions.assertEquals(Status.PreAccepted, command.status());
// Expected outcome: the proposed execution timestamp is the txn id itself.
Assertions.assertEquals(txnId, command.executeAt());
/**
 * Creates a transaction id from the node, then calls {@code setTopologyEpoch(support.local, 2)}
 * before pre-accepting, and expects {@code executeAt} to be
 * {@code new Timestamp(2, 110, 0, ID1)} rather than the original txn id.
 *
 * NOTE(review): the {@code @Test} annotation and method braces are not visible in this
 * listing - confirm against the original source.
 */
void supersedingEpochWitnessTest()
CommandStoreSupport support = new CommandStoreSupport();
Node node = createNode(ID1, support);
CommandStore commands = node.unsafeByIndex(0);
// The txn id is minted by the node before the epoch change below.
TxnId txnId = node.nextTxnId();
Txn txn = writeTxn(Keys.of(KEY));
Command command = new InMemoryCommand(commands, txnId);
Assertions.assertEquals(Status.NotWitnessed, command.status());
// Change the epoch on the shared topology reference after the node (and txn id) exist.
setTopologyEpoch(support.local, 2);
// First argument matches the epoch set above and the last is this node's id.
// NOTE(review): the meaning of 110 and 0 depends on Timestamp's constructor - confirm there.
Timestamp expectedTimestamp = new Timestamp(2, 110, 0, ID1);
// Run preaccept through the store's execute(...) and block until it completes.
// NOTE(review): the explicit Consumer cast presumably selects between execute overloads - confirm.
commands.execute(null, (Consumer<? super SafeCommandStore>) store -> command.preaccept(store, txn.slice(FULL_RANGES, true), ROUTE, HOME_KEY)).syncUninterruptibly();
Assertions.assertEquals(Status.PreAccepted, command.status());
Assertions.assertEquals(expectedTimestamp, command.executeAt());