| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE |
| * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file |
| * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the |
| * License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations under the License. |
| */ |
| package org.apache.tuweni.gossip; |
| |
| |
| import org.apache.tuweni.bytes.Bytes; |
| import org.apache.tuweni.concurrent.AsyncCompletion; |
| import org.apache.tuweni.concurrent.CompletableAsyncCompletion; |
| import org.apache.tuweni.crypto.Hash; |
| import org.apache.tuweni.plumtree.vertx.VertxGossipServer; |
| |
| import java.io.IOException; |
| import java.io.PrintStream; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.nio.file.StandardOpenOption; |
| import java.security.Security; |
| import java.time.Instant; |
| import java.util.Collections; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| import io.vertx.core.Vertx; |
| import io.vertx.core.http.HttpMethod; |
| import io.vertx.core.http.HttpServer; |
| import io.vertx.core.http.HttpServerRequest; |
| import org.bouncycastle.jce.provider.BouncyCastleProvider; |
| import picocli.CommandLine; |
| |
| /** |
| * Application running a gossip client, taking configuration from command line or a configuration file. |
| * |
| */ |
| public final class GossipApp { |
| |
| public static void main(String[] args) { |
| Security.addProvider(new BouncyCastleProvider()); |
| GossipCommandLineOptions opts = CommandLine.populateCommand(new GossipCommandLineOptions(), args); |
| try { |
| opts.validate(); |
| } catch (IllegalArgumentException e) { |
| System.err.println("Invalid configuration detected.\n\n" + e.getMessage()); |
| new CommandLine(opts).usage(System.out); |
| System.exit(1); |
| } |
| if (opts.help()) { |
| new CommandLine(opts).usage(System.out); |
| System.exit(0); |
| } |
| GossipApp gossipApp = new GossipApp(Vertx.vertx(), opts, System.err, System.out, () -> System.exit(1)); |
| Runtime.getRuntime().addShutdownHook(new Thread(gossipApp::stop)); |
| gossipApp.start(); |
| } |
| |
| private final ExecutorService senderThreadPool = Executors.newSingleThreadExecutor(new ThreadFactory() { |
| @Override |
| public Thread newThread(Runnable r) { |
| Thread t = new Thread(r, "sender"); |
| t.setDaemon(false); |
| return t; |
| } |
| }); |
| private final GossipCommandLineOptions opts; |
| private final Runnable terminateFunction; |
| private final PrintStream errStream; |
| private final PrintStream outStream; |
| private final VertxGossipServer server; |
| private final HttpServer rpcServer; |
| private final ExecutorService fileWriter = Executors.newSingleThreadExecutor(); |
| |
| GossipApp( |
| Vertx vertx, |
| GossipCommandLineOptions opts, |
| PrintStream errStream, |
| PrintStream outStream, |
| Runnable terminateFunction) { |
| LoggingPeerRepository repository = new LoggingPeerRepository(outStream); |
| outStream.println("Setting up server on " + opts.networkInterface() + ":" + opts.listenPort()); |
| server = new VertxGossipServer( |
| vertx, |
| opts.networkInterface(), |
| opts.listenPort(), |
| Hash::keccak256, |
| repository, |
| (bytes, attr) -> readMessage(opts.messageLog(), errStream, bytes), |
| null, |
| new CountingPeerPruningFunction(10), |
| 100, |
| 100); |
| this.opts = opts; |
| this.errStream = errStream; |
| this.outStream = outStream; |
| this.terminateFunction = terminateFunction; |
| this.rpcServer = vertx.createHttpServer(); |
| } |
| |
| void start() { |
| outStream.println("Starting gossip"); |
| AsyncCompletion completion = server.start(); |
| try { |
| completion.join(); |
| } catch (CompletionException | InterruptedException e) { |
| errStream.println("Server could not start: " + e.getMessage()); |
| terminateFunction.run(); |
| } |
| outStream.println("TCP server started"); |
| |
| CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete(); |
| rpcServer.requestHandler(this::handleRPCRequest).listen(opts.rpcPort(), opts.networkInterface(), res -> { |
| if (res.failed()) { |
| rpcCompletion.completeExceptionally(res.cause()); |
| } else { |
| rpcCompletion.complete(); |
| } |
| }); |
| try { |
| rpcCompletion.join(); |
| } catch (CompletionException | InterruptedException e) { |
| errStream.println("RPC server could not start: " + e.getMessage()); |
| terminateFunction.run(); |
| } |
| outStream.println("RPC server started"); |
| |
| try { |
| AsyncCompletion |
| .allOf(opts.peers().stream().map(peer -> server.connectTo(peer.getHost(), peer.getPort()))) |
| .join(60, TimeUnit.SECONDS); |
| } catch (TimeoutException | InterruptedException e) { |
| errStream.println("Server could not connect to other peers: " + e.getMessage()); |
| } |
| outStream.println("Gossip started"); |
| |
| if (opts.sending()) { |
| outStream.println("Start sending messages"); |
| senderThreadPool.submit(() -> { |
| for (int i = 0; i < opts.numberOfMessages(); i++) { |
| if (Thread.currentThread().isInterrupted()) { |
| return; |
| } |
| Bytes payload = Bytes.random(opts.payloadSize()); |
| publish(payload); |
| try { |
| Thread.sleep(opts.sendInterval()); |
| } catch (InterruptedException e) { |
| return; |
| } |
| } |
| }); |
| } |
| } |
| |
| private void handleRPCRequest(HttpServerRequest httpServerRequest) { |
| if (HttpMethod.POST.equals(httpServerRequest.method())) { |
| if ("/publish".equals(httpServerRequest.path())) { |
| httpServerRequest.bodyHandler(body -> { |
| Bytes message = Bytes.wrapBuffer(body); |
| outStream.println("Message to publish " + message.toHexString()); |
| publish(message); |
| httpServerRequest.response().setStatusCode(200).end(); |
| }); |
| } else { |
| httpServerRequest.response().setStatusCode(404).end(); |
| } |
| } else { |
| httpServerRequest.response().setStatusCode(405).end(); |
| } |
| } |
| |
| void stop() { |
| outStream.println("Stopping sending"); |
| senderThreadPool.shutdown(); |
| outStream.println("Stopping gossip"); |
| try { |
| server.stop().join(); |
| } catch (InterruptedException e) { |
| errStream.println("Server could not stop: " + e.getMessage()); |
| terminateFunction.run(); |
| } |
| |
| CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete(); |
| rpcServer.close(res -> { |
| if (res.failed()) { |
| rpcCompletion.completeExceptionally(res.cause()); |
| } else { |
| rpcCompletion.complete(); |
| } |
| }); |
| try { |
| rpcCompletion.join(); |
| } catch (CompletionException | InterruptedException e) { |
| outStream.println("Stopped gossip"); |
| errStream.println("RPC server could not stop: " + e.getMessage()); |
| terminateFunction.run(); |
| } |
| |
| fileWriter.shutdown(); |
| } |
| |
| private void readMessage(String messageLog, PrintStream err, Bytes bytes) { |
| fileWriter.submit(() -> { |
| ObjectMapper mapper = new ObjectMapper(); |
| ObjectNode node = mapper.createObjectNode(); |
| node.put("timestamp", Instant.now().toString()); |
| node.put("value", bytes.toHexString()); |
| try { |
| Path path = Paths.get(messageLog); |
| Files.write( |
| path, |
| Collections.singletonList(mapper.writeValueAsString(node)), |
| StandardCharsets.UTF_8, |
| Files.exists(path) ? StandardOpenOption.APPEND : StandardOpenOption.CREATE); |
| } catch (IOException e) { |
| err.println(e.getMessage()); |
| } |
| }); |
| } |
| |
| public void publish(Bytes message) { |
| server.gossip("", message); |
| } |
| } |