blob: 85d90a90ff546d006ff74b7a0c500a1dde97c594 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.scuttlebutt.rpc.mux;
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.concurrent.CompletableAsyncResult;
import org.apache.tuweni.crypto.sodium.Signature;
import org.apache.tuweni.io.Base64;
import org.apache.tuweni.junit.VertxExtension;
import org.apache.tuweni.junit.VertxInstance;
import org.apache.tuweni.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient;
import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
import org.apache.tuweni.scuttlebutt.rpc.RPCFunction;
import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
import java.io.BufferedWriter;
import java.io.File;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import io.vertx.core.Vertx;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.logl.Level;
import org.logl.LoggerProvider;
import org.logl.logl.SimpleLogger;
import org.logl.vertx.LoglLogDelegateFactory;
@ExtendWith(VertxExtension.class)
public class PatchworkIntegrationTest {
LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))));
@Test
@Disabled
public void testWithPatchwork(@VertxInstance Vertx vertx) throws Exception {
RPCHandler rpcHandler = makeRPCHandler(vertx);
List<AsyncResult<RPCResponse>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RPCFunction function = new RPCFunction("whoami");
RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>());
AsyncResult<RPCResponse> res = rpcHandler.makeAsyncRequest(asyncRequest);
results.add(res);
}
AsyncResult<List<RPCResponse>> allResults = AsyncResult.combine(results);
List<RPCResponse> rpcMessages = allResults.get();
assertEquals(10, rpcMessages.size());
}
// TODO: Move this to a utility class that all the scuttlebutt modules' tests can use.
private Signature.KeyPair getLocalKeys() throws Exception {
Optional<String> ssbDir = Optional.fromNullable(System.getenv().get("ssb_dir"));
Optional<String> homePath =
Optional.fromNullable(System.getProperty("user.home")).transform(home -> home + "/.ssb");
Optional<String> path = ssbDir.or(homePath);
if (!path.isPresent()) {
throw new Exception("Cannot find ssb directory config value");
}
String secretPath = path.get() + "/secret";
File file = new File(secretPath);
if (!file.exists()) {
throw new Exception("Secret file does not exist");
}
Scanner s = new Scanner(file, UTF_8.name());
s.useDelimiter("\n");
ArrayList<String> list = new ArrayList<String>();
while (s.hasNext()) {
String next = s.next();
// Filter out the comment lines
if (!next.startsWith("#")) {
list.add(next);
}
}
String secretJSON = String.join("", list);
ObjectMapper mapper = new ObjectMapper();
HashMap<String, String> values = mapper.readValue(secretJSON, new TypeReference<Map<String, String>>() {});
String pubKey = values.get("public").replace(".ed25519", "");
String privateKey = values.get("private").replace(".ed25519", "");
Bytes pubKeyBytes = Base64.decode(pubKey);
Bytes privKeyBytes = Base64.decode(privateKey);
Signature.PublicKey pub = Signature.PublicKey.fromBytes(pubKeyBytes);
Signature.SecretKey secretKey = Signature.SecretKey.fromBytes(privKeyBytes);
return new Signature.KeyPair(pub, secretKey);
}
@Test
@Disabled
public void postMessageTest(@VertxInstance Vertx vertx) throws Exception {
RPCHandler rpcHandler = makeRPCHandler(vertx);
List<AsyncResult<RPCResponse>> results = new ArrayList<>();
for (int i = 0; i < 20; i++) {
// Note: in a real use case, this would more likely be a Java class with these fields
HashMap<String, String> params = new HashMap<>();
params.put("type", "post");
params.put("text", "test test " + i);
RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params));
AsyncResult<RPCResponse> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);
results.add(rpcMessageAsyncResult);
}
List<RPCResponse> rpcMessages = AsyncResult.combine(results).get();
rpcMessages.forEach(msg -> System.out.println(msg.asString()));
}
@Test
@Disabled
/**
* We expect this to complete the AsyncResult with an exception.
*/
public void postMessageThatIsTooLong(@VertxInstance Vertx vertx) throws Exception {
RPCHandler rpcHandler = makeRPCHandler(vertx);
List<AsyncResult<RPCResponse>> results = new ArrayList<>();
String longString = new String(new char[40000]).replace("\0", "a");
for (int i = 0; i < 20; i++) {
// Note: in a real use case, this would more likely be a Java class with these fields
HashMap<String, String> params = new HashMap<>();
params.put("type", "post");
params.put("text", longString);
RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params));
AsyncResult<RPCResponse> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);
results.add(rpcMessageAsyncResult);
}
List<RPCResponse> rpcMessages = AsyncResult.combine(results).get();
rpcMessages.forEach(msg -> System.out.println(msg.asString()));
}
private RPCHandler makeRPCHandler(Vertx vertx) throws Exception {
Signature.KeyPair keyPair = getLocalKeys();
String networkKeyBase64 = "1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=";
Bytes32 networkKeyBytes32 = Bytes32.wrap(Base64.decode(networkKeyBase64));
String host = "localhost";
int port = 8008;
LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))));
LoglLogDelegateFactory.setProvider(loggerProvider);
SecureScuttlebuttVertxClient secureScuttlebuttVertxClient =
new SecureScuttlebuttVertxClient(loggerProvider, vertx, keyPair, networkKeyBytes32);
AsyncResult<RPCHandler> onConnect =
secureScuttlebuttVertxClient.connectTo(port, host, keyPair.publicKey(), (sender, terminationFn) -> {
return new RPCHandler(vertx, sender, terminationFn, new ObjectMapper(), loggerProvider);
});
return onConnect.get();
}
@Test
@Disabled
public void streamTest(@VertxInstance Vertx vertx) throws Exception {
RPCHandler handler = makeRPCHandler(vertx);
Signature.PublicKey publicKey = getLocalKeys().publicKey();
String pubKey = "@" + Base64.encode(publicKey.bytes()) + ".ed25519";
Map<String, String> params = new HashMap<>();
params.put("id", pubKey);
CompletableAsyncResult<Void> streamEnded = AsyncResult.incomplete();
RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params));
handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
@Override
public void onMessage(RPCResponse message) {
System.out.print(message.asString());
}
@Override
public void onStreamEnd() {
streamEnded.complete(null);
}
@Override
public void onStreamError(Exception ex) {
streamEnded.completeExceptionally(ex);
}
});
// Wait until the stream is complete
streamEnded.get();
}
}