blob: 89a9dcbd4c65a3f74f32987b2cad5456c4750db9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.gossip;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.junit.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith({VertxExtension.class, TempDirectoryExtension.class, BouncyCastleExtension.class})
class GossipIntegrationTest {
@Test
void threeGossipServersStarting(@VertxInstance Vertx vertx, @TempDirectory Path tempDir) throws Exception {
GossipCommandLineOptions opts1 = new GossipCommandLineOptions(
new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002"},
9000,
"127.0.0.1",
tempDir.resolve("log1.log").toString(),
10000,
0,
0,
false,
50,
null);
GossipCommandLineOptions opts2 = new GossipCommandLineOptions(
new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"},
9001,
"127.0.0.1",
tempDir.resolve("log2.log").toString(),
10001,
0,
0,
false,
50,
null);
GossipCommandLineOptions opts3 = new GossipCommandLineOptions(
new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"},
9002,
"127.0.0.1",
tempDir.resolve("log3.log").toString(),
10002,
0,
0,
false,
50,
null);
AtomicBoolean terminationRan = new AtomicBoolean(false);
ExecutorService service = Executors.newFixedThreadPool(3);
Future<GossipApp> app1Future = service.submit(() -> {
GossipApp app = new GossipApp(vertx, opts1, System.err, System.out, () -> {
terminationRan.set(true);
});
app.start();
return app;
});
Future<GossipApp> app2Future = service.submit(() -> {
GossipApp app = new GossipApp(vertx, opts2, System.err, System.out, () -> {
terminationRan.set(true);
});
app.start();
return app;
});
Future<GossipApp> app3Future = service.submit(() -> {
GossipApp app = new GossipApp(vertx, opts3, System.err, System.out, () -> {
terminationRan.set(true);
});
app.start();
return app;
});
GossipApp app1 = app1Future.get(10, TimeUnit.SECONDS);
GossipApp app2 = app2Future.get(10, TimeUnit.SECONDS);
GossipApp app3 = app3Future.get(10, TimeUnit.SECONDS);
assertFalse(terminationRan.get());
HttpClient client = vertx.createHttpClient();
List<String> sent = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Bytes message = Bytes32.rightPad(Bytes.ofUnsignedInt(i));
sent.add(message.toHexString());
Thread.sleep(100);
client.request(HttpMethod.POST, 10000, "127.0.0.1", "/publish").exceptionHandler(thr -> {
throw new RuntimeException(thr);
}).handler(resp -> {
}).end(Buffer.buffer(message.toArrayUnsafe()));
}
List<String> receiver1 = Collections.emptyList();
List<String> receiver2 = Collections.emptyList();
int counter = 0;
do {
Thread.sleep(100);
counter++;
if (Files.exists(tempDir.resolve("log2.log"))) {
receiver1 = Files.readAllLines(tempDir.resolve("log2.log"));
}
if (Files.exists(tempDir.resolve("log3.log"))) {
receiver2 = Files.readAllLines(tempDir.resolve("log3.log"));
}
} while ((receiver1.size() < 20 || receiver2.size() < 20) && counter < 100);
client.close();
service.submit(app1::stop);
service.submit(app2::stop);
service.submit(app3::stop);
service.shutdown();
List<String> receiver1Expected = new ArrayList<>(sent);
Pattern pattern = Pattern.compile("value\":\"(.*)\"}$");
for (String received : receiver1) {
Matcher match = pattern.matcher(received);
match.find();
String value = match.group(1);
receiver1Expected.remove(value);
}
List<String> receiver2Expected = new ArrayList<>(sent);
for (String received : receiver2) {
Matcher match = pattern.matcher(received);
match.find();
String value = match.group(1);
receiver2Expected.remove(value);
}
assertTrue(receiver1Expected.isEmpty(), "Elements left:" + receiver1Expected);
assertTrue(receiver2Expected.isEmpty(), "Elements left:" + receiver2Expected);
}
}