blob: b517f55534e6717a6e378bea416be80469ba6743 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.provider.EnvVarConfigProvider;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.config.provider.MockVaultConfigProvider;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class AbstractConfigTest {
private String propertyValue;
/**
 * Captures the current value of the automatic-config-providers system property and
 * clears it before each test.
 */
@BeforeEach
public void setup() {
    // clearProperty hands back whatever value was set before removal (or null).
    propertyValue = System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
}
/**
 * Puts the automatic-config-providers system property back to the value captured in
 * {@code propertyValue}; when no value was captured the property is cleared.
 */
@AfterEach
public void teardown() {
    if (propertyValue == null) {
        System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
        return;
    }
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, propertyValue);
}
/**
 * Feeds a series of reporter lists that must be accepted, then a series that must be
 * rejected, through the {@code testValidInputs} / {@code testInvalidInputs} helpers.
 */
@Test
public void testConfiguredInstances() {
    String[] accepted = {
        " ",
        "",
        "org.apache.kafka.common.metrics.FakeMetricsReporter",
        " org.apache.kafka.common.metrics.FakeMetricsReporter ",
        "org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter"
    };
    for (String reporters : accepted) {
        testValidInputs(reporters);
    }

    String[] rejected = {
        ",",
        "org.apache.kafka.clients.producer.unknown-metrics-reporter",
        "test1,test2",
        "org.apache.kafka.common.metrics.FakeMetricsReporter,"
    };
    for (String reporters : rejected) {
        testInvalidInputs(reporters);
    }
}
/**
 * A LIST config with an empty-string default must yield an empty list when unset or set
 * to the empty string, and the split elements when set to a comma-separated value.
 */
@Test
public void testEmptyList() {
    ConfigDef listDef = new ConfigDef().define("a", Type.LIST, "", new ConfigDef.NonNullValidator(), Importance.HIGH, "doc");

    AbstractConfig unset = new AbstractConfig(listDef, Collections.emptyMap());
    assertEquals(Collections.emptyList(), unset.getList("a"));

    AbstractConfig blank = new AbstractConfig(listDef, Collections.singletonMap("a", ""));
    assertEquals(Collections.emptyList(), blank.getList("a"));

    AbstractConfig populated = new AbstractConfig(listDef, Collections.singletonMap("a", "b,c,d"));
    assertEquals(Arrays.asList("b", "c", "d"), populated.getList("a"));
}
/**
 * Checks that {@code originalsWithPrefix} strips the prefix from matching keys and that
 * a key only stops being reported by {@code unused()} once it is read through the
 * returned map.
 */
@Test
public void testOriginalsWithPrefix() {
    Properties settings = new Properties();
    settings.put("setting", "def");
    settings.put("foo.bar", "abc");
    TestConfig testConfig = new TestConfig(settings);

    Map<String, Object> stripped = testConfig.originalsWithPrefix("foo.");
    // Obtaining the map is not enough: the key is still reported as unused.
    assertTrue(testConfig.unused().contains("foo.bar"));
    // The read below is done purely for its side effect on the unused set.
    stripped.get("bar");
    assertFalse(testConfig.unused().contains("foo.bar"));

    Map<String, Object> expectedEntries = new HashMap<>();
    expectedEntries.put("bar", "abc");
    assertEquals(expectedEntries, stripped);
}
/**
 * The value injected by {@code TestConfig.preProcessParsedConfig} must be visible via
 * {@code get}, even when no properties were supplied.
 */
@Test
public void testPreprocessConfig() {
    TestConfig emptyConfig = new TestConfig(new Properties());
    assertEquals("success", emptyConfig.get("preprocess"));
}
/**
 * Exercises {@code valuesWithPrefixOverride("prefix.")} on a {@link TestSecurityConfig}.
 * <p>
 * For each scenario the test asserts (1) which keys {@code unused()} reports before the
 * value is read, (2) the value returned by {@code get} on the override map, and
 * (3) which keys {@code unused()} reports afterwards. The statements are therefore
 * order-sensitive: every {@code get} is expected to change the unused set.
 */
@Test
public void testValuesWithPrefixOverride() {
String prefix = "prefix.";
Properties props = new Properties();
props.put("sasl.mechanism", "PLAIN");
props.put("prefix.sasl.mechanism", "GSSAPI");
props.put("prefix.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
props.put("prefix.ssl.truststore.location", "my location");
props.put("sasl.kerberos.service.name", "service name");
props.put("ssl.keymanager.algorithm", "algorithm");
TestSecurityConfig config = new TestSecurityConfig(props);
Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
// prefix overrides global: both the prefixed and the unprefixed key were supplied;
// the prefixed value wins and one read is expected to mark both keys as used
assertTrue(config.unused().contains("prefix.sasl.mechanism"));
assertTrue(config.unused().contains("sasl.mechanism"));
assertEquals("GSSAPI", valuesWithPrefixOverride.get("sasl.mechanism"));
assertFalse(config.unused().contains("sasl.mechanism"));
assertFalse(config.unused().contains("prefix.sasl.mechanism"));
// prefix overrides default: only the prefixed key was supplied
assertTrue(config.unused().contains("prefix.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("prefix.sasl.kerberos.kinit.cmd"));
// prefix override with no default
assertTrue(config.unused().contains("prefix.ssl.truststore.location"));
assertFalse(config.unused().contains("ssl.truststore.location"));
assertEquals("my location", valuesWithPrefixOverride.get("ssl.truststore.location"));
assertFalse(config.unused().contains("ssl.truststore.location"));
assertFalse(config.unused().contains("prefix.ssl.truststore.location"));
// global overrides default: only the unprefixed key was supplied
assertTrue(config.unused().contains("ssl.keymanager.algorithm"));
assertEquals("algorithm", valuesWithPrefixOverride.get("ssl.keymanager.algorithm"));
assertFalse(config.unused().contains("ssl.keymanager.algorithm"));
// global with no default
assertTrue(config.unused().contains("sasl.kerberos.service.name"));
assertEquals("service name", valuesWithPrefixOverride.get("sasl.kerberos.service.name"));
assertFalse(config.unused().contains("sasl.kerberos.service.name"));
// unset with default: the key was never supplied, so it is never reported as unused
assertFalse(config.unused().contains("sasl.kerberos.min.time.before.relogin"));
assertEquals(SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
valuesWithPrefixOverride.get("sasl.kerberos.min.time.before.relogin"));
assertFalse(config.unused().contains("sasl.kerberos.min.time.before.relogin"));
// unset with no default: the lookup is expected to return null
assertFalse(config.unused().contains("ssl.key.password"));
assertNull(valuesWithPrefixOverride.get("ssl.key.password"));
assertFalse(config.unused().contains("ssl.key.password"));
}
/**
 * Exercises {@code valuesWithPrefixOverride} with keys that carry a secondary
 * (mechanism) prefix after the listener prefix, e.g.
 * {@code listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd}.
 * <p>
 * Each section asserts what {@code unused()} reports before a value is read through the
 * override map, the value returned, and what {@code unused()} reports afterwards, so the
 * statement order is significant.
 */
@Test
public void testValuesWithSecondaryPrefix() {
    String prefix = "listener.name.listener1.";
    Password saslJaasConfig1 = new Password("test.myLoginModule1 required;");
    Password saslJaasConfig2 = new Password("test.myLoginModule2 required;");
    Password saslJaasConfig3 = new Password("test.myLoginModule3 required;");
    Properties props = new Properties();
    props.put("listener.name.listener1.test-mechanism.sasl.jaas.config", saslJaasConfig1.value());
    props.put("test-mechanism.sasl.jaas.config", saslJaasConfig2.value());
    props.put("sasl.jaas.config", saslJaasConfig3.value());
    props.put("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
    props.put("listener.name.listener1.gssapi.sasl.kerberos.service.name", "testkafka");
    props.put("listener.name.listener1.gssapi.sasl.kerberos.min.time.before.relogin", "60000");
    props.put("ssl.provider", "TEST");
    TestSecurityConfig config = new TestSecurityConfig(props);
    Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);

    // prefix with mechanism overrides global
    assertTrue(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
    assertTrue(config.unused().contains("test-mechanism.sasl.jaas.config"));
    assertEquals(saslJaasConfig1, valuesWithPrefixOverride.get("test-mechanism.sasl.jaas.config"));
    assertEquals(saslJaasConfig3, valuesWithPrefixOverride.get("sasl.jaas.config"));
    assertFalse(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
    assertFalse(config.unused().contains("test-mechanism.sasl.jaas.config"));
    assertFalse(config.unused().contains("sasl.jaas.config"));

    // prefix with mechanism overrides default
    assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
    assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
    assertFalse(config.unused().contains("gssapi.sasl.kerberos.kinit.cmd"));
    assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
    assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.kinit.cmd"));
    // Check the key that was actually supplied (listener + mechanism prefix): it is the
    // one that must flip from unused to used after the read above. This mirrors the
    // service.name section below.
    assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));

    // prefix override for mechanism with no default
    assertFalse(config.unused().contains("sasl.kerberos.service.name"));
    assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
    assertFalse(config.unused().contains("gssapi.sasl.kerberos.service.name"));
    assertFalse(config.unused().contains("sasl.kerberos.service.name"));
    assertEquals("testkafka", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.service.name"));
    assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));

    // unset with no default: a mechanism-qualified lookup of a key that was only set
    // globally returns null and leaves the global key reported as unused
    assertTrue(config.unused().contains("ssl.provider"));
    assertNull(valuesWithPrefixOverride.get("gssapi.ssl.provider"));
    assertTrue(config.unused().contains("ssl.provider"));
}
/**
 * Checks {@code valuesWithPrefixAllOrNothing}: when any key carries the prefix, only the
 * prefixed values are returned; when none does, the unprefixed values are returned.
 */
@Test
public void testValuesWithPrefixAllOrNothing() {
    Properties settings = new Properties();
    // Unprefixed entries.
    settings.put("sasl.mechanism", "PLAIN");
    settings.put("sasl.kerberos.service.name", "service name");
    settings.put("ssl.keymanager.algorithm", "algorithm");
    // Entries under "prefix1."; nothing is defined under "prefix2.".
    settings.put("prefix1.sasl.mechanism", "GSSAPI");
    settings.put("prefix1.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
    settings.put("prefix1.ssl.truststore.location", "my location");
    TestSecurityConfig securityConfig = new TestSecurityConfig(settings);

    Map<String, Object> withPrefix1 = securityConfig.valuesWithPrefixAllOrNothing("prefix1.");
    // All prefixed values are there
    assertEquals("GSSAPI", withPrefix1.get("sasl.mechanism"));
    assertEquals("/usr/bin/kinit2", withPrefix1.get("sasl.kerberos.kinit.cmd"));
    assertEquals("my location", withPrefix1.get("ssl.truststore.location"));
    // Non-prefixed values are missing
    assertFalse(withPrefix1.containsKey("sasl.kerberos.service.name"));
    assertFalse(withPrefix1.containsKey("ssl.keymanager.algorithm"));

    Map<String, Object> withPrefix2 = securityConfig.valuesWithPrefixAllOrNothing("prefix2.");
    assertTrue(withPrefix2.containsKey("sasl.kerberos.service.name"));
    assertTrue(withPrefix2.containsKey("ssl.keymanager.algorithm"));
}
/**
 * A config key read only by a configured instance must be reported as unused until
 * {@code getConfiguredInstances} has created and configured that instance.
 */
@Test
public void testUnusedConfigs() {
    String reporterClassName = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";
    String extraKey = ConfiguredFakeMetricsReporter.EXTRA_CONFIG;

    Properties settings = new Properties();
    settings.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, reporterClassName);
    settings.put(extraKey, "my_value");
    TestConfig testConfig = new TestConfig(settings);

    assertTrue(testConfig.unused().contains(extraKey),
        extraKey + " should be marked unused before getConfiguredInstances is called");

    testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);

    assertFalse(testConfig.unused().contains(extraKey),
        extraKey + " should be marked as used");
}
/**
 * Asserts that {@code getConfiguredInstances} accepts the given metric-reporter list.
 *
 * @param configValue raw value for {@code TestConfig.METRIC_REPORTER_CLASSES_CONFIG}
 */
private void testValidInputs(String configValue) {
    Properties props = new Properties();
    props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
    TestConfig config = new TestConfig(props);
    try {
        config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    } catch (ConfigException e) {
        // Pass the exception along as the cause so the failure report shows why the
        // supposedly valid input was rejected.
        fail("No exceptions are expected here, valid props are :" + props, e);
    }
}
/**
 * Asserts that {@code getConfiguredInstances} rejects the given metric-reporter list
 * with a {@link KafkaException}.
 *
 * @param configValue raw value for {@code TestConfig.METRIC_REPORTER_CLASSES_CONFIG}
 */
private void testInvalidInputs(String configValue) {
    Properties settings = new Properties();
    settings.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
    TestConfig testConfig = new TestConfig(settings);
    assertThrows(
        KafkaException.class,
        () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class)
    );
}
/**
 * Configures three {@code MockConsumerInterceptor} instances with the throw-on-config
 * threshold set to 3, expects {@code getConfiguredInstances} to fail, and asserts that
 * three instances were configured and three were closed.
 */
@Test
public void testConfiguredInstancesClosedOnFailure() {
    try {
        // Same class listed three times, separated by ", ".
        String interceptorList = String.join(", ", Collections.nCopies(3, MockConsumerInterceptor.class.getName()));
        Map<String, String> settings = new HashMap<>();
        settings.put("client.id", "test");
        settings.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, interceptorList);
        TestConfig testConfig = new TestConfig(settings);

        MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3);
        assertThrows(
            Exception.class,
            () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class)
        );

        assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
        assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
    } finally {
        // Static counters are shared; always reset them for other tests.
        MockConsumerInterceptor.resetCounters();
    }
}
/**
 * Exercises class-valued configs ({@code class.prop} and {@code list.prop} of
 * {@link ClassTestConfig}) under different thread context classloaders.
 * <p>
 * The context classloader is swapped several times in sequence and restored in the
 * {@code finally} block, so the order of the statements below matters.
 */
@Test
public void testClassConfigs() {
// Classloader with no parent that refuses to find DEFAULT_CLASS and RESTRICTED_CLASS
// and delegates every other name to the classloader of ClassTestConfig.
class RestrictedClassLoader extends ClassLoader {
public RestrictedClassLoader() {
super(null);
}
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
if (name.equals(ClassTestConfig.DEFAULT_CLASS.getName()) || name.equals(ClassTestConfig.RESTRICTED_CLASS.getName()))
throw new ClassNotFoundException();
else
return ClassTestConfig.class.getClassLoader().loadClass(name);
}
}
ClassLoader restrictedClassLoader = new RestrictedClassLoader();
ClassLoader defaultClassLoader = AbstractConfig.class.getClassLoader();
// Remember the caller's context classloader so it can be restored afterwards.
ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
try {
// Test default classloading where all classes are visible to thread context classloader
Thread.currentThread().setContextClassLoader(defaultClassLoader);
ClassTestConfig testConfig = new ClassTestConfig();
testConfig.checkInstances(ClassTestConfig.DEFAULT_CLASS, ClassTestConfig.DEFAULT_CLASS);
// Test default classloading where default classes are not visible to thread context classloader
// Static classloading is used for default classes, so instance creation should succeed.
Thread.currentThread().setContextClassLoader(restrictedClassLoader);
testConfig = new ClassTestConfig();
testConfig.checkInstances(ClassTestConfig.DEFAULT_CLASS, ClassTestConfig.DEFAULT_CLASS);
// Test class overrides with names or classes where all classes are visible to thread context classloader
Thread.currentThread().setContextClassLoader(defaultClassLoader);
ClassTestConfig.testOverrides();
// Test class overrides with names or classes where all classes are visible to Kafka classloader, context classloader is null
Thread.currentThread().setContextClassLoader(null);
ClassTestConfig.testOverrides();
// Test class overrides where some classes are not visible to thread context classloader
Thread.currentThread().setContextClassLoader(restrictedClassLoader);
// Properties specified as classes should succeed
testConfig = new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS, Collections.singletonList(ClassTestConfig.RESTRICTED_CLASS));
testConfig.checkInstances(ClassTestConfig.RESTRICTED_CLASS, ClassTestConfig.RESTRICTED_CLASS);
testConfig = new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS, Arrays.asList(ClassTestConfig.VISIBLE_CLASS, ClassTestConfig.RESTRICTED_CLASS));
testConfig.checkInstances(ClassTestConfig.RESTRICTED_CLASS, ClassTestConfig.VISIBLE_CLASS, ClassTestConfig.RESTRICTED_CLASS);
// Properties specified as classNames should fail to load classes
// (for class.prop the failure is expected already at construction time)
assertThrows(ConfigException.class, () -> new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS.getName(), null),
"Config created with class property that cannot be loaded");
// For list.prop the config is constructed first and the failure is expected from getConfiguredInstances,
// both when the names are given as a List ...
ClassTestConfig config = new ClassTestConfig(null, Arrays.asList(ClassTestConfig.VISIBLE_CLASS.getName(), ClassTestConfig.RESTRICTED_CLASS.getName()));
assertThrows(KafkaException.class, () -> config.getConfiguredInstances("list.prop", MetricsReporter.class),
"Should have failed to load class");
// ... and when they are given as a single comma-separated String.
ClassTestConfig config2 = new ClassTestConfig(null, ClassTestConfig.VISIBLE_CLASS.getName() + "," + ClassTestConfig.RESTRICTED_CLASS.getName());
assertThrows(KafkaException.class, () -> config2.getConfiguredInstances("list.prop", MetricsReporter.class),
"Should have failed to load class");
} finally {
Thread.currentThread().setContextClassLoader(originClassLoader);
}
}
/**
 * {@code originals()} must return the supplied value, while {@code originals(overrides)}
 * must return the overriding value for the same key.
 */
@Test
public void testOriginalWithOverrides() {
    Properties settings = new Properties();
    settings.put("config.providers", "file");
    TestIndirectConfigResolution resolved = new TestIndirectConfigResolution(settings);

    Object plainValue = resolved.originals().get("config.providers");
    assertEquals("file", plainValue);

    Object overriddenValue = resolved.originals(Collections.singletonMap("config.providers", "file2")).get("config.providers");
    assertEquals("file2", overriddenValue);
}
/**
 * Config provider declared inside the same properties as the values it resolves:
 * indirect values must be replaced, plain values kept, and the provider closed.
 */
@Test
public void testOriginalsWithConfigProvidersProps() {
    String testId = UUID.randomUUID().toString();
    Properties settings = new Properties();
    // Test Case: Valid Test Case for ConfigProviders as part of config.properties
    settings.put("config.providers", "file");
    settings.put("config.providers.file.class", MockFileConfigProvider.class.getName());
    settings.put("config.providers.file.param.testId", testId);
    // Values that go through the provider.
    settings.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
    settings.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
    // Values that must pass through untouched.
    settings.put("prefix.ssl.truststore.location.number", 5);
    settings.put("sasl.kerberos.service.name", "service name");

    TestIndirectConfigResolution resolved = new TestIndirectConfigResolution(settings);
    Map<String, Object> originals = resolved.originals();
    assertEquals("testKey", originals.get("sasl.kerberos.key"));
    assertEquals("randomPassword", originals.get("sasl.kerberos.password"));
    assertEquals(5, originals.get("prefix.ssl.truststore.location.number"));
    assertEquals("service name", originals.get("sasl.kerberos.service.name"));
    MockFileConfigProvider.assertClosed(testId);
}
/**
 * When the automatic-config-providers system property lists other classes, declaring
 * {@code MockFileConfigProvider} in the properties must fail with a
 * {@link ConfigException}.
 */
@Test
public void testOriginalsWithConfigProvidersPropsExcluded() {
    String allowedProviders = MockVaultConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName();
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, allowedProviders);

    String testId = UUID.randomUUID().toString();
    Properties settings = new Properties();
    // Test Case: Config provider that is not an allowed class
    settings.put("config.providers", "file");
    settings.put("config.providers.file.class", MockFileConfigProvider.class.getName());
    settings.put("config.providers.file.param.testId", testId);
    settings.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
    settings.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
    settings.put("prefix.ssl.truststore.location.number", 5);
    settings.put("sasl.kerberos.service.name", "service name");

    assertThrows(
        ConfigException.class,
        () -> new TestIndirectConfigResolution(settings, Collections.emptyMap())
    );
}
/**
 * When the automatic-config-providers system property lists
 * {@code MockFileConfigProvider}, declaring it in the properties must work: indirect
 * values are resolved and the provider is closed.
 */
@Test
public void testOriginalsWithConfigProvidersPropsIncluded() {
    String allowedProviders = MockFileConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName();
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, allowedProviders);

    String testId = UUID.randomUUID().toString();
    Properties settings = new Properties();
    // Test Case: Config provider that is an allowed class
    settings.put("config.providers", "file");
    settings.put("config.providers.file.class", MockFileConfigProvider.class.getName());
    settings.put("config.providers.file.param.testId", testId);
    settings.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
    settings.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
    settings.put("prefix.ssl.truststore.location.number", 5);
    settings.put("sasl.kerberos.service.name", "service name");

    TestIndirectConfigResolution resolved = new TestIndirectConfigResolution(settings, Collections.emptyMap());
    Map<String, Object> originals = resolved.originals();
    assertEquals("testKey", originals.get("sasl.kerberos.key"));
    assertEquals("randomPassword", originals.get("sasl.kerberos.password"));
    MockFileConfigProvider.assertClosed(testId);
}
/**
 * Config provider definitions handed over as a separate map: indirect values in the
 * main properties must be resolved and the provider closed.
 */
@Test
public void testConfigProvidersPropsAsParam() {
    // Test Case: Valid Test Case for ConfigProviders as a separate variable
    String testId = UUID.randomUUID().toString();
    Properties providerSettings = new Properties();
    providerSettings.put("config.providers", "file");
    providerSettings.put("config.providers.file.class", MockFileConfigProvider.class.getName());
    providerSettings.put("config.providers.file.param.testId", testId);

    Properties settings = new Properties();
    settings.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
    settings.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");

    TestIndirectConfigResolution resolved =
        new TestIndirectConfigResolution(settings, Utils.castToStringObjectMap(providerSettings));
    Map<String, Object> originals = resolved.originals();
    assertEquals("testKey", originals.get("sasl.kerberos.key"));
    assertEquals("randomPassword", originals.get("sasl.kerberos.password"));
    MockFileConfigProvider.assertClosed(testId);
}
/**
 * Walks through three values of the automatic-config-providers system property and
 * checks whether a provider declared by its full class name is rejected or accepted.
 */
@Test
public void testAutomaticConfigProvidersWithFullClassName() {
    String mockFileProvider = MockFileConfigProvider.class.getName();

    // case0: MockFileConfigProvider is disallowed by org.apache.kafka.automatic.config.providers
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file");
    assertThrows(
        ConfigException.class,
        () -> new TestIndirectConfigResolution(
            Map.of("config.providers", "file", "config.providers.file.class", mockFileProvider),
            Map.of())
    );

    // case1: MockFileConfigProvider is allowed by org.apache.kafka.automatic.config.providers
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, mockFileProvider);
    Map<String, String> settings = Map.of(
        "config.providers", "file",
        "config.providers.file.class", mockFileProvider,
        "config.providers.file.param.testId", UUID.randomUUID().toString(),
        "test.key", "${file:/path:key}");
    TestIndirectConfigResolution singleAllowed = new TestIndirectConfigResolution(settings, Map.of());
    assertEquals("testKey", singleAllowed.originals().get("test.key"));

    // case2: MockFileConfigProvider and EnvVarConfigProvider are allowed by org.apache.kafka.automatic.config.providers
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
        mockFileProvider + "," + EnvVarConfigProvider.class.getName());
    TestIndirectConfigResolution severalAllowed = new TestIndirectConfigResolution(settings, Map.of());
    assertEquals("testKey", severalAllowed.originals().get("test.key"));
}
/**
 * Indirect values must still be resolved when the originals are handed over wrapped in
 * an unmodifiable map; the provider must be closed afterwards.
 */
@Test
public void testImmutableOriginalsWithConfigProvidersProps() {
    // Test Case: Valid Test Case for ConfigProviders as a separate variable
    String testId = UUID.randomUUID().toString();
    Properties providerSettings = new Properties();
    providerSettings.put("config.providers", "file");
    providerSettings.put("config.providers.file.class", MockFileConfigProvider.class.getName());
    providerSettings.put("config.providers.file.param.testId", testId);
    Map<String, ?> providerMap = Utils.castToStringObjectMap(providerSettings);

    Properties settings = new Properties();
    settings.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
    Map<?, ?> readOnlySettings = Collections.unmodifiableMap(settings);

    TestIndirectConfigResolution resolved = new TestIndirectConfigResolution(readOnlySettings, providerMap);
    assertEquals("testKey", resolved.originals().get("sasl.kerberos.key"));
    MockFileConfigProvider.assertClosed(testId);
}
/**
 * Two providers ({@code file} and {@code vault}) declared in a separate map: each
 * indirect value must be resolved by the provider named in its placeholder.
 */
@Test
public void testAutoConfigResolutionWithMultipleConfigProviders() {
    // Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable
    String testId = UUID.randomUUID().toString();
    Properties providerSettings = new Properties();
    providerSettings.put("config.providers", "file,vault");
    providerSettings.put("config.providers.file.class", MockFileConfigProvider.class.getName());
    providerSettings.put("config.providers.file.param.testId", testId);
    providerSettings.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());

    Properties settings = new Properties();
    settings.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
    settings.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
    settings.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}");
    settings.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}");

    TestIndirectConfigResolution resolved =
        new TestIndirectConfigResolution(settings, Utils.castToStringObjectMap(providerSettings));
    Map<String, Object> originals = resolved.originals();
    assertEquals("testKey", originals.get("sasl.kerberos.key"));
    assertEquals("randomPassword", originals.get("sasl.kerberos.password"));
    assertEquals("testTruststoreKey", originals.get("sasl.truststore.key"));
    assertEquals("randomtruststorePassword", originals.get("sasl.truststore.password"));
    MockFileConfigProvider.assertClosed(testId);
}
/**
 * Declaring a provider class name that does not exist must make construction fail with
 * a {@link KafkaException}.
 */
@Test
public void testAutoConfigResolutionWithInvalidConfigProviderClass() {
    // Test Case: Invalid class for Config Provider
    String missingProviderClass = "org.apache.kafka.common.config.provider.InvalidConfigProvider";
    Properties settings = new Properties();
    settings.put("config.providers", "file");
    settings.put("config.providers.file.class", missingProviderClass);
    settings.put("testKey", "${test:/foo/bar/testpath:testKey}");

    assertThrows(
        KafkaException.class,
        () -> new TestIndirectConfigResolution(settings)
    );
}
/**
 * With the automatic-config-providers system property set to the empty string, declaring
 * any provider must fail, and the error message must mention the system property.
 */
@Test
public void testAutoConfigResolutionWithInvalidConfigProviderClassExcluded() {
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "");

    // Test Case: Any config provider specified while the system property is empty
    String providerClass = "org.apache.kafka.common.config.provider.InvalidConfigProvider";
    Properties settings = new Properties();
    settings.put("config.providers", "file");
    settings.put("config.providers.file.class", providerClass);
    settings.put("testKey", "${test:/foo/bar/testpath:testKey}");

    KafkaException failure = assertThrows(
        KafkaException.class,
        () -> new TestIndirectConfigResolution(settings, Collections.emptyMap())
    );
    String message = failure.getMessage();
    assertTrue(message.contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY));
}
/**
 * When the non-existent provider class is itself listed in the automatic-config-providers
 * system property, construction must still fail, but the error message must not mention
 * the system property.
 */
@Test
public void testAutoConfigResolutionWithInvalidConfigProviderClassIncluded() {
    String providerClass = "org.apache.kafka.common.config.provider.InvalidConfigProvider";
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, providerClass);

    // Test Case: Invalid config provider specified, but is also included in the system property
    Properties settings = new Properties();
    settings.put("config.providers", "file");
    settings.put("config.providers.file.class", providerClass);
    settings.put("testKey", "${test:/foo/bar/testpath:testKey}");

    KafkaException failure = assertThrows(
        KafkaException.class,
        () -> new TestIndirectConfigResolution(settings, Collections.emptyMap())
    );
    String message = failure.getMessage();
    assertFalse(message.contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY));
}
/**
 * A placeholder whose provider was never declared must be left exactly as written.
 */
@Test
public void testAutoConfigResolutionWithMissingConfigProvider() {
    // Test Case: Config Provider for a variable missing in config file.
    String placeholder = "${test:/foo/bar/testpath:testKey}";
    Properties settings = new Properties();
    settings.put("testKey", placeholder);

    TestIndirectConfigResolution resolved = new TestIndirectConfigResolution(settings);
    assertEquals("${test:/foo/bar/testpath:testKey}", resolved.originals().get("testKey"));
}
/**
 * A placeholder the declared provider cannot resolve must be left exactly as written,
 * and the provider must still be closed.
 */
@Test
public void testAutoConfigResolutionWithMissingConfigKey() {
    // Test Case: Config Provider fails to resolve the config (key not present)
    String testId = UUID.randomUUID().toString();
    Properties settings = new Properties();
    settings.put("config.providers", "test");
    settings.put("config.providers.test.class", MockFileConfigProvider.class.getName());
    settings.put("config.providers.test.param.testId", testId);
    settings.put("random", "${test:/foo/bar/testpath:random}");

    TestIndirectConfigResolution resolved = new TestIndirectConfigResolution(settings);
    Object unresolved = resolved.originals().get("random");
    assertEquals("${test:/foo/bar/testpath:random}", unresolved);
    MockFileConfigProvider.assertClosed(testId);
}
/**
 * Providers declared both in the originals and in the separate provider map: the
 * {@code file} placeholder must stay unresolved because only the separate map (which
 * declares {@code test}, not {@code file}) is consulted.
 */
@Test
public void testAutoConfigResolutionWithDuplicateConfigProvider() {
    // Test Case: If ConfigProvider is provided in both originals and provider. Only the ones in provider should be used.
    String vaultProvider = MockVaultConfigProvider.class.getName();

    Properties providerSettings = new Properties();
    providerSettings.put("config.providers", "test");
    providerSettings.put("config.providers.test.class", vaultProvider);

    Properties settings = new Properties();
    settings.put("config.providers", "file");
    settings.put("config.providers.file.class", vaultProvider);
    settings.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");

    TestIndirectConfigResolution resolved =
        new TestIndirectConfigResolution(settings, Utils.castToStringObjectMap(providerSettings));
    assertEquals("${file:/usr/kerberos:key}", resolved.originals().get("sasl.kerberos.key"));
}
/**
 * A provider passed through the separate provider map must be used even though the
 * automatic-config-providers system property names a different class, and the
 * {@code param.*} entries must reach the provider.
 */
@Test
public void testConfigProviderConfigurationWithConfigParams() {
    // should have no effect
    System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());

    // Test Case: Specify a config provider not allowed, but passed via the trusted providers argument
    Properties providerSettings = new Properties();
    providerSettings.put("config.providers", "vault");
    providerSettings.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
    providerSettings.put("config.providers.vault.param.location", "/usr/vault");
    providerSettings.put("config.providers.vault.param.key", "randomKey");

    Properties settings = new Properties();
    settings.put("sasl.truststore.location", "${vault:/usr/truststore:truststoreLocation}");
    settings.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}");
    settings.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}");

    TestIndirectConfigResolution resolved =
        new TestIndirectConfigResolution(settings, Utils.castToStringObjectMap(providerSettings));
    assertEquals("/usr/vault", resolved.originals().get("sasl.truststore.location"));
}
/**
 * {@code documentationOf} must return the doc string the key was defined with.
 */
@Test
public void testDocumentationOf() {
    TestIndirectConfigResolution emptyConfig = new TestIndirectConfigResolution(new Properties());
    String definedKey = TestIndirectConfigResolution.INDIRECT_CONFIGS;
    assertEquals(TestIndirectConfigResolution.INDIRECT_CONFIGS_DOC, emptyConfig.documentationOf(definedKey));
}
/**
 * {@code documentationOf} must return {@code null} for a key that was never defined.
 */
@Test
public void testDocumentationOfExpectNull() {
    TestIndirectConfigResolution emptyConfig = new TestIndirectConfigResolution(new Properties());
    assertNull(emptyConfig.documentationOf("xyz"));
}
/**
 * Minimal {@link AbstractConfig} with a single LIST key, used by the config-provider
 * tests. Both constructors pass {@code true} as the trailing boolean argument of the
 * super constructor.
 */
private static class TestIndirectConfigResolution extends AbstractConfig {
    public static final String INDIRECT_CONFIGS = "indirect.variables";
    private static final String INDIRECT_CONFIGS_DOC = "Variables whose values can be obtained from ConfigProviders";

    // Declared after the constants it refers to, so it can be initialized in place.
    private static final ConfigDef CONFIG = new ConfigDef()
        .define(INDIRECT_CONFIGS, Type.LIST, "", Importance.LOW, INDIRECT_CONFIGS_DOC);

    /** Builds the config from {@code props} alone. */
    public TestIndirectConfigResolution(Map<?, ?> props) {
        super(CONFIG, props, true);
    }

    /** Builds the config from {@code props} plus a separate map of provider settings. */
    public TestIndirectConfigResolution(Map<?, ?> props, Map<String, ?> providers) {
        super(CONFIG, props, providers, true);
    }
}
/**
 * {@link AbstractConfig} with one CLASS key ({@code class.prop}) and one LIST key
 * ({@code list.prop}) whose defaults are {@link #DEFAULT_CLASS}; used to test how
 * class-valued configs are instantiated.
 */
private static class ClassTestConfig extends AbstractConfig {
    static final Class<?> DEFAULT_CLASS = FakeMetricsReporter.class;
    static final Class<?> VISIBLE_CLASS = JmxReporter.class;
    static final Class<?> RESTRICTED_CLASS = ConfiguredFakeMetricsReporter.class;
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef().define("class.prop", Type.CLASS, DEFAULT_CLASS, Importance.HIGH, "docs")
            .define("list.prop", Type.LIST, Collections.singletonList(DEFAULT_CLASS), Importance.HIGH, "docs");
    }

    /** Creates a config that relies purely on the defaults. */
    public ClassTestConfig() {
        super(CONFIG, new Properties());
    }

    /**
     * Creates a config with optional overrides.
     *
     * @param classPropOverride value for {@code class.prop}, or {@code null} to keep the default
     * @param listPropOverride  value for {@code list.prop}, or {@code null} to keep the default
     */
    public ClassTestConfig(Object classPropOverride, Object listPropOverride) {
        super(CONFIG, overrideProps(classPropOverride, listPropOverride));
    }

    /**
     * Instantiates both properties and asserts the runtime classes of the results.
     *
     * @param expectedClassPropClass  expected class of the {@code class.prop} instance
     * @param expectedListPropClasses expected classes of the {@code list.prop} instances, in order
     */
    void checkInstances(Class<?> expectedClassPropClass, Class<?>... expectedListPropClasses) {
        assertEquals(expectedClassPropClass, getConfiguredInstance("class.prop", MetricsReporter.class).getClass());
        List<?> list = getConfiguredInstances("list.prop", MetricsReporter.class);
        // Compare sizes first: otherwise a list shorter than expected passes unnoticed and
        // a longer one fails with ArrayIndexOutOfBoundsException instead of an assertion.
        assertEquals(expectedListPropClasses.length, list.size(),
            "Unexpected number of instances created for list.prop");
        for (int i = 0; i < list.size(); i++)
            assertEquals(expectedListPropClasses[i], list.get(i).getClass());
    }

    /**
     * Runs {@link #checkInstances} with overrides given as Class objects, as a list of
     * class names, and as a comma-separated string of class names.
     */
    static void testOverrides() {
        ClassTestConfig testConfig1 = new ClassTestConfig(RESTRICTED_CLASS, Arrays.asList(VISIBLE_CLASS, RESTRICTED_CLASS));
        testConfig1.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS);
        ClassTestConfig testConfig2 = new ClassTestConfig(RESTRICTED_CLASS.getName(), Arrays.asList(VISIBLE_CLASS.getName(), RESTRICTED_CLASS.getName()));
        testConfig2.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS);
        ClassTestConfig testConfig3 = new ClassTestConfig(RESTRICTED_CLASS.getName(), VISIBLE_CLASS.getName() + "," + RESTRICTED_CLASS.getName());
        testConfig3.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS);
    }

    /** Builds the property map, leaving out any override that is {@code null}. */
    private static Map<String, Object> overrideProps(Object classProp, Object listProp) {
        Map<String, Object> props = new HashMap<>();
        if (classProp != null)
            props.put("class.prop", classProp);
        if (listProp != null)
            props.put("list.prop", listProp);
        return props;
    }
}
/**
 * {@link AbstractConfig} with a metric-reporter LIST key and a STRING key whose value is
 * always replaced by {@link #preProcessParsedConfig}.
 */
private static class TestConfig extends AbstractConfig {
    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
    private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";
    public static final String PREPROCESSOR_CONFIG = "preprocess";
    private static final String PREPROCESSOR_CONFIG_DOC = "Override from preprocess step.";

    // Declared after the constants it refers to, so it can be initialized in place.
    private static final ConfigDef CONFIG = new ConfigDef()
        .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
        .define(PREPROCESSOR_CONFIG, Type.STRING, "", Importance.LOW, PREPROCESSOR_CONFIG_DOC);

    public TestConfig(Map<?, ?> props) {
        super(CONFIG, props);
    }

    /**
     * Returns a copy of the parsed values in which the {@code preprocess} key is set to
     * {@code "success"}; the incoming map is not modified.
     */
    @Override
    protected Map<String, Object> preProcessParsedConfig(Map<String, Object> parsedValues) {
        Map<String, Object> processed = new HashMap<>(parsedValues);
        processed.put("preprocess", "success");
        return processed;
    }
}
/**
 * {@link FakeMetricsReporter} whose {@code configure} reads {@link #EXTRA_CONFIG} from
 * the supplied map. The returned value is discarded; the read is done only for its side
 * effect on the map, which {@code testUnusedConfigs} relies on.
 */
public static class ConfiguredFakeMetricsReporter extends FakeMetricsReporter {
// Key looked up in configure(); not defined in any ConfigDef in this file.
public static final String EXTRA_CONFIG = "metric.extra_config";
@Override
public void configure(Map<String, ?> configs) {
// Calling get() should have the side effect of marking that config as used.
// this is required by testUnusedConfigs
configs.get(EXTRA_CONFIG);
}
}
}