blob: da5e592804b4b3e9eb40d58bd5cde9311010184e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.plumtree;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.crypto.Hash;
import org.apache.tuweni.junit.BouncyCastleExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(BouncyCastleExtension.class)
class StateTest {
private static class PeerImpl implements Peer {
}
private static class MockMessageSender implements MessageSender {
Verb verb;
String attributes;
Peer peer;
Bytes hash;
Bytes payload;
@Override
public void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) {
this.verb = verb;
this.attributes = attributes;
this.peer = peer;
this.hash = hash;
this.payload = payload;
}
}
private static final MessageListener messageListener = new MessageListener() {
public Bytes message;
@Override
public void listen(Bytes messageBody, String attributes) {
message = messageBody;
}
};
@Test
void testInitialState() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
State state = new State(
repo,
Hash::keccak256,
new MockMessageSender(),
messageListener,
(message, peer) -> true,
(peer) -> true);
assertTrue(repo.peers().isEmpty());
assertTrue(repo.lazyPushPeers().isEmpty());
assertTrue(repo.eagerPushPeers().isEmpty());
}
@Test
void firstRoundWithThreePeers() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
State state = new State(
repo,
Hash::keccak256,
new MockMessageSender(),
messageListener,
(message, peer) -> true,
(peer) -> true);
state.addPeer(new PeerImpl());
state.addPeer(new PeerImpl());
state.addPeer(new PeerImpl());
assertTrue(repo.lazyPushPeers().isEmpty());
assertEquals(3, repo.eagerPushPeers().size());
}
@Test
void firstRoundWithTwoPeers() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
State state = new State(
repo,
Hash::keccak256,
new MockMessageSender(),
messageListener,
(message, peer) -> true,
(peer) -> true);
state.addPeer(new PeerImpl());
state.addPeer(new PeerImpl());
assertTrue(repo.lazyPushPeers().isEmpty());
assertEquals(2, repo.eagerPushPeers().size());
}
@Test
void firstRoundWithOnePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
State state = new State(
repo,
Hash::keccak256,
new MockMessageSender(),
messageListener,
(message, peer) -> true,
(peer) -> true);
state.addPeer(new PeerImpl());
assertTrue(repo.lazyPushPeers().isEmpty());
assertEquals(1, repo.eagerPushPeers().size());
}
@Test
void removePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
State state = new State(
repo,
Hash::keccak256,
new MockMessageSender(),
messageListener,
(message, peer) -> true,
(peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
state.removePeer(peer);
assertTrue(repo.peers().isEmpty());
assertTrue(repo.lazyPushPeers().isEmpty());
assertTrue(repo.eagerPushPeers().isEmpty());
}
@Test
void prunePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
State state = new State(
repo,
Hash::keccak256,
new MockMessageSender(),
messageListener,
(message, peer) -> true,
(peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
state.receivePruneMessage(peer);
assertEquals(0, repo.eagerPushPeers().size());
assertEquals(1, repo.lazyPushPeers().size());
}
@Test
void graftPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
State state = new State(
repo,
Hash::keccak256,
new MockMessageSender(),
messageListener,
(message, peer) -> true,
(peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
state.receivePruneMessage(peer);
assertEquals(0, repo.eagerPushPeers().size());
assertEquals(1, repo.lazyPushPeers().size());
state.receiveGraftMessage(peer, Bytes32.random());
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(0, repo.lazyPushPeers().size());
}
@Test
void receiveFullMessageFromEagerPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
State state =
new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer otherPeer = new PeerImpl();
state.addPeer(otherPeer);
Bytes32 msg = Bytes32.random();
String attributes = "{\"message_type\": \"BLOCK\"}";
state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
}
@Test
void receiveFullMessageFromEagerPeerWithALazyPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
State state =
new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer otherPeer = new PeerImpl();
state.addPeer(otherPeer);
Bytes32 msg = Bytes32.random();
Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
String attributes = "{\"message_type\": \"BLOCK\"}";
state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
state.processQueue();
assertEquals(Hash.keccak256(msg), messageSender.hash);
assertEquals(lazyPeer, messageSender.peer);
assertEquals(MessageSender.Verb.IHAVE, messageSender.verb);
assertTrue(state.lazyQueue.isEmpty());
}
@Test
void receiveFullMessageFromEagerPeerThenPartialMessageFromLazyPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
State state =
new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
String attributes = "{\"message_type\": \"BLOCK\"}";
state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
state.receiveIHaveMessage(lazyPeer, message);
assertNull(messageSender.payload);
assertNull(messageSender.peer);
}
@Test
void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
State state = new State(
repo,
Hash::keccak256,
messageSender,
messageListener,
(message, peer) -> true,
(peer) -> true,
100,
4000);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
state.receiveIHaveMessage(lazyPeer, message);
Thread.sleep(200);
assertEquals(message, messageSender.hash);
assertEquals(lazyPeer, messageSender.peer);
assertEquals(MessageSender.Verb.GRAFT, messageSender.verb);
}
@Test
void receivePartialMessageFromLazyPeerAndThenFullMessage() throws Exception {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
State state = new State(
repo,
Hash::keccak256,
messageSender,
messageListener,
(message, peer) -> true,
(peer) -> true,
500,
4000);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
Thread.sleep(100);
String attributes = "{\"message_type\": \"BLOCK\"}";
state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
Thread.sleep(500);
assertNull(messageSender.verb);
assertNull(messageSender.payload);
assertNull(messageSender.peer);
}
@Test
void receiveFullMessageFromUnknownPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
State state =
new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
Bytes message = Bytes32.random();
String attributes = "{\"message_type\": \"BLOCK\"}";
state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(0, repo.lazyPushPeers().size());
assertEquals(peer, repo.eagerPushPeers().iterator().next());
}
@Test
void prunePeerWhenReceivingTwiceTheSameFullMessage() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
State state =
new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
Peer secondPeer = new PeerImpl();
Bytes message = Bytes32.random();
String attributes = "{\"message_type\": \"BLOCK\"}";
state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
state.receiveGossipMessage(secondPeer, attributes, message, Hash.keccak256(message));
assertEquals(2, repo.eagerPushPeers().size());
assertEquals(0, repo.lazyPushPeers().size());
assertNull(messageSender.payload);
assertEquals(secondPeer, messageSender.peer);
assertEquals(MessageSender.Verb.PRUNE, messageSender.verb);
}
}