blob: 8c64e40f927c44b56e659e12473e3e0c7d442183 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.proxy;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.google.gson.reflect.TypeToken;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
import org.apache.pulsar.websocket.stats.ProxyTopicStat;
import org.apache.pulsar.websocket.stats.ProxyTopicStat.ConsumerStats;
import org.apache.pulsar.websocket.stats.ProxyTopicStat.ProducerStats;
import org.awaitility.Awaitility;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.logging.LoggingFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "websocket")
public class ProxyPublishConsumeTest extends ProducerConsumerBase {
protected String methodName;
private ProxyServer proxyServer;
private WebSocketService service;
private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
@BeforeMethod
public void setup() throws Exception {
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
super.internalSetup();
super.producerBaseSetup();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(Optional.of(0));
config.setClusterName("test");
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
if (service != null) {
service.close();
}
if (proxyServer != null) {
proxyServer.stop();
}
log.info("Finished Cleaning Up Test setup");
}
@Test(timeOut = 10000)
public void socketTest() throws Exception {
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/consumer/persistent/my-property/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
String readerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/reader/persistent/my-property/my-ns/my-topic1";
String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
URI consumeUri = URI.create(consumerUri);
URI readUri = URI.create(readerUri);
URI produceUri = URI.create(producerUri);
WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
WebSocketClient readClient = new WebSocketClient();
SimpleConsumerSocket readSocket = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
try {
consumeClient1.start();
consumeClient2.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
log.info("Connecting to : {}", consumeUri);
readClient.start();
ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
Future<Session> readerFuture = readClient.connect(readSocket, readUri, readRequest);
log.info("Connecting to : {}", readUri);
// let it connect
assertTrue(consumerFuture1.get().isOpen());
assertTrue(consumerFuture2.get().isOpen());
assertTrue(readerFuture.get().isOpen());
// Also make sure subscriptions and reader are already created
Thread.sleep(500);
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
assertTrue(producerFuture.get().isOpen());
int retry = 0;
int maxRetry = 400;
Awaitility.await().untilAsserted(() -> {
assertTrue(consumeSocket1.getReceivedMessagesCount() >= 10
|| consumeSocket2.getReceivedMessagesCount() >= 10);
assertTrue(readSocket.getReceivedMessagesCount() >= 10);
});
// if the subscription type is exclusive (default), either of the consumer sessions has already been closed
assertTrue(consumerFuture1.get().isOpen());
assertTrue(consumerFuture2.get().isOpen());
assertTrue(produceSocket.getBuffer().size() > 0);
if (consumeSocket1.getBuffer().size() > consumeSocket2.getBuffer().size()) {
assertEquals(produceSocket.getBuffer(), consumeSocket1.getBuffer());
} else {
assertEquals(produceSocket.getBuffer(), consumeSocket2.getBuffer());
}
assertEquals(produceSocket.getBuffer(), readSocket.getBuffer());
} finally {
stopWebSocketClient(consumeClient1, consumeClient2, readClient, produceClient);
}
}
@Test(timeOut = 10000)
public void socketTestEndOfTopic() throws Exception {
final String topic = "my-property/my-ns/my-topic8";
final String subscription = "my-sub";
final String consumerUri = String.format(
"ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
proxyServer.getListenPortHTTP().get(), topic, subscription
);
final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", proxyServer.getListenPortHTTP().get(), topic);
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);
WebSocketClient consumeClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);
// let it connect
assertTrue(consumerFuture.get().isOpen());
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
assertTrue(producerFuture.get().isOpen());
// Send 30 message in total.
produceSocket.sendMessage(20);
// Send 10 permits, should receive 10 message
consumeSocket.sendPermits(10);
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), 10));
consumeSocket.isEndOfTopic();
// Wait till get response
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket.getBuffer().size(), 11));
// Assert not reach end of topic yet
assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":false}");
// Send 20 more permits, should receive all message
consumeSocket.sendPermits(20);
// 31 includes previous of end of topic request.
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), 31));
consumeSocket.isEndOfTopic();
// Wait till get response
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), 32));
// Assert not reached end of topic.
assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":false}");
admin.topics().terminateTopicAsync(topic).get();
consumeSocket.isEndOfTopic();
// Wait till get response
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), 33));
// Assert reached end of topic.
assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":true}");
} finally {
stopWebSocketClient(consumeClient, produceClient);
}
}
@Test
public void unsubscribeTest() throws Exception {
final String namespace = "my-property/my-ns";
final String topic = namespace + "/" + "my-topic7";
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + topic);
admin.topics().createPartitionedTopic(topicName, 3);
final String subscription = "my-sub";
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
URI consumeUri = URI.create(consumerUri);
WebSocketClient consumeClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
Thread.sleep(500);
try {
// setup a consumer
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
consumerFuture.get();
List<String> subs = admin.topics().getSubscriptions(topic);
assertEquals(subs.size(), 1);
assertEquals(subs.get(0), subscription);
// do unsubscribe
consumeSocket.unsubscribe();
//wait for delete
Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().getSubscriptions(topic).size(), 0);
});
} finally {
stopWebSocketClient(consumeClient);
}
}
@Test(timeOut = 10000)
public void emptySubscriptionConsumerTest() {
// Empty subscription name
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/consumer/persistent/my-property/my-ns/my-topic2/?subscriptionType=Exclusive";
URI consumeUri = URI.create(consumerUri);
WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
try {
consumeClient1.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
consumerFuture1.get();
fail("should fail: empty subscription");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
HttpServletResponse.SC_BAD_REQUEST);
} finally {
stopWebSocketClient(consumeClient1);
}
}
@Test(timeOut = 10000)
public void conflictingConsumerTest() throws Exception {
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
URI consumeUri = URI.create(consumerUri);
WebSocketClient consumeClient1 = new WebSocketClient();
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
try {
consumeClient1.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
consumerFuture1.get();
try {
consumeClient2.start();
ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
consumerFuture2.get();
fail("should fail: conflicting subscription name");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
HttpServletResponse.SC_CONFLICT);
} finally {
stopWebSocketClient(consumeClient2);
}
} finally {
stopWebSocketClient(consumeClient1);
}
}
@Test(timeOut = 10000)
public void conflictingProducerTest() throws Exception {
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/my-property/my-ns/my-topic4?producerName=my-producer";
URI produceUri = URI.create(producerUri);
WebSocketClient produceClient1 = new WebSocketClient();
WebSocketClient produceClient2 = new WebSocketClient();
SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
try {
produceClient1.start();
ClientUpgradeRequest produceRequest1 = new ClientUpgradeRequest();
Future<Session> producerFuture1 = produceClient1.connect(produceSocket1, produceUri, produceRequest1);
producerFuture1.get();
try {
produceClient2.start();
ClientUpgradeRequest produceRequest2 = new ClientUpgradeRequest();
Future<Session> producerFuture2 = produceClient2.connect(produceSocket2, produceUri, produceRequest2);
producerFuture2.get();
fail("should fail: conflicting producer name");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
HttpServletResponse.SC_CONFLICT);
} finally {
stopWebSocketClient(produceClient2);
}
} finally {
stopWebSocketClient(produceClient1);
}
}
@Test// (timeOut = 30000)
public void producerBacklogQuotaExceededTest() throws Exception {
String namespace = "my-property/ns-ws-quota";
admin.namespaces().createNamespace(namespace);
admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
admin.namespaces().setBacklogQuota(namespace,
BacklogQuota.builder()
.limitSize(10)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
.build());
final String topic = namespace + "/my-topic5";
final String subscription = "my-sub";
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/" + topic;
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);
WebSocketClient consumeClient = new WebSocketClient();
WebSocketClient produceClient1 = new WebSocketClient();
WebSocketClient produceClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
// Create subscription
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
consumerFuture.get();
} finally {
stopWebSocketClient(consumeClient);
}
// Fill the backlog
try {
produceClient1.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient1.connect(produceSocket1, produceUri, produceRequest);
producerFuture.get();
produceSocket1.sendMessage(100);
} finally {
stopWebSocketClient(produceClient1);
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
// New producer fails to connect
try {
produceClient2.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient2.connect(produceSocket2, produceUri, produceRequest);
producerFuture.get();
fail("should fail: backlog quota exceeded");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} finally {
stopWebSocketClient(produceClient2);
admin.topics().skipAllMessages("persistent://" + topic, subscription);
admin.topics().delete("persistent://" + topic);
admin.namespaces().removeBacklogQuota(namespace);
admin.namespaces().deleteNamespace(namespace);
}
}
@Test(timeOut = 10000)
public void topicDoesNotExistTest() throws Exception {
final String namespace = "my-property/ns-topic-creation-not-allowed";
admin.namespaces().createNamespace(namespace);
admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
admin.namespaces().setAutoTopicCreation(namespace,
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(false)
.topicType(TopicType.NON_PARTITIONED.toString())
.build());
final String topic = namespace + "/my-topic";
final String subscription = "my-sub";
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/" + topic;
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
URI produceUri = URI.create(producerUri);
URI consumeUri = URI.create(consumerUri);
WebSocketClient produceClient = new WebSocketClient();
WebSocketClient consumeClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
try {
produceClient.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
producerFuture.get();
fail("should fail: topic does not exist");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_NOT_FOUND);
} finally {
stopWebSocketClient(produceClient);
}
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
consumerFuture.get();
fail("should fail: topic does not exist");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_NOT_FOUND);
} finally {
stopWebSocketClient(consumeClient);
}
admin.namespaces().deleteNamespace(namespace);
}
@Test(timeOut = 10000)
public void producerFencedTest() throws Exception {
final String topic = "my-property/my-ns/producer-fenced-test";
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + topic)
.accessMode(ProducerAccessMode.Exclusive).create();
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/" + topic;
URI produceUri = URI.create(producerUri);
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
try {
produceClient.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
producerFuture.get();
fail("should fail: producer fenced");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_CONFLICT);
} finally {
stopWebSocketClient(produceClient);
producer.close();
}
}
@Test(timeOut = 10000)
public void topicTerminatedTest() throws Exception {
final String topic = "my-property/my-ns/topic-terminated-test";
admin.topics().createNonPartitionedTopic("persistent://" + topic);
admin.topics().terminateTopic("persistent://" + topic);
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/" + topic;
URI produceUri = URI.create(producerUri);
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
try {
produceClient.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
producerFuture.get();
fail("should fail: topic terminated");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} finally {
stopWebSocketClient(produceClient);
admin.topics().delete("persistent://" + topic);
}
}
/**
* It verifies proxy topic-stats and proxy-metrics api
*
* @throws Exception
*/
@Test(timeOut = 10000)
public void testProxyStats() throws Exception {
final String topic = "my-property/my-ns/my-topic6";
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + topic
+ "/my-sub?subscriptionType=Failover";
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/" + topic + "/";
final String readerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/reader/persistent/" + topic;
System.out.println(consumerUri + ", " + producerUri);
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);
URI readUri = URI.create(readerUri);
WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
WebSocketClient readClient = new WebSocketClient();
SimpleConsumerSocket readSocket = new SimpleConsumerSocket();
try {
consumeClient1.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
log.info("Connecting to : {}", consumeUri);
readClient.start();
ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
Future<Session> readerFuture = readClient.connect(readSocket, readUri, readRequest);
log.info("Connecting to : {}", readUri);
assertTrue(consumerFuture1.get().isOpen());
assertTrue(readerFuture.get().isOpen());
Thread.sleep(500);
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
assertTrue(producerFuture.get().isOpen());
// sleep so, proxy can deliver few messages to consumers for stats
Awaitility.await().untilAsserted(() -> {
assertTrue(consumeSocket1.getReceivedMessagesCount() >= 2);
});
@Cleanup
Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
final String baseUrl = pulsar.getSafeWebServiceAddress()
.replace(Integer.toString(pulsar.getConfiguration().getWebServicePort().get()),
(Integer.toString(proxyServer.getListenPortHTTP().get())))
+ "/admin/v2/proxy-stats/";
// verify proxy metrics
verifyProxyMetrics(client, baseUrl);
// verify proxy stats
verifyProxyStats(client, baseUrl, topic);
// verify topic stat
verifyTopicStat(client, baseUrl + "persistent/", topic);
} finally {
stopWebSocketClient(consumeClient1, produceClient, readClient);
}
}
@Test(timeOut = 10000)
public void consumeMessagesInPartitionedTopicTest() throws Exception {
final String namespace = "my-property/my-ns";
final String topic = namespace + "/" + "my-topic7";
admin.topics().createPartitionedTopic("persistent://" + topic, 3);
final String subscription = "my-sub";
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/" + topic;
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);
WebSocketClient consumeClient = new WebSocketClient();
WebSocketClient produceClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
try {
produceClient.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
producerFuture.get();
produceSocket.sendMessage(100);
} finally {
stopWebSocketClient(produceClient);
}
Thread.sleep(500);
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
consumerFuture.get();
} finally {
stopWebSocketClient(consumeClient);
}
}
@Test(timeOut = 10000)
public void socketPullModeTest() throws Exception {
final String topic = "my-property/my-ns/my-topic8";
final String subscription = "my-sub";
final String consumerUri = String.format(
"ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
proxyServer.getListenPortHTTP().get(), topic, subscription
);
final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", proxyServer.getListenPortHTTP().get(), topic);
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);
WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
try {
consumeClient1.start();
consumeClient2.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
log.info("Connecting to : {}", consumeUri);
// let it connect
assertTrue(consumerFuture1.get().isOpen());
assertTrue(consumerFuture2.get().isOpen());
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
assertTrue(producerFuture.get().isOpen());
produceSocket.sendMessage(100);
Thread.sleep(500);
// Verify no messages received despite production
assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);
consumeSocket1.sendPermits(3);
consumeSocket2.sendPermits(2);
consumeSocket2.sendPermits(2);
consumeSocket2.sendPermits(2);
Awaitility.await().untilAsserted(() -> {
assertEquals(consumeSocket1.getReceivedMessagesCount(), 3);
assertEquals(consumeSocket2.getReceivedMessagesCount(), 6);
});
} finally {
stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
}
}
@Test(timeOut = 20000)
public void nackMessageTest() throws Exception {
final String subscription = "my-sub";
final String dlqTopic = "my-property/my-ns/nack-msg-dlq-" + UUID.randomUUID();
final String consumerTopic = "my-property/my-ns/nack-msg-" + UUID.randomUUID();
final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/consumer/persistent/" +
dlqTopic + "/" + subscription +
"?subscriptionType=Shared";
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/consumer/persistent/" +
consumerTopic + "/" + subscription +
"?deadLetterTopic=" + dlqTopic +
"&maxRedeliverCount=1&subscriptionType=Shared&negativeAckRedeliveryDelay=1000";
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/producer/persistent/" + consumerTopic;
WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);
consumeSocket1.setMessageHandler((id, data) -> {
JsonObject nack = new JsonObject();
nack.add("messageId", new JsonPrimitive(id));
nack.add("type", new JsonPrimitive("negativeAcknowledge"));
return nack.toString();
});
try {
consumeClient1.start();
consumeClient2.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, URI.create(consumerUri), consumeRequest1);
Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, URI.create(dlqUri), consumeRequest2);
assertTrue(consumerFuture1.get().isOpen());
assertTrue(consumerFuture2.get().isOpen());
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, URI.create(producerUri), produceRequest);
assertTrue(producerFuture.get().isOpen());
assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);
produceSocket.sendMessage(1);
// Main topic
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(consumeSocket1.getReceivedMessagesCount(), 2));
// DLQ
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(consumeSocket2.getReceivedMessagesCount(), 1));
} finally {
stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
}
}
@Test(timeOut = 20000)
public void nackRedeliveryDelayTest() throws Exception {
final String uriBase = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2";
final String topic = "my-property/my-ns/nack-redelivery-delay-" + UUID.randomUUID();
final String sub = "my-sub";
final int delayTime = 5000;
final String consumerUri = String.format("%s/consumer/persistent/%s/%s?negativeAckRedeliveryDelay=%d", uriBase,
topic, sub, delayTime);
final String producerUri = String.format("%s/producer/persistent/%s", uriBase, topic);
final WebSocketClient consumeClient = new WebSocketClient();
final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
final WebSocketClient produceClient = new WebSocketClient();
final SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);
consumeSocket.setMessageHandler((mid, data) -> {
JsonObject nack = new JsonObject();
nack.add("type", new JsonPrimitive("negativeAcknowledge"));
nack.add("messageId", new JsonPrimitive(mid));
return nack.toString();
});
try {
consumeClient.start();
final ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
final Future<Session> consumerFuture = consumeClient.connect(consumeSocket, URI.create(consumerUri),
consumeRequest);
assertTrue(consumerFuture.get().isOpen());
produceClient.start();
final ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
final Future<Session> producerFuture = produceClient.connect(produceSocket, URI.create(producerUri),
produceRequest);
assertTrue(producerFuture.get().isOpen());
assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
produceSocket.sendMessage(1);
Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 1));
// Nacked message should be redelivered after 5 seconds
Thread.sleep(delayTime);
Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 2));
} finally {
stopWebSocketClient(consumeClient, produceClient);
}
}
@Test(timeOut = 20000)
public void ackBatchMessageTest() throws Exception {
final String subscription = "my-sub";
final String topic = "my-property/my-ns/ack-batch-message" + UUID.randomUUID();
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/consumer/persistent/" + topic + "/" + subscription;
final int messages = 10;
WebSocketClient consumerClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.create();
try {
consumerClient.start();
ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest);
assertTrue(consumerFuture.get().isOpen());
assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
for (int i = 0; i < messages; i++) {
producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
producer.flush();
consumeSocket.sendPermits(messages);
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
// The message should not be acked since we only acked 1 message of the batch message
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topic).getSubscriptions()
.get(subscription).getMsgBacklog(), 0));
} finally {
stopWebSocketClient(consumerClient);
}
}
@Test(timeOut = 20000)
public void consumeEncryptedMessages() throws Exception {
final String subscription = "my-sub";
final String topic = "my-property/my-ns/encrypted" + UUID.randomUUID();
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/consumer/persistent/" + topic + "/" + subscription + "?cryptoFailureAction=CONSUME";
final int messages = 10;
WebSocketClient consumerClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.defaultCryptoKeyReader(rsaPublicKeyData)
.addEncryptionKey("ws-consumer-a")
.create();
try {
consumerClient.start();
ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest);
assertTrue(consumerFuture.get().isOpen());
assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
for (int i = 0; i < messages; i++) {
producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
producer.flush();
consumeSocket.sendPermits(messages);
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
for (JsonObject msg : consumeSocket.getMessages()) {
assertTrue(msg.has("encryptionContext"));
JsonObject encryptionCtx = msg.getAsJsonObject("encryptionContext");
JsonObject keys = encryptionCtx.getAsJsonObject("keys");
assertTrue(keys.has("ws-consumer-a"));
assertTrue(keys.getAsJsonObject("ws-consumer-a").has("keyValue"));
}
// The message should not be acked since we only acked 1 message of the batch message
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topic).getSubscriptions()
.get(subscription).getMsgBacklog(), 0));
} finally {
stopWebSocketClient(consumerClient);
}
}
private void verifyTopicStat(Client client, String baseUrl, String topic) {
String statUrl = baseUrl + topic + "/stats";
WebTarget webTarget = client.target(statUrl);
Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
Response response = (Response) invocationBuilder.get();
String responseStr = response.readEntity(String.class);
final Gson gson = new Gson();
final ProxyTopicStat data = gson.fromJson(responseStr, ProxyTopicStat.class);
assertFalse(data.producerStats.isEmpty());
assertFalse(data.consumerStats.isEmpty());
}
private void verifyProxyMetrics(Client client, String baseUrl) {
// generate metrics
service.getProxyStats().generate();
// collect metrics
String statUrl = baseUrl + "metrics";
WebTarget webTarget = client.target(statUrl);
Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
Response response = invocationBuilder.get();
String responseStr = response.readEntity(String.class);
final Gson gson = new Gson();
List<Metrics> data = gson.fromJson(responseStr, new TypeToken<List<Metrics>>() {
}.getType());
assertFalse(data.isEmpty());
// re-generate metrics
service.getProxyStats().generate();
invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
response = invocationBuilder.get();
responseStr = response.readEntity(String.class);
data = gson.fromJson(responseStr, new TypeToken<List<Metrics>>() {
}.getType());
assertFalse(data.isEmpty());
}
private void verifyProxyStats(Client client, String baseUrl, String topic) {
String statUrl = baseUrl + "stats";
WebTarget webTarget = client.target(statUrl);
Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
Response response = invocationBuilder.get();
String responseStr = response.readEntity(String.class);
final Gson gson = new Gson();
final Map<String, ProxyTopicStat> data = gson.fromJson(responseStr,
new TypeToken<Map<String, ProxyTopicStat>>() {
}.getType());
// number of topic is loaded = 1
assertEquals(data.size(), 1);
Entry<String, ProxyTopicStat> entry = data.entrySet().iterator().next();
assertEquals(entry.getKey(), "persistent://" + topic);
ProxyTopicStat stats = entry.getValue();
// number of consumers are connected = 2 (one is reader)
assertEquals(stats.consumerStats.size(), 2);
ConsumerStats consumerStats = stats.consumerStats.iterator().next();
assertTrue(consumerStats.numberOfMsgDelivered > 0);
assertNotNull(consumerStats.remoteConnection);
// number of producers are connected = 1
assertEquals(stats.producerStats.size(), 1);
ProducerStats producerStats = stats.producerStats.iterator().next();
assertTrue(producerStats.numberOfMsgPublished > 0);
assertNotNull(producerStats.remoteConnection);
}
private void stopWebSocketClient(WebSocketClient... clients) {
for (WebSocketClient client : clients) {
try {
client.stop();
} catch (Exception e) {
log.error(e.getMessage());
}
}
log.info("proxy clients are stopped successfully");
}
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
}