blob: a66edde914656e345b6f09d1ab089de7e1a50623 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.scuttlebutt.lib;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.concurrent.CompletableAsyncResult;
import org.apache.tuweni.scuttlebutt.lib.model.FeedMessage;
import org.apache.tuweni.scuttlebutt.lib.model.StreamHandler;
import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
public class FeedStreamTest {
/**
* Tests it is possible to make posts and retrieve them again using the FeedService class
*/
@Test
@Disabled("Requires a running ssb server")
public void testCreateFeedStream() throws IOException, InterruptedException, ConnectionClosedException {
TestConfig config = TestConfig.fromEnvironment();
AsyncResult<ScuttlebuttClient> scuttlebuttClientLibAsyncResult =
ScuttlebuttClientFactory.fromNet(new ObjectMapper(), config.getHost(), config.getPort(), config.getKeyPair());
ScuttlebuttClient scuttlebuttClient = scuttlebuttClientLibAsyncResult.get();
FeedService feedService = scuttlebuttClient.getFeedService();
AsyncResult<List<FeedMessage>> published = publishTestMessages(feedService);
// Wait for the messages to be published.
List<FeedMessage> feedMessages = published.get();
assertEquals(feedMessages.size(), 10);
Optional<FeedMessage> lastPosted =
feedMessages.stream().max(Comparator.comparingLong(msg -> msg.getValue().getSequence()));
assertTrue(lastPosted.isPresent());
FeedMessage lastFeedMessagePosted = lastPosted.get();
CompletableAsyncResult<Optional<FeedMessage>> lastMessage = AsyncResult.incomplete();
feedService.createFeedStream((closer) -> new StreamHandler<FeedMessage>() {
Optional<FeedMessage> currentMessage = Optional.empty();
@Override
public void onMessage(FeedMessage item) {
currentMessage = Optional.of(item);
}
@Override
public void onStreamEnd() {
lastMessage.complete(currentMessage);
}
@Override
public void onStreamError(Exception ex) {
System.out.println(ex.getMessage());
fail(ex.getMessage());
}
});
Optional<FeedMessage> lastStreamedMessage = lastMessage.get();
assertTrue(lastStreamedMessage.isPresent());
assertEquals(lastStreamedMessage.get().getValue().getSequence(), lastPosted.get().getValue().getSequence());
TestScuttlebuttSerializationModel content =
lastStreamedMessage.get().getValue().getContentAs(new ObjectMapper(), TestScuttlebuttSerializationModel.class);
assertEquals("serialization-test", content.getType());
}
private AsyncResult<List<FeedMessage>> publishTestMessages(FeedService feedService) throws JsonProcessingException {
List<AsyncResult<FeedMessage>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
AsyncResult<FeedMessage> result = feedService.publish(new TestScuttlebuttSerializationModel("test: " + i));
results.add(result);
}
return AsyncResult.combine(results);
}
}