| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE |
| * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file |
| * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the |
| * License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations under the License. |
| */ |
| package org.apache.tuweni.plumtree; |
| |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| import org.apache.tuweni.bytes.Bytes; |
| import org.apache.tuweni.bytes.Bytes32; |
| import org.apache.tuweni.crypto.Hash; |
| import org.apache.tuweni.junit.BouncyCastleExtension; |
| |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.extension.ExtendWith; |
| |
| @ExtendWith(BouncyCastleExtension.class) |
| class StateTest { |
| |
| private static class PeerImpl implements Peer { |
| |
| } |
| |
| private static class MockMessageSender implements MessageSender { |
| |
| Verb verb; |
| String attributes; |
| Peer peer; |
| Bytes hash; |
| Bytes payload; |
| |
| @Override |
| public void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) { |
| this.verb = verb; |
| this.attributes = attributes; |
| this.peer = peer; |
| this.hash = hash; |
| this.payload = payload; |
| } |
| } |
| |
| private static final MessageListener messageListener = new MessageListener() { |
| |
| public Bytes message; |
| |
| @Override |
| public void listen(Bytes messageBody, String attributes) { |
| message = messageBody; |
| } |
| }; |
| |
| @Test |
| void testInitialState() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| new MockMessageSender(), |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true); |
| assertTrue(repo.peers().isEmpty()); |
| assertTrue(repo.lazyPushPeers().isEmpty()); |
| assertTrue(repo.eagerPushPeers().isEmpty()); |
| } |
| |
| @Test |
| void firstRoundWithThreePeers() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| new MockMessageSender(), |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true); |
| state.addPeer(new PeerImpl()); |
| state.addPeer(new PeerImpl()); |
| state.addPeer(new PeerImpl()); |
| assertTrue(repo.lazyPushPeers().isEmpty()); |
| assertEquals(3, repo.eagerPushPeers().size()); |
| } |
| |
| @Test |
| void firstRoundWithTwoPeers() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| new MockMessageSender(), |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true); |
| state.addPeer(new PeerImpl()); |
| state.addPeer(new PeerImpl()); |
| assertTrue(repo.lazyPushPeers().isEmpty()); |
| assertEquals(2, repo.eagerPushPeers().size()); |
| } |
| |
| @Test |
| void firstRoundWithOnePeer() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| new MockMessageSender(), |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true); |
| state.addPeer(new PeerImpl()); |
| assertTrue(repo.lazyPushPeers().isEmpty()); |
| assertEquals(1, repo.eagerPushPeers().size()); |
| } |
| |
| @Test |
| void removePeer() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| new MockMessageSender(), |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true); |
| Peer peer = new PeerImpl(); |
| state.addPeer(peer); |
| state.removePeer(peer); |
| assertTrue(repo.peers().isEmpty()); |
| assertTrue(repo.lazyPushPeers().isEmpty()); |
| assertTrue(repo.eagerPushPeers().isEmpty()); |
| } |
| |
| @Test |
| void prunePeer() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| new MockMessageSender(), |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true); |
| Peer peer = new PeerImpl(); |
| state.addPeer(peer); |
| state.receivePruneMessage(peer); |
| assertEquals(0, repo.eagerPushPeers().size()); |
| assertEquals(1, repo.lazyPushPeers().size()); |
| } |
| |
| @Test |
| void graftPeer() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| new MockMessageSender(), |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true); |
| Peer peer = new PeerImpl(); |
| state.addPeer(peer); |
| state.receivePruneMessage(peer); |
| assertEquals(0, repo.eagerPushPeers().size()); |
| assertEquals(1, repo.lazyPushPeers().size()); |
| state.receiveGraftMessage(peer, Bytes32.random()); |
| assertEquals(1, repo.eagerPushPeers().size()); |
| assertEquals(0, repo.lazyPushPeers().size()); |
| } |
| |
| @Test |
| void receiveFullMessageFromEagerPeer() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| MockMessageSender messageSender = new MockMessageSender(); |
| State state = |
| new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); |
| Peer peer = new PeerImpl(); |
| state.addPeer(peer); |
| Peer otherPeer = new PeerImpl(); |
| state.addPeer(otherPeer); |
| Bytes32 msg = Bytes32.random(); |
| String attributes = "{\"message_type\": \"BLOCK\"}"; |
| state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg)); |
| assertEquals(msg, messageSender.payload); |
| assertEquals(otherPeer, messageSender.peer); |
| } |
| |
| @Test |
| void receiveFullMessageFromEagerPeerWithALazyPeer() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| MockMessageSender messageSender = new MockMessageSender(); |
| State state = |
| new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); |
| Peer peer = new PeerImpl(); |
| state.addPeer(peer); |
| Peer otherPeer = new PeerImpl(); |
| state.addPeer(otherPeer); |
| Bytes32 msg = Bytes32.random(); |
| Peer lazyPeer = new PeerImpl(); |
| state.addPeer(lazyPeer); |
| repo.moveToLazy(lazyPeer); |
| String attributes = "{\"message_type\": \"BLOCK\"}"; |
| state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg)); |
| assertEquals(msg, messageSender.payload); |
| assertEquals(otherPeer, messageSender.peer); |
| state.processQueue(); |
| assertEquals(Hash.keccak256(msg), messageSender.hash); |
| assertEquals(lazyPeer, messageSender.peer); |
| assertEquals(MessageSender.Verb.IHAVE, messageSender.verb); |
| assertTrue(state.lazyQueue.isEmpty()); |
| } |
| |
| @Test |
| void receiveFullMessageFromEagerPeerThenPartialMessageFromLazyPeer() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| MockMessageSender messageSender = new MockMessageSender(); |
| State state = |
| new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); |
| Peer peer = new PeerImpl(); |
| state.addPeer(peer); |
| Peer lazyPeer = new PeerImpl(); |
| state.addPeer(lazyPeer); |
| repo.moveToLazy(lazyPeer); |
| Bytes message = Bytes32.random(); |
| String attributes = "{\"message_type\": \"BLOCK\"}"; |
| state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); |
| state.receiveIHaveMessage(lazyPeer, message); |
| assertNull(messageSender.payload); |
| assertNull(messageSender.peer); |
| } |
| |
| @Test |
| void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| MockMessageSender messageSender = new MockMessageSender(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| messageSender, |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true, |
| 100, |
| 4000); |
| Peer peer = new PeerImpl(); |
| state.addPeer(peer); |
| Peer lazyPeer = new PeerImpl(); |
| state.addPeer(lazyPeer); |
| repo.moveToLazy(lazyPeer); |
| Bytes message = Bytes32.random(); |
| state.receiveIHaveMessage(lazyPeer, message); |
| Thread.sleep(200); |
| assertEquals(message, messageSender.hash); |
| assertEquals(lazyPeer, messageSender.peer); |
| assertEquals(MessageSender.Verb.GRAFT, messageSender.verb); |
| } |
| |
| @Test |
| void receivePartialMessageFromLazyPeerAndThenFullMessage() throws Exception { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| MockMessageSender messageSender = new MockMessageSender(); |
| State state = new State( |
| repo, |
| Hash::keccak256, |
| messageSender, |
| messageListener, |
| (message, peer) -> true, |
| (peer) -> true, |
| 500, |
| 4000); |
| Peer peer = new PeerImpl(); |
| state.addPeer(peer); |
| Peer lazyPeer = new PeerImpl(); |
| state.addPeer(lazyPeer); |
| repo.moveToLazy(lazyPeer); |
| Bytes message = Bytes32.random(); |
| state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message)); |
| Thread.sleep(100); |
| String attributes = "{\"message_type\": \"BLOCK\"}"; |
| state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); |
| Thread.sleep(500); |
| assertNull(messageSender.verb); |
| assertNull(messageSender.payload); |
| assertNull(messageSender.peer); |
| } |
| |
| @Test |
| void receiveFullMessageFromUnknownPeer() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| MockMessageSender messageSender = new MockMessageSender(); |
| State state = |
| new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); |
| Peer peer = new PeerImpl(); |
| Bytes message = Bytes32.random(); |
| String attributes = "{\"message_type\": \"BLOCK\"}"; |
| state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); |
| assertEquals(1, repo.eagerPushPeers().size()); |
| assertEquals(0, repo.lazyPushPeers().size()); |
| assertEquals(peer, repo.eagerPushPeers().iterator().next()); |
| } |
| |
| @Test |
| void prunePeerWhenReceivingTwiceTheSameFullMessage() { |
| EphemeralPeerRepository repo = new EphemeralPeerRepository(); |
| MockMessageSender messageSender = new MockMessageSender(); |
| State state = |
| new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); |
| Peer peer = new PeerImpl(); |
| Peer secondPeer = new PeerImpl(); |
| Bytes message = Bytes32.random(); |
| String attributes = "{\"message_type\": \"BLOCK\"}"; |
| state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); |
| state.receiveGossipMessage(secondPeer, attributes, message, Hash.keccak256(message)); |
| assertEquals(2, repo.eagerPushPeers().size()); |
| assertEquals(0, repo.lazyPushPeers().size()); |
| assertNull(messageSender.payload); |
| assertEquals(secondPeer, messageSender.peer); |
| assertEquals(MessageSender.Verb.PRUNE, messageSender.verb); |
| } |
| } |