// blob: cdba24b1fb75466268eefa3b4a9b1403939b579b [file] [log] [blame]
package accord.messages;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
import accord.api.Scheduler;
import accord.impl.mock.MockCluster.Clock;
import accord.txn.Dependencies;
import accord.txn.Txn;
import accord.txn.TxnId;
import accord.utils.ThreadPoolScheduler;
import accord.local.*;
import accord.txn.Keys;
import accord.impl.IntKey;
import accord.impl.mock.Network;
import accord.impl.mock.RecordingMessageSink;
import accord.impl.TestAgent;
import accord.impl.mock.MockStore;
import accord.topology.Shards;
import accord.impl.TopologyFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Random;
import static accord.Utils.id;
import static accord.Utils.writeTxn;
/**
 * Tests that hand {@link PreAccept} messages directly to a single locally built
 * {@link Node} (always constructed as {@code ID1}) and then inspect the command
 * state and the replies captured by a {@link RecordingMessageSink}.
 */
public class PreAcceptTest
{
    private static final Id ID1 = id(1);
    private static final Id ID2 = id(2);
    private static final Id ID3 = id(3);
    private static final List<Id> IDS = List.of(ID1, ID2, ID3);

    // Topology shared by every test: built from all three ids and IntKey.range(0, 100).
    private static final Shards TOPOLOGY = TopologyFactory.toShards(IDS, 3, IntKey.range(0, 100));

    /**
     * Builds a {@link Node} over {@link #TOPOLOGY} that uses the supplied sink and clock.
     *
     * @param nodeId      identity given to the new node
     * @param messageSink sink the node is constructed with
     * @param clock       clock the node is constructed with
     * @return the freshly constructed node; callers are expected to shut it down
     */
    private static Node createNode(Id nodeId, MessageSink messageSink, Clock clock)
    {
        // One store instance per node: the supplier below returns this same object on every call.
        final MockStore sharedStore = new MockStore();
        final Scheduler scheduler = new ThreadPoolScheduler();
        return new Node(nodeId,
                        TOPOLOGY,
                        messageSink,
                        new Random(),
                        clock,
                        () -> sharedStore,
                        new TestAgent(),
                        scheduler,
                        CommandStore.Factory.SINGLE_THREAD);
    }

    /** Creates the sink every test uses: constructed for {@code ID1} over {@code Network.BLACK_HOLE}. */
    private static RecordingMessageSink newSink()
    {
        return new RecordingMessageSink(ID1, Network.BLACK_HOLE);
    }

    /**
     * Asserts the sink's history sizes are (0, 1) and that the first recorded response
     * is addressed to {@code recipient} and carries {@code expectedPayload}.
     */
    private static void assertSoleReply(RecordingMessageSink sink, Id recipient, Object expectedPayload)
    {
        sink.assertHistorySizes(0, 1);
        Assertions.assertEquals(recipient, sink.responses.get(0).to);
        Assertions.assertEquals(expectedPayload, sink.responses.get(0).payload);
    }

    /**
     * A PreAccept for a key with no prior commands should leave the command in
     * {@code Status.PreAccepted} and answer the sender with a {@code PreAcceptOk}
     * carrying the original id and empty dependencies.
     */
    @Test
    void initialCommandTest()
    {
        final RecordingMessageSink sink = newSink();
        final Clock time = new Clock(100);
        final Node replica = createNode(ID1, sink, time);
        try
        {
            final IntKey target = IntKey.key(10);
            final CommandStore localStore = replica.local(target).orElseThrow();
            // Precondition: the store knows nothing about this key yet.
            Assertions.assertFalse(localStore.hasCommandsForKey(target));

            final TxnId incomingId = time.idForNode(ID2);
            final PreAccept request = new PreAccept(incomingId, writeTxn(Keys.of(target)));
            // Advance the clock after the id was issued and before the request is processed.
            time.increment(10);
            request.process(replica, ID2, 0);

            final Command recorded = localStore.commandsForKey(target).uncommitted.get(incomingId);
            Assertions.assertEquals(Status.PreAccepted, recorded.status());

            assertSoleReply(sink, ID2, new PreAccept.PreAcceptOk(incomingId, new Dependencies()));
        }
        finally
        {
            replica.shutdown();
        }
    }

    /**
     * Processes a single PreAccept from {@code ID2}.
     * NOTE(review): nothing is asserted after {@code process}; the nack expectations
     * appear to be unwritten - confirm the intended checks.
     */
    @Test
    void nackTest()
    {
        final RecordingMessageSink sink = newSink();
        final Clock time = new Clock(100);
        final Node replica = createNode(ID1, sink, time);
        try
        {
            final IntKey target = IntKey.key(10);
            final CommandStore localStore = replica.local(target).orElseThrow();
            Assertions.assertFalse(localStore.hasCommandsForKey(target));

            final TxnId incomingId = time.idForNode(ID2);
            final PreAccept request = new PreAccept(incomingId, writeTxn(Keys.of(target)));
            request.process(replica, ID2, 0);
        }
        finally
        {
            replica.shutdown();
        }
    }

    /**
     * NOTE(review): this test has no body and therefore passes vacuously.
     */
    @Test
    void singleKeyTimestampUpdate()
    {
    }

    /**
     * Processes a single-key PreAccept from {@code ID2}, then a two-key PreAccept from
     * {@code ID3} whose id is {@code TxnId(50, 0, ID3)}, and checks the reply to the second.
     */
    @Test
    void multiKeyTimestampUpdate()
    {
        final RecordingMessageSink sink = newSink();
        final Clock time = new Clock(100);
        final Node replica = createNode(ID1, sink, time);
        try
        {
            final IntKey sharedKey = IntKey.key(10);
            new PreAccept(time.idForNode(ID2), writeTxn(Keys.of(sharedKey))).process(replica, ID2, 0);

            // Only the reply to the second request is of interest.
            sink.clearHistory();

            final IntKey extraKey = IntKey.key(11);
            final PreAccept second = new PreAccept(new TxnId(50, 0, ID3), writeTxn(Keys.of(sharedKey, extraKey)));
            time.increment(10);
            second.process(replica, ID3, 0);

            // 110 equals the clock's initial 100 plus the increment of 10; presumably the node
            // substitutes its own time because the request's 50 is behind - verify against PreAccept.
            final TxnId expectedTimestamp = new TxnId(110, 0, ID1);
            assertSoleReply(sink, ID3, new PreAccept.PreAcceptOk(expectedTimestamp, new Dependencies()));
        }
        finally
        {
            replica.shutdown();
        }
    }
}