blob: 1b570258249b9939609dd4e5c24d67d8fc5a6345 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
import accord.api.TestableConfigurationService;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
import accord.impl.TestAgent;
import accord.impl.TopologyFactory;
import accord.impl.*;
import accord.impl.mock.MockCluster;
import accord.impl.mock.MockConfigurationService;
import accord.impl.mock.MockStore;
import accord.local.Node.Id;
import accord.local.Status.Known;
import accord.primitives.*;
import accord.topology.Topology;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static accord.Utils.id;
import static accord.Utils.writeTxn;
import static accord.impl.InMemoryCommandStore.inMemory;
public class CommandTest
{
private static final Node.Id ID1 = id(1);
private static final Node.Id ID2 = id(2);
private static final Node.Id ID3 = id(3);
private static final List<Node.Id> IDS = Arrays.asList(ID1, ID2, ID3);
private static final Range FULL_RANGE = IntKey.range(0, 100);
private static final Ranges FULL_RANGES = Ranges.single(FULL_RANGE);
private static final Topology TOPOLOGY = TopologyFactory.toTopology(IDS, 3, FULL_RANGE);
private static final IntKey.Raw KEY = IntKey.key(10);
private static final RoutingKey HOME_KEY = KEY.toUnseekable();
private static final FullKeyRoute ROUTE = RoutingKeys.of(HOME_KEY).toRoute(HOME_KEY);
private static class CommandStoreSupport
{
final AtomicReference<Topology> local = new AtomicReference<>(TOPOLOGY);
final MockStore data = new MockStore();
}
private static void setTopologyEpoch(AtomicReference<Topology> topology, long epoch)
{
topology.set(topology.get().withEpoch(epoch));
}
private static CommandStore createStore(CommandStoreSupport storeSupport)
{
return createNode(ID1, storeSupport).unsafeByIndex(0);
}
private static class NoOpProgressLog implements ProgressLog
{
@Override
public void unwitnessed(TxnId txnId, RoutingKey homeKey, ProgressShard shard)
{
}
@Override
public void preaccepted(Command command, ProgressShard shard)
{
}
@Override
public void accepted(Command command, ProgressShard shard)
{
}
@Override
public void committed(Command command, ProgressShard shard)
{
}
@Override
public void readyToExecute(Command command, ProgressShard shard)
{
}
@Override
public void executed(Command command, ProgressShard shard)
{
}
@Override
public void invalidated(Command command, ProgressShard shard)
{
}
@Override
public void durableLocal(TxnId txnId)
{
}
@Override
public void durable(Command command, @Nullable Set<Id> persistedOn)
{
}
@Override
public void durable(TxnId txnId, @Nullable Unseekables<?, ?> unseekables, ProgressShard shard)
{
}
@Override
public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> blockedOn)
{
}
}
private static Node createNode(Id id, CommandStoreSupport storeSupport)
{
return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new Random(), null,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
}
@Test
void noConflictWitnessTest()
{
CommandStoreSupport support = new CommandStoreSupport();
CommandStore commands = createStore(support);
MockCluster.Clock clock = new MockCluster.Clock(100);
TxnId txnId = clock.idForNode(1, 1);
Txn txn = writeTxn(Keys.of(KEY));
Command command = new InMemoryCommand(commands, txnId);
Assertions.assertEquals(Status.NotWitnessed, command.status());
Assertions.assertNull(command.executeAt());
command.preaccept(inMemory(commands), txn.slice(FULL_RANGES, true), ROUTE, HOME_KEY);
Assertions.assertEquals(Status.PreAccepted, command.status());
Assertions.assertEquals(txnId, command.executeAt());
}
@Test
void supersedingEpochWitnessTest()
{
CommandStoreSupport support = new CommandStoreSupport();
Node node = createNode(ID1, support);
CommandStore commands = node.unsafeByIndex(0);
TxnId txnId = node.nextTxnId();
((MockCluster.Clock)node.unsafeGetNowSupplier()).increment(10);
Txn txn = writeTxn(Keys.of(KEY));
Command command = new InMemoryCommand(commands, txnId);
Assertions.assertEquals(Status.NotWitnessed, command.status());
Assertions.assertNull(command.executeAt());
setTopologyEpoch(support.local, 2);
((TestableConfigurationService)node.configService()).reportTopology(support.local.get().withEpoch(2));
Timestamp expectedTimestamp = new Timestamp(2, 110, 0, ID1);
commands.execute(null, (Consumer<? super SafeCommandStore>) store -> command.preaccept(store, txn.slice(FULL_RANGES, true), ROUTE, HOME_KEY)).syncUninterruptibly();
Assertions.assertEquals(Status.PreAccepted, command.status());
Assertions.assertEquals(expectedTimestamp, command.executeAt());
}
}