| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.pulsar.reactive.client.jackson; |
| |
| import java.util.Map; |
| |
| import com.fasterxml.jackson.annotation.JsonInclude; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.apache.pulsar.client.api.CompressionType; |
| import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; |
| import org.apache.pulsar.client.api.CryptoKeyReader; |
| import org.apache.pulsar.client.api.DeadLetterPolicy; |
| import org.apache.pulsar.client.api.EncryptionKeyInfo; |
| import org.apache.pulsar.client.api.HashingScheme; |
| import org.apache.pulsar.client.api.KeySharedPolicy; |
| import org.apache.pulsar.client.api.MessageRouter; |
| import org.apache.pulsar.client.api.MessageRoutingMode; |
| import org.apache.pulsar.client.api.ProducerAccessMode; |
| import org.apache.pulsar.client.api.ProducerCryptoFailureAction; |
| import org.apache.pulsar.client.api.Range; |
| import org.apache.pulsar.client.api.RegexSubscriptionMode; |
| import org.apache.pulsar.client.api.SubscriptionInitialPosition; |
| import org.apache.pulsar.client.api.SubscriptionMode; |
| import org.apache.pulsar.client.api.SubscriptionType; |
| import org.apache.pulsar.client.impl.KeyBasedBatcherBuilder; |
| import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec; |
| import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec; |
| import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec; |
| import org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec; |
| import org.apache.pulsar.reactive.client.api.MutableReactiveMessageReaderSpec; |
| import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec; |
| import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec; |
| import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec; |
| import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.ValueSource; |
| import reactor.core.Disposable; |
| import reactor.core.scheduler.Scheduler; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| |
| class PulsarReactiveClientModuleTest { |
| |
| private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new PulsarReactiveClientModule()) |
| .setSerializationInclusion(JsonInclude.Include.NON_NULL); |
| |
| @ParameterizedTest |
| @ValueSource(classes = { ReactiveMessageConsumerSpec.class, ImmutableReactiveMessageConsumerSpec.class, |
| MutableReactiveMessageConsumerSpec.class }) |
| void shouldSerDeserReactiveMessageConsumerSpec(Class<? extends ReactiveMessageConsumerSpec> klass) |
| throws Exception { |
| // @formatter:off |
| String content = ("{" |
| + "'topicNames': ['my-topic']," |
| + "'topicsPattern': 'my-topic-*'," |
| + "'topicsPatternSubscriptionMode': 'PersistentOnly'," |
| + "'topicsPatternAutoDiscoveryPeriod': 30," |
| + "'subscriptionName': 'my-sub'," |
| + "'subscriptionMode': 'Durable'," |
| + "'subscriptionType': 'Exclusive'," |
| + "'subscriptionInitialPosition': 'Latest'," |
| + "'keySharedPolicy': 'STICKY'," |
| + "'replicateSubscriptionState': true," |
| + "'subscriptionProperties': {'my-key': 'my-value'}," |
| + "'consumerName': 'my-consumer'," |
| + "'properties': {'my-key': 'my-value'}," |
| + "'priorityLevel': 42," |
| + "'readCompacted': true," |
| + "'batchIndexAckEnabled': true," |
| + "'ackTimeout': 30," |
| + "'ackTimeoutTickTime': 30," |
| + "'acknowledgementsGroupTime': 30," |
| + "'acknowledgeAsynchronously': true," |
| + "'acknowledgeScheduler': 'boundedElastic'," |
| + "'negativeAckRedeliveryDelay': 30," |
| + "'deadLetterPolicy': {" |
| + " 'maxRedeliverCount': 1," |
| + " 'retryLetterTopic': 'my-retry-topic'," |
| + " 'deadLetterTopic': 'my-dlq'," |
| + " 'initialSubscriptionName': 'my-dlq-sub'" |
| + "}," |
| + "'retryLetterTopicEnable': true," |
| + "'receiverQueueSize': 42," |
| + "'maxTotalReceiverQueueSizeAcrossPartitions': 42," |
| + "'autoUpdatePartitions': true," |
| + "'autoUpdatePartitionsInterval': 30," |
| + "'cryptoKeyReader': {" |
| + " 'className': 'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'," |
| + " 'args': {'dummy': 'my-dummy'}" |
| + "}," |
| + "'cryptoFailureAction': 'FAIL'," |
| + "'maxPendingChunkedMessage': 42," |
| + "'autoAckOldestChunkedMessageOnQueueFull': true," |
| + "'expireTimeOfIncompleteChunkedMessage': 30" |
| + "}").replaceAll("'", "\""); |
| // @formatter:on |
| |
| ReactiveMessageConsumerSpec spec = MAPPER.readValue(content, klass); |
| |
| assertThat(spec.getTopicNames()).containsExactly("my-topic"); |
| assertThat(spec.getTopicsPattern().pattern()).isEqualTo("my-topic-*"); |
| assertThat(spec.getTopicsPatternSubscriptionMode()).isEqualTo(RegexSubscriptionMode.PersistentOnly); |
| assertThat(spec.getTopicsPatternAutoDiscoveryPeriod()).hasMillis(30_000); |
| assertThat(spec.getSubscriptionName()).isEqualTo("my-sub"); |
| assertThat(spec.getSubscriptionMode()).isEqualTo(SubscriptionMode.Durable); |
| assertThat(spec.getSubscriptionType()).isEqualTo(SubscriptionType.Exclusive); |
| assertThat(spec.getSubscriptionInitialPosition()).isEqualTo(SubscriptionInitialPosition.Latest); |
| assertThat(spec.getKeySharedPolicy()).isInstanceOf(KeySharedPolicy.KeySharedPolicySticky.class); |
| assertThat(spec.getReplicateSubscriptionState()).isTrue(); |
| assertThat(spec.getSubscriptionProperties()).containsOnlyKeys("my-key"); |
| assertThat(spec.getSubscriptionProperties()).containsEntry("my-key", "my-value"); |
| assertThat(spec.getConsumerName()).isEqualTo("my-consumer"); |
| assertThat(spec.getProperties()).containsOnlyKeys("my-key"); |
| assertThat(spec.getProperties()).containsEntry("my-key", "my-value"); |
| assertThat(spec.getPriorityLevel()).isEqualTo(42); |
| assertThat(spec.getReadCompacted()).isTrue(); |
| assertThat(spec.getBatchIndexAckEnabled()).isTrue(); |
| assertThat(spec.getAckTimeout()).hasMillis(30_000); |
| assertThat(spec.getAckTimeoutTickTime()).hasMillis(30_000); |
| assertThat(spec.getAcknowledgementsGroupTime()).hasMillis(30_000); |
| assertThat(spec.getAcknowledgeAsynchronously()).isTrue(); |
| assertThat(spec.getAcknowledgeScheduler().toString()).isEqualTo("Schedulers.boundedElastic()"); |
| assertThat(spec.getNegativeAckRedeliveryDelay()).hasMillis(30_000); |
| assertThat(spec.getDeadLetterPolicy().getMaxRedeliverCount()).isEqualTo(1); |
| assertThat(spec.getDeadLetterPolicy().getDeadLetterTopic()).isEqualTo("my-dlq"); |
| assertThat(spec.getDeadLetterPolicy().getRetryLetterTopic()).isEqualTo("my-retry-topic"); |
| assertThat(spec.getDeadLetterPolicy().getInitialSubscriptionName()).isEqualTo("my-dlq-sub"); |
| assertThat(spec.getRetryLetterTopicEnable()).isTrue(); |
| assertThat(spec.getReceiverQueueSize()).isEqualTo(42); |
| assertThat(spec.getMaxTotalReceiverQueueSizeAcrossPartitions()).isEqualTo(42); |
| assertThat(spec.getAutoUpdatePartitions()).isTrue(); |
| assertThat(spec.getAutoUpdatePartitionsInterval()).hasMillis(30_000); |
| assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class); |
| Map<String, Object> params = ((TestCryptoKeyReader) spec.getCryptoKeyReader()).params; |
| assertThat(params).containsOnlyKeys("dummy"); |
| assertThat(params).containsEntry("dummy", "my-dummy"); |
| assertThat(spec.getCryptoFailureAction()).isEqualTo(ConsumerCryptoFailureAction.FAIL); |
| assertThat(spec.getMaxPendingChunkedMessage()).isEqualTo(42); |
| assertThat(spec.getAutoAckOldestChunkedMessageOnQueueFull()).isTrue(); |
| assertThat(spec.getExpireTimeOfIncompleteChunkedMessage()).hasMillis(30_000); |
| |
| String json = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec); |
| |
| // @formatter:off |
| String expected = ("{\n" |
| + " 'topicNames' : [ 'my-topic' ],\n" |
| + " 'topicsPattern' : 'my-topic-*',\n" |
| + " 'topicsPatternSubscriptionMode' : 'PersistentOnly',\n" |
| + " 'topicsPatternAutoDiscoveryPeriod' : 30.000000000,\n" |
| + " 'subscriptionName' : 'my-sub',\n" |
| + " 'subscriptionMode' : 'Durable',\n" |
| + " 'subscriptionType' : 'Exclusive',\n" |
| + " 'subscriptionInitialPosition' : 'Latest',\n" |
| + " 'keySharedPolicy' : 'STICKY',\n" |
| + " 'replicateSubscriptionState' : true,\n" |
| + " 'subscriptionProperties' : {\n" |
| + " 'my-key' : 'my-value'\n" |
| + " },\n" |
| + " 'consumerName' : 'my-consumer',\n" |
| + " 'properties' : {\n" |
| + " 'my-key' : 'my-value'\n" |
| + " },\n" |
| + " 'priorityLevel' : 42,\n" |
| + " 'readCompacted' : true,\n" |
| + " 'batchIndexAckEnabled' : true,\n" |
| + " 'ackTimeout' : 30.000000000,\n" |
| + " 'ackTimeoutTickTime' : 30.000000000,\n" |
| + " 'acknowledgementsGroupTime' : 30.000000000,\n" |
| + " 'acknowledgeAsynchronously' : true,\n" |
| + " 'acknowledgeScheduler' : 'boundedElastic',\n" |
| + " 'negativeAckRedeliveryDelay' : 30.000000000,\n" |
| + " 'deadLetterPolicy' : {\n" |
| + " 'maxRedeliverCount' : 1,\n" |
| + " 'retryLetterTopic' : 'my-retry-topic',\n" |
| + " 'deadLetterTopic' : 'my-dlq',\n" |
| + " 'initialSubscriptionName' : 'my-dlq-sub'\n" |
| + " },\n" |
| + " 'retryLetterTopicEnable' : true,\n" |
| + " 'receiverQueueSize' : 42,\n" |
| + " 'maxTotalReceiverQueueSizeAcrossPartitions' : 42,\n" |
| + " 'autoUpdatePartitions' : true,\n" |
| + " 'autoUpdatePartitionsInterval' : 30.000000000,\n" |
| + " 'cryptoKeyReader' : {\n" |
| + " 'className' : 'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n" |
| + " },\n" |
| + " 'cryptoFailureAction' : 'FAIL',\n" |
| + " 'maxPendingChunkedMessage' : 42,\n" |
| + " 'autoAckOldestChunkedMessageOnQueueFull' : true,\n" |
| + " 'expireTimeOfIncompleteChunkedMessage' : 30.000000000\n" |
| + "}").replaceAll("'", "\""); |
| // @formatter:on |
| |
| assertThat(json).isEqualTo(expected); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(classes = { |
| // MutableReactiveMessageConsumerSpec.class, |
| ReactiveMessageConsumerSpec.class, ImmutableReactiveMessageConsumerSpec.class }) |
| void shouldSerDeserEmptyReactiveMessageConsumerSpec(Class<? extends ReactiveMessageConsumerSpec> klass) |
| throws Exception { |
| String content = "{}"; |
| ReactiveMessageConsumerSpec spec = MAPPER.readValue(content, klass); |
| String json = MAPPER.writeValueAsString(spec); |
| assertThat(json).isEqualTo(content); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(classes = { ReactiveMessageReaderSpec.class, ImmutableReactiveMessageReaderSpec.class, |
| MutableReactiveMessageReaderSpec.class }) |
| void shouldSerDeserReactiveMessageReaderSpec(Class<? extends ReactiveMessageReaderSpec> klass) throws Exception { |
| // @formatter:off |
| String content = ("{" |
| + "'topicNames': ['my-topic']," |
| + "'readerName': 'my-reader'," |
| + "'subscriptionName': 'my-sub'," |
| + "'generatedSubscriptionNamePrefix': 'my-prefix-'," |
| + "'receiverQueueSize': 42," |
| + "'readCompacted': true," |
| + "'keyHashRanges': [{" |
| + " 'start': 42," |
| + " 'end': 43" |
| + "}]," |
| + "'cryptoKeyReader': {" |
| + " 'className': 'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'," |
| + " 'args': {'dummy': 'my-dummy'}" |
| + "}," |
| + "'cryptoFailureAction': 'FAIL'" |
| + "}").replaceAll("'", "\""); |
| // @formatter:on |
| |
| ReactiveMessageReaderSpec spec = MAPPER.readValue(content, klass); |
| |
| assertThat(spec.getTopicNames()).containsExactly("my-topic"); |
| assertThat(spec.getReaderName()).isEqualTo("my-reader"); |
| assertThat(spec.getSubscriptionName()).isEqualTo("my-sub"); |
| assertThat(spec.getGeneratedSubscriptionNamePrefix()).isEqualTo("my-prefix-"); |
| assertThat(spec.getReceiverQueueSize()).isEqualTo(42); |
| assertThat(spec.getReadCompacted()).isTrue(); |
| assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class); |
| assertThat(spec.getKeyHashRanges()).containsExactly(new Range(42, 43)); |
| assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class); |
| Map<String, Object> params = ((TestCryptoKeyReader) spec.getCryptoKeyReader()).params; |
| assertThat(params).containsOnlyKeys("dummy"); |
| assertThat(params).containsEntry("dummy", "my-dummy"); |
| assertThat(spec.getCryptoFailureAction()).isEqualTo(ConsumerCryptoFailureAction.FAIL); |
| |
| String json = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec); |
| |
| // @formatter:off |
| String expected = ("{\n" |
| + " 'topicNames' : [ 'my-topic' ],\n" |
| + " 'readerName' : 'my-reader',\n" |
| + " 'subscriptionName' : 'my-sub',\n" |
| + " 'generatedSubscriptionNamePrefix' : 'my-prefix-',\n" |
| + " 'receiverQueueSize' : 42,\n" |
| + " 'readCompacted' : true,\n" |
| + " 'keyHashRanges' : [ {\n" |
| + " 'start' : 42,\n" |
| + " 'end' : 43\n" |
| + " } ],\n" |
| + " 'cryptoKeyReader' : {\n" |
| + " 'className' : 'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n" |
| + " },\n" |
| + " 'cryptoFailureAction' : 'FAIL'\n" |
| + "}").replaceAll("'", "\""); |
| // @formatter:on |
| |
| assertThat(json).isEqualTo(expected); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(classes = { MutableReactiveMessageSenderSpec.class, ReactiveMessageSenderSpec.class, |
| ImmutableReactiveMessageSenderSpec.class }) |
| void shouldSerDeserEmptyReactiveMessageSenderSpec(Class<? extends ReactiveMessageSenderSpec> klass) |
| throws Exception { |
| String content = "{}"; |
| ReactiveMessageSenderSpec spec = MAPPER.readValue(content, klass); |
| String json = MAPPER.writeValueAsString(spec); |
| assertThat(json).isEqualTo(content); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(classes = { |
| // MutableReactiveMessageReaderSpec.class, |
| ReactiveMessageReaderSpec.class, ImmutableReactiveMessageReaderSpec.class }) |
| void shouldSerDeserEmptyReactiveMessageReaderSpec(Class<? extends ReactiveMessageReaderSpec> klass) |
| throws Exception { |
| String content = "{}"; |
| ReactiveMessageReaderSpec spec = MAPPER.readValue(content, klass); |
| String json = MAPPER.writeValueAsString(spec); |
| assertThat(json).isEqualTo(content); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(classes = { ReactiveMessageSenderSpec.class, ImmutableReactiveMessageSenderSpec.class, |
| MutableReactiveMessageSenderSpec.class }) |
| void shouldSerDeserReactiveMessageSenderSpec(Class<? extends ReactiveMessageSenderSpec> klass) throws Exception { |
| // @formatter:off |
| String content = ("{" |
| + "'topicName': 'my-topic'," |
| + "'producerName': 'my-producer'," |
| + "'sendTimeout': 30," |
| + "'maxPendingMessages': 42," |
| + "'maxPendingMessagesAcrossPartitions': 42," |
| + "'messageRoutingMode': 'SinglePartition'," |
| + "'hashingScheme': 'JavaStringHash'," |
| + "'cryptoFailureAction': 'FAIL'," |
| + "'messageRouter': {" |
| + " 'className': 'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestMessageRouter'," |
| + " 'args': {'dummy': 'my-dummy'}" |
| + "}," |
| + "'batchingMaxPublishDelay': 30," |
| + "'roundRobinRouterBatchingPartitionSwitchFrequency': 42," |
| + "'batchingMaxMessages': 42," |
| + "'batchingMaxBytes': 42," |
| + "'batchingEnabled': true," |
| + "'batcherBuilder': {" |
| + " 'className': 'org.apache.pulsar.client.impl.KeyBasedBatcherBuilder'" |
| + "}," |
| + "'chunkingEnabled': true," |
| + "'cryptoKeyReader': {" |
| + " 'className': 'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'," |
| + " 'args': {'dummy': 'my-dummy'}" |
| + "}," |
| + "'encryptionKeys': ['my-encryption-key']," |
| + "'compressionType': 'LZ4'," |
| + "'initialSequenceId': 42," |
| + "'autoUpdatePartitions': true," |
| + "'autoUpdatePartitionsInterval': 30," |
| + "'multiSchema': true," |
| + "'accessMode': 'Shared'," |
| + "'lazyStartPartitionedProducers': true," |
| + "'properties' : {" |
| + " 'my-key' : 'my-value'" |
| + "}" |
| + "}").replaceAll("'", "\""); |
| // @formatter:on |
| |
| ReactiveMessageSenderSpec spec = MAPPER.readValue(content, klass); |
| |
| assertThat(spec.getTopicName()).isEqualTo("my-topic"); |
| assertThat(spec.getProducerName()).isEqualTo("my-producer"); |
| assertThat(spec.getSendTimeout()).hasMillis(30_000); |
| assertThat(spec.getMaxPendingMessages()).isEqualTo(42); |
| assertThat(spec.getMaxPendingMessagesAcrossPartitions()).isEqualTo(42); |
| assertThat(spec.getMessageRoutingMode()).isEqualTo(MessageRoutingMode.SinglePartition); |
| assertThat(spec.getHashingScheme()).isEqualTo(HashingScheme.JavaStringHash); |
| assertThat(spec.getCryptoFailureAction()).isEqualTo(ProducerCryptoFailureAction.FAIL); |
| assertThat(spec.getMessageRouter()).isInstanceOf(TestMessageRouter.class); |
| Map<String, Object> params = ((TestMessageRouter) spec.getMessageRouter()).params; |
| assertThat(params).containsOnlyKeys("dummy"); |
| assertThat(params).containsEntry("dummy", "my-dummy"); |
| assertThat(spec.getBatchingMaxPublishDelay()).hasMillis(30_000); |
| assertThat(spec.getRoundRobinRouterBatchingPartitionSwitchFrequency()).isEqualTo(42); |
| assertThat(spec.getBatchingMaxMessages()).isEqualTo(42); |
| assertThat(spec.getBatchingMaxBytes()).isEqualTo(42); |
| assertThat(spec.getBatchingEnabled()).isTrue(); |
| assertThat(spec.getBatcherBuilder()).isInstanceOf(KeyBasedBatcherBuilder.class); |
| assertThat(spec.getChunkingEnabled()).isTrue(); |
| assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class); |
| params = ((TestCryptoKeyReader) spec.getCryptoKeyReader()).params; |
| assertThat(params).containsOnlyKeys("dummy"); |
| assertThat(params).containsEntry("dummy", "my-dummy"); |
| assertThat(spec.getEncryptionKeys()).containsExactly("my-encryption-key"); |
| assertThat(spec.getCompressionType()).isEqualTo(CompressionType.LZ4); |
| assertThat(spec.getInitialSequenceId()).isEqualTo(42); |
| assertThat(spec.getAutoUpdatePartitions()).isTrue(); |
| assertThat(spec.getAutoUpdatePartitionsInterval()).hasMillis(30_000); |
| assertThat(spec.getMultiSchema()).isTrue(); |
| assertThat(spec.getAccessMode()).isEqualTo(ProducerAccessMode.Shared); |
| assertThat(spec.getLazyStartPartitionedProducers()).isTrue(); |
| assertThat(spec.getProperties()).containsEntry("my-key", "my-value"); |
| |
| String json = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec); |
| |
| // @formatter:off |
| String expected = ("{\n" |
| + " 'topicName' : 'my-topic',\n" |
| + " 'producerName' : 'my-producer',\n" |
| + " 'sendTimeout' : 30.000000000,\n" |
| + " 'maxPendingMessages' : 42,\n" |
| + " 'maxPendingMessagesAcrossPartitions' : 42,\n" |
| + " 'messageRoutingMode' : 'SinglePartition',\n" |
| + " 'hashingScheme' : 'JavaStringHash',\n" |
| + " 'cryptoFailureAction' : 'FAIL',\n" |
| + " 'messageRouter' : {\n" |
| + " 'className' : 'org.apache.pulsar.reactive.client.jackson" |
| + ".PulsarReactiveClientModuleTest$TestMessageRouter'\n" |
| + " },\n" |
| + " 'batchingMaxPublishDelay' : 30.000000000,\n" |
| + " 'roundRobinRouterBatchingPartitionSwitchFrequency' : 42,\n" |
| + " 'batchingMaxMessages' : 42,\n" |
| + " 'batchingMaxBytes' : 42,\n" |
| + " 'batchingEnabled' : true,\n" |
| + " 'batcherBuilder' : { },\n" |
| + " 'chunkingEnabled' : true,\n" |
| + " 'cryptoKeyReader' : {\n" |
| + " 'className' : 'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n" |
| + " },\n" |
| + " 'encryptionKeys' : [ 'my-encryption-key' ],\n" |
| + " 'compressionType' : 'LZ4',\n" |
| + " 'initialSequenceId' : 42,\n" |
| + " 'autoUpdatePartitions' : true,\n" |
| + " 'autoUpdatePartitionsInterval' : 30.000000000,\n" |
| + " 'multiSchema' : true,\n" |
| + " 'accessMode' : 'Shared',\n" |
| + " 'lazyStartPartitionedProducers' : true,\n" |
| + " 'properties' : {\n" |
| + " 'my-key' : 'my-value'\n" |
| + " }\n" |
| + "}").replaceAll("'", "\""); |
| // @formatter:on |
| |
| assertThat(json).isEqualTo(expected); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = { "AUTO_SPLIT", "STICKY" }) |
| void shouldSerDeserKeySharedPolicy(String keySharedPolicy) throws Exception { |
| String content = (String.format("\"%s\"", keySharedPolicy)); |
| KeySharedPolicy policy = MAPPER.readValue(content, KeySharedPolicy.class); |
| String json = MAPPER.writeValueAsString(policy); |
| assertThat(json).isEqualTo(content); |
| } |
| |
| @Test |
| void shouldSerializeCustomKeySharedPolicy() throws Exception { |
| String json = MAPPER.writeValueAsString(new TestKeySharedPolicy()); |
| String expected = "\"org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestKeySharedPolicy\""; |
| assertThat(json).isEqualTo(expected); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = { "parallel", "elastic", "boundedElastic", "immediate", "single" }) |
| void shouldSerDeserScheduler(String scheduler) throws Exception { |
| String content = (String.format("\"%s\"", scheduler)); |
| Scheduler policy = MAPPER.readValue(content, Scheduler.class); |
| String json = MAPPER.writeValueAsString(policy); |
| assertThat(json).isEqualTo(content); |
| } |
| |
| @Test |
| void shouldSerializeCustomScheduler() throws Exception { |
| String json = MAPPER.writeValueAsString(new TestScheduler()); |
| String expected = "\"org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestScheduler\""; |
| assertThat(json).isEqualTo(expected); |
| } |
| |
| @Test |
| void shouldSerDeserDeadLetterPolicy() throws Exception { |
| // @formatter:off |
| String content = ("{\n" |
| + " 'maxRedeliverCount' : 0,\n" |
| + " 'retryLetterTopic' : 'my-retry-topic',\n" |
| + " 'deadLetterTopic' : 'my-dlq',\n" |
| + " 'initialSubscriptionName' : 'my-dlq-sub'\n" |
| + "}").replaceAll("'", "\""); |
| // @formatter:on |
| DeadLetterPolicy policy = MAPPER.readValue(content, DeadLetterPolicy.class); |
| String json = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(policy); |
| assertThat(json).isEqualTo(content); |
| } |
| |
| @Test |
| void shouldSerDeserEmptyDeadLetterPolicy() throws Exception { |
| DeadLetterPolicy policy = MAPPER.readValue("{}", DeadLetterPolicy.class); |
| String json = MAPPER.writeValueAsString(policy); |
| assertThat(json).isEqualTo("{\"maxRedeliverCount\":0}"); |
| } |
| |
| @Test |
| void shouldSerDeserCryptoKeyReader() throws Exception { |
| // @formatter:off |
| String content = ("{" |
| + " 'className': 'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'," |
| + " 'args': {'dummy': 'my-dummy'}" |
| + "}").replaceAll("'", "\""); |
| // @formatter:on |
| CryptoKeyReader cryptoKeyReader = MAPPER.readValue(content, CryptoKeyReader.class); |
| String json = MAPPER.writeValueAsString(cryptoKeyReader); |
| String expected = ("{'className':'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'}") |
| .replaceAll("'", "\""); |
| assertThat(json).isEqualTo(expected); |
| } |
| |
| public static class TestScheduler implements Scheduler { |
| |
| @Override |
| public Disposable schedule(Runnable task) { |
| return null; |
| } |
| |
| @Override |
| public Worker createWorker() { |
| return null; |
| } |
| |
| } |
| |
| public static class TestKeySharedPolicy extends KeySharedPolicy { |
| |
| @Override |
| public void validate() { |
| } |
| |
| } |
| |
| static class TestCryptoKeyReader implements CryptoKeyReader { |
| |
| private final Map<String, Object> params; |
| |
| // CHECKSTYLE:OFF |
| public TestCryptoKeyReader(Map<String, Object> params) { |
| this.params = params; |
| } |
| // CHECKSTYLE:ON |
| |
| @Override |
| public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) { |
| return null; |
| } |
| |
| @Override |
| public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { |
| return null; |
| } |
| |
| } |
| |
| static class TestMessageRouter implements MessageRouter { |
| |
| private final Map<String, Object> params; |
| |
| // CHECKSTYLE:OFF |
| public TestMessageRouter(Map<String, Object> params) { |
| this.params = params; |
| } |
| // CHECKSTYLE:ON |
| |
| } |
| |
| } |