blob: 692e20c2046fe9626c83f8909ee5707a7d536e5e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.conf;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.testng.Assert;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.annotations.Test;
/**
* Unit test {@link ConfigurationDataUtils}.
*/
public class ConfigurationDataUtilsTest {
@Test
public void testLoadClientConfigurationData() {
ClientConfigurationData confData = new ClientConfigurationData();
confData.setServiceUrl("pulsar://unknown:6650");
confData.setMaxLookupRequest(600);
confData.setMaxLookupRedirects(10);
confData.setNumIoThreads(33);
Map<String, Object> config = new HashMap<>();
Map<String, String> authParamMap = new HashMap<>();
authParamMap.put("k1", "v1");
authParamMap.put("k2", "v2");
config.put("serviceUrl", "pulsar://localhost:6650");
config.put("maxLookupRequest", 70000);
config.put("maxLookupRedirects", 50);
config.put("authParams", "testAuthParams");
config.put("authParamMap", authParamMap);
confData = ConfigurationDataUtils.loadData(config, confData, ClientConfigurationData.class);
assertEquals("pulsar://localhost:6650", confData.getServiceUrl());
assertEquals(70000, confData.getMaxLookupRequest());
assertEquals(50, confData.getMaxLookupRedirects());
assertEquals(33, confData.getNumIoThreads());
assertEquals("testAuthParams", confData.getAuthParams());
assertEquals("v1", confData.getAuthParamMap().get("k1"));
assertEquals("v2", confData.getAuthParamMap().get("k2"));
}
@Test
public void testLoadProducerConfigurationData() {
ProducerConfigurationData confData = new ProducerConfigurationData();
confData.setProducerName("unset");
confData.setBatchingEnabled(true);
confData.setBatchingMaxMessages(1234);
confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
Map<String, Object> config = new HashMap<>();
config.put("producerName", "test-producer");
config.put("batchingEnabled", false);
confData.setBatcherBuilder(BatcherBuilder.DEFAULT);
confData = ConfigurationDataUtils.loadData(config, confData, ProducerConfigurationData.class);
assertEquals("test-producer", confData.getProducerName());
assertFalse(confData.isBatchingEnabled());
assertEquals(1234, confData.getBatchingMaxMessages());
assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
}
@Test
public void testLoadConsumerConfigurationData() {
ConsumerConfigurationData confData = new ConsumerConfigurationData();
confData.setSubscriptionName("unknown-subscription");
confData.setPriorityLevel(10000);
confData.setConsumerName("unknown-consumer");
confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
Map<String, Object> config = new HashMap<>();
config.put("subscriptionName", "test-subscription");
config.put("priorityLevel", 100);
confData = ConfigurationDataUtils.loadData(config, confData, ConsumerConfigurationData.class);
assertEquals("test-subscription", confData.getSubscriptionName());
assertEquals(100, confData.getPriorityLevel());
assertEquals("unknown-consumer", confData.getConsumerName());
assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
}
@Test
public void testLoadReaderConfigurationData() {
ReaderConfigurationData confData = new ReaderConfigurationData();
confData.setTopicName("unknown");
confData.setReceiverQueueSize(1000000);
confData.setReaderName("unknown-reader");
Map<String, Object> config = new HashMap<>();
config.put("topicNames", ImmutableSet.of("test-topic"));
config.put("receiverQueueSize", 100);
confData = ConfigurationDataUtils.loadData(config, confData, ReaderConfigurationData.class);
assertEquals("test-topic", confData.getTopicName());
assertEquals(100, confData.getReceiverQueueSize());
assertEquals("unknown-reader", confData.getReaderName());
}
@Test
public void testLoadConfigurationDataWithUnknownFields() {
ReaderConfigurationData confData = new ReaderConfigurationData();
confData.setTopicName("unknown");
confData.setReceiverQueueSize(1000000);
confData.setReaderName("unknown-reader");
Map<String, Object> config = new HashMap<>();
config.put("unknown", "test-topic");
config.put("receiverQueueSize", 100);
try {
ConfigurationDataUtils.loadData(config, confData, ReaderConfigurationData.class);
fail("Should fail loading configuration data with unknown fields");
} catch (RuntimeException re) {
assertTrue(re.getCause() instanceof IOException);
}
}
@Test
public void testConfigBuilder() throws PulsarClientException {
ClientConfigurationData clientConfig = new ClientConfigurationData();
clientConfig.setServiceUrl("pulsar://unknown:6650");
clientConfig.setStatsIntervalSeconds(80);
PulsarClientImpl pulsarClient = new PulsarClientImpl(clientConfig);
assertNotNull(pulsarClient, "Pulsar client built using config should not be null");
assertEquals(pulsarClient.getConfiguration().getServiceUrl(), "pulsar://unknown:6650");
assertEquals(pulsarClient.getConfiguration().getNumListenerThreads(), 1, "builder default not set properly");
assertEquals(pulsarClient.getConfiguration().getStatsIntervalSeconds(), 80,
"builder default should override if set explicitly");
}
@Test
public void testLoadSecretParams() {
ClientConfigurationData confData = new ClientConfigurationData();
Map<String, String> authParamMap = new HashMap<>();
authParamMap.put("k1", "v1");
confData.setServiceUrl("pulsar://unknown:6650");
confData.setAuthParams("");
confData.setAuthParamMap(authParamMap);
authParamMap.put("k2", "v2");
Map<String, Object> config = new HashMap<>();
config.put("serviceUrl", "pulsar://localhost:6650");
config.put("authParams", "testAuthParams");
config.put("authParamMap", authParamMap);
confData = ConfigurationDataUtils.loadData(config, confData, ClientConfigurationData.class);
assertEquals("pulsar://localhost:6650", confData.getServiceUrl());
assertEquals("testAuthParams", confData.getAuthParams());
assertEquals("v1", confData.getAuthParamMap().get("k1"));
assertEquals("v2", confData.getAuthParamMap().get("k2"));
final String secretStr = "*****";
try {
String confDataJson = new ObjectMapper().writeValueAsString(confData);
Map<String, Object> confDataMap = new ObjectMapper().readValue(confDataJson, Map.class);
assertEquals("pulsar://localhost:6650", confDataMap.get("serviceUrl"));
assertEquals(secretStr, confDataMap.get("authParams"));
assertEquals(secretStr, confDataMap.get("authParamMap"));
} catch (Exception e) {
Assert.fail();
}
}
}