| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.common.config; |
| |
| import org.apache.kafka.common.KafkaException; |
| import org.apache.kafka.common.config.ConfigDef.Importance; |
| import org.apache.kafka.common.config.ConfigDef.Type; |
| import org.apache.kafka.common.config.provider.EnvVarConfigProvider; |
| import org.apache.kafka.common.config.provider.FileConfigProvider; |
| import org.apache.kafka.common.config.provider.MockFileConfigProvider; |
| import org.apache.kafka.common.config.provider.MockVaultConfigProvider; |
| import org.apache.kafka.common.config.types.Password; |
| import org.apache.kafka.common.metrics.FakeMetricsReporter; |
| import org.apache.kafka.common.metrics.JmxReporter; |
| import org.apache.kafka.common.metrics.MetricsReporter; |
| import org.apache.kafka.common.security.TestSecurityConfig; |
| import org.apache.kafka.common.utils.Utils; |
| import org.apache.kafka.test.MockConsumerInterceptor; |
| |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.UUID; |
| |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| |
| public class AbstractConfigTest { |
| |
| private String propertyValue; |
| |
| @BeforeEach |
| public void setup() { |
| propertyValue = System.getProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY); |
| System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY); |
| } |
| |
| @AfterEach |
| public void teardown() { |
| if (propertyValue != null) { |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, propertyValue); |
| } else { |
| System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY); |
| } |
| } |
| |
| @Test |
| public void testConfiguredInstances() { |
| testValidInputs(" "); |
| testValidInputs(""); |
| testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter"); |
| testValidInputs(" org.apache.kafka.common.metrics.FakeMetricsReporter "); |
| testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter"); |
| testInvalidInputs(","); |
| testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter"); |
| testInvalidInputs("test1,test2"); |
| testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,"); |
| } |
| |
| @Test |
| public void testEmptyList() { |
| AbstractConfig conf; |
| ConfigDef configDef = new ConfigDef().define("a", Type.LIST, "", new ConfigDef.NonNullValidator(), Importance.HIGH, "doc"); |
| |
| conf = new AbstractConfig(configDef, Collections.emptyMap()); |
| assertEquals(Collections.emptyList(), conf.getList("a")); |
| |
| conf = new AbstractConfig(configDef, Collections.singletonMap("a", "")); |
| assertEquals(Collections.emptyList(), conf.getList("a")); |
| |
| conf = new AbstractConfig(configDef, Collections.singletonMap("a", "b,c,d")); |
| assertEquals(Arrays.asList("b", "c", "d"), conf.getList("a")); |
| } |
| |
| @Test |
| public void testOriginalsWithPrefix() { |
| Properties props = new Properties(); |
| props.put("foo.bar", "abc"); |
| props.put("setting", "def"); |
| TestConfig config = new TestConfig(props); |
| Map<String, Object> originalsWithPrefix = config.originalsWithPrefix("foo."); |
| |
| assertTrue(config.unused().contains("foo.bar")); |
| originalsWithPrefix.get("bar"); |
| assertFalse(config.unused().contains("foo.bar")); |
| |
| Map<String, Object> expected = new HashMap<>(); |
| expected.put("bar", "abc"); |
| assertEquals(expected, originalsWithPrefix); |
| } |
| |
| @Test |
| public void testPreprocessConfig() { |
| Properties props = new Properties(); |
| TestConfig config = new TestConfig(props); |
| assertEquals("success", config.get("preprocess")); |
| } |
| |
| @Test |
| public void testValuesWithPrefixOverride() { |
| String prefix = "prefix."; |
| Properties props = new Properties(); |
| props.put("sasl.mechanism", "PLAIN"); |
| props.put("prefix.sasl.mechanism", "GSSAPI"); |
| props.put("prefix.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2"); |
| props.put("prefix.ssl.truststore.location", "my location"); |
| props.put("sasl.kerberos.service.name", "service name"); |
| props.put("ssl.keymanager.algorithm", "algorithm"); |
| TestSecurityConfig config = new TestSecurityConfig(props); |
| Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix); |
| |
| // prefix overrides global |
| assertTrue(config.unused().contains("prefix.sasl.mechanism")); |
| assertTrue(config.unused().contains("sasl.mechanism")); |
| assertEquals("GSSAPI", valuesWithPrefixOverride.get("sasl.mechanism")); |
| assertFalse(config.unused().contains("sasl.mechanism")); |
| assertFalse(config.unused().contains("prefix.sasl.mechanism")); |
| |
| // prefix overrides default |
| assertTrue(config.unused().contains("prefix.sasl.kerberos.kinit.cmd")); |
| assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd")); |
| assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("sasl.kerberos.kinit.cmd")); |
| assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd")); |
| assertFalse(config.unused().contains("prefix.sasl.kerberos.kinit.cmd")); |
| |
| // prefix override with no default |
| assertTrue(config.unused().contains("prefix.ssl.truststore.location")); |
| assertFalse(config.unused().contains("ssl.truststore.location")); |
| assertEquals("my location", valuesWithPrefixOverride.get("ssl.truststore.location")); |
| assertFalse(config.unused().contains("ssl.truststore.location")); |
| assertFalse(config.unused().contains("prefix.ssl.truststore.location")); |
| |
| // global overrides default |
| assertTrue(config.unused().contains("ssl.keymanager.algorithm")); |
| assertEquals("algorithm", valuesWithPrefixOverride.get("ssl.keymanager.algorithm")); |
| assertFalse(config.unused().contains("ssl.keymanager.algorithm")); |
| |
| // global with no default |
| assertTrue(config.unused().contains("sasl.kerberos.service.name")); |
| assertEquals("service name", valuesWithPrefixOverride.get("sasl.kerberos.service.name")); |
| assertFalse(config.unused().contains("sasl.kerberos.service.name")); |
| |
| // unset with default |
| assertFalse(config.unused().contains("sasl.kerberos.min.time.before.relogin")); |
| assertEquals(SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, |
| valuesWithPrefixOverride.get("sasl.kerberos.min.time.before.relogin")); |
| assertFalse(config.unused().contains("sasl.kerberos.min.time.before.relogin")); |
| |
| // unset with no default |
| assertFalse(config.unused().contains("ssl.key.password")); |
| assertNull(valuesWithPrefixOverride.get("ssl.key.password")); |
| assertFalse(config.unused().contains("ssl.key.password")); |
| } |
| |
| @Test |
| public void testValuesWithSecondaryPrefix() { |
| String prefix = "listener.name.listener1."; |
| Password saslJaasConfig1 = new Password("test.myLoginModule1 required;"); |
| Password saslJaasConfig2 = new Password("test.myLoginModule2 required;"); |
| Password saslJaasConfig3 = new Password("test.myLoginModule3 required;"); |
| Properties props = new Properties(); |
| props.put("listener.name.listener1.test-mechanism.sasl.jaas.config", saslJaasConfig1.value()); |
| props.put("test-mechanism.sasl.jaas.config", saslJaasConfig2.value()); |
| props.put("sasl.jaas.config", saslJaasConfig3.value()); |
| props.put("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2"); |
| props.put("listener.name.listener1.gssapi.sasl.kerberos.service.name", "testkafka"); |
| props.put("listener.name.listener1.gssapi.sasl.kerberos.min.time.before.relogin", "60000"); |
| props.put("ssl.provider", "TEST"); |
| TestSecurityConfig config = new TestSecurityConfig(props); |
| Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix); |
| |
| // prefix with mechanism overrides global |
| assertTrue(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config")); |
| assertTrue(config.unused().contains("test-mechanism.sasl.jaas.config")); |
| assertEquals(saslJaasConfig1, valuesWithPrefixOverride.get("test-mechanism.sasl.jaas.config")); |
| assertEquals(saslJaasConfig3, valuesWithPrefixOverride.get("sasl.jaas.config")); |
| assertFalse(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config")); |
| assertFalse(config.unused().contains("test-mechanism.sasl.jaas.config")); |
| assertFalse(config.unused().contains("sasl.jaas.config")); |
| |
| // prefix with mechanism overrides default |
| assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd")); |
| assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd")); |
| assertFalse(config.unused().contains("gssapi.sasl.kerberos.kinit.cmd")); |
| assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd")); |
| assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.kinit.cmd")); |
| assertFalse(config.unused().contains("listener.name.listener1.sasl.kerberos.kinit.cmd")); |
| |
| // prefix override for mechanism with no default |
| assertFalse(config.unused().contains("sasl.kerberos.service.name")); |
| assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name")); |
| assertFalse(config.unused().contains("gssapi.sasl.kerberos.service.name")); |
| assertFalse(config.unused().contains("sasl.kerberos.service.name")); |
| assertEquals("testkafka", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.service.name")); |
| assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name")); |
| |
| // unset with no default |
| assertTrue(config.unused().contains("ssl.provider")); |
| assertNull(valuesWithPrefixOverride.get("gssapi.ssl.provider")); |
| assertTrue(config.unused().contains("ssl.provider")); |
| } |
| |
| @Test |
| public void testValuesWithPrefixAllOrNothing() { |
| String prefix1 = "prefix1."; |
| String prefix2 = "prefix2."; |
| Properties props = new Properties(); |
| props.put("sasl.mechanism", "PLAIN"); |
| props.put("prefix1.sasl.mechanism", "GSSAPI"); |
| props.put("prefix1.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2"); |
| props.put("prefix1.ssl.truststore.location", "my location"); |
| props.put("sasl.kerberos.service.name", "service name"); |
| props.put("ssl.keymanager.algorithm", "algorithm"); |
| TestSecurityConfig config = new TestSecurityConfig(props); |
| Map<String, Object> valuesWithPrefixAllOrNothing1 = config.valuesWithPrefixAllOrNothing(prefix1); |
| |
| // All prefixed values are there |
| assertEquals("GSSAPI", valuesWithPrefixAllOrNothing1.get("sasl.mechanism")); |
| assertEquals("/usr/bin/kinit2", valuesWithPrefixAllOrNothing1.get("sasl.kerberos.kinit.cmd")); |
| assertEquals("my location", valuesWithPrefixAllOrNothing1.get("ssl.truststore.location")); |
| |
| // Non-prefixed values are missing |
| assertFalse(valuesWithPrefixAllOrNothing1.containsKey("sasl.kerberos.service.name")); |
| assertFalse(valuesWithPrefixAllOrNothing1.containsKey("ssl.keymanager.algorithm")); |
| |
| Map<String, Object> valuesWithPrefixAllOrNothing2 = config.valuesWithPrefixAllOrNothing(prefix2); |
| assertTrue(valuesWithPrefixAllOrNothing2.containsKey("sasl.kerberos.service.name")); |
| assertTrue(valuesWithPrefixAllOrNothing2.containsKey("ssl.keymanager.algorithm")); |
| } |
| |
| @Test |
| public void testUnusedConfigs() { |
| Properties props = new Properties(); |
| String configValue = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter"; |
| props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); |
| props.put(ConfiguredFakeMetricsReporter.EXTRA_CONFIG, "my_value"); |
| TestConfig config = new TestConfig(props); |
| |
| assertTrue(config.unused().contains(ConfiguredFakeMetricsReporter.EXTRA_CONFIG), |
| ConfiguredFakeMetricsReporter.EXTRA_CONFIG + " should be marked unused before getConfiguredInstances is called"); |
| |
| config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); |
| assertFalse(config.unused().contains(ConfiguredFakeMetricsReporter.EXTRA_CONFIG), |
| ConfiguredFakeMetricsReporter.EXTRA_CONFIG + " should be marked as used"); |
| } |
| |
| private void testValidInputs(String configValue) { |
| Properties props = new Properties(); |
| props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); |
| TestConfig config = new TestConfig(props); |
| try { |
| config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); |
| } catch (ConfigException e) { |
| fail("No exceptions are expected here, valid props are :" + props); |
| } |
| } |
| |
| private void testInvalidInputs(String configValue) { |
| Properties props = new Properties(); |
| props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); |
| TestConfig config = new TestConfig(props); |
| assertThrows(KafkaException.class, () -> config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class)); |
| } |
| |
| @Test |
| public void testConfiguredInstancesClosedOnFailure() { |
| |
| try { |
| Map<String, String> props = new HashMap<>(); |
| String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", " |
| + MockConsumerInterceptor.class.getName() + ", " |
| + MockConsumerInterceptor.class.getName(); |
| props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, threeConsumerInterceptors); |
| props.put("client.id", "test"); |
| TestConfig testConfig = new TestConfig(props); |
| |
| MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3); |
| assertThrows( |
| Exception.class, |
| () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class) |
| ); |
| assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get()); |
| assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get()); |
| } finally { |
| MockConsumerInterceptor.resetCounters(); |
| } |
| } |
| |
| @Test |
| public void testClassConfigs() { |
| class RestrictedClassLoader extends ClassLoader { |
| public RestrictedClassLoader() { |
| super(null); |
| } |
| @Override |
| protected Class<?> findClass(String name) throws ClassNotFoundException { |
| if (name.equals(ClassTestConfig.DEFAULT_CLASS.getName()) || name.equals(ClassTestConfig.RESTRICTED_CLASS.getName())) |
| throw new ClassNotFoundException(); |
| else |
| return ClassTestConfig.class.getClassLoader().loadClass(name); |
| } |
| } |
| |
| ClassLoader restrictedClassLoader = new RestrictedClassLoader(); |
| ClassLoader defaultClassLoader = AbstractConfig.class.getClassLoader(); |
| |
| ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader(); |
| try { |
| // Test default classloading where all classes are visible to thread context classloader |
| Thread.currentThread().setContextClassLoader(defaultClassLoader); |
| ClassTestConfig testConfig = new ClassTestConfig(); |
| testConfig.checkInstances(ClassTestConfig.DEFAULT_CLASS, ClassTestConfig.DEFAULT_CLASS); |
| |
| // Test default classloading where default classes are not visible to thread context classloader |
| // Static classloading is used for default classes, so instance creation should succeed. |
| Thread.currentThread().setContextClassLoader(restrictedClassLoader); |
| testConfig = new ClassTestConfig(); |
| testConfig.checkInstances(ClassTestConfig.DEFAULT_CLASS, ClassTestConfig.DEFAULT_CLASS); |
| |
| // Test class overrides with names or classes where all classes are visible to thread context classloader |
| Thread.currentThread().setContextClassLoader(defaultClassLoader); |
| ClassTestConfig.testOverrides(); |
| |
| // Test class overrides with names or classes where all classes are visible to Kafka classloader, context classloader is null |
| Thread.currentThread().setContextClassLoader(null); |
| ClassTestConfig.testOverrides(); |
| |
| // Test class overrides where some classes are not visible to thread context classloader |
| Thread.currentThread().setContextClassLoader(restrictedClassLoader); |
| // Properties specified as classes should succeed |
| testConfig = new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS, Collections.singletonList(ClassTestConfig.RESTRICTED_CLASS)); |
| testConfig.checkInstances(ClassTestConfig.RESTRICTED_CLASS, ClassTestConfig.RESTRICTED_CLASS); |
| testConfig = new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS, Arrays.asList(ClassTestConfig.VISIBLE_CLASS, ClassTestConfig.RESTRICTED_CLASS)); |
| testConfig.checkInstances(ClassTestConfig.RESTRICTED_CLASS, ClassTestConfig.VISIBLE_CLASS, ClassTestConfig.RESTRICTED_CLASS); |
| |
| // Properties specified as classNames should fail to load classes |
| assertThrows(ConfigException.class, () -> new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS.getName(), null), |
| "Config created with class property that cannot be loaded"); |
| |
| ClassTestConfig config = new ClassTestConfig(null, Arrays.asList(ClassTestConfig.VISIBLE_CLASS.getName(), ClassTestConfig.RESTRICTED_CLASS.getName())); |
| assertThrows(KafkaException.class, () -> config.getConfiguredInstances("list.prop", MetricsReporter.class), |
| "Should have failed to load class"); |
| |
| ClassTestConfig config2 = new ClassTestConfig(null, ClassTestConfig.VISIBLE_CLASS.getName() + "," + ClassTestConfig.RESTRICTED_CLASS.getName()); |
| assertThrows(KafkaException.class, () -> config2.getConfiguredInstances("list.prop", MetricsReporter.class), |
| "Should have failed to load class"); |
| } finally { |
| Thread.currentThread().setContextClassLoader(originClassLoader); |
| } |
| } |
| |
| @Test |
| public void testOriginalWithOverrides() { |
| Properties props = new Properties(); |
| props.put("config.providers", "file"); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); |
| assertEquals("file", config.originals().get("config.providers")); |
| assertEquals("file2", config.originals(Collections.singletonMap("config.providers", "file2")).get("config.providers")); |
| } |
| |
| @Test |
| public void testOriginalsWithConfigProvidersProps() { |
| Properties props = new Properties(); |
| |
| // Test Case: Valid Test Case for ConfigProviders as part of config.properties |
| props.put("config.providers", "file"); |
| props.put("config.providers.file.class", MockFileConfigProvider.class.getName()); |
| String id = UUID.randomUUID().toString(); |
| props.put("config.providers.file.param.testId", id); |
| props.put("prefix.ssl.truststore.location.number", 5); |
| props.put("sasl.kerberos.service.name", "service name"); |
| props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); |
| props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); |
| assertEquals("testKey", config.originals().get("sasl.kerberos.key")); |
| assertEquals("randomPassword", config.originals().get("sasl.kerberos.password")); |
| assertEquals(5, config.originals().get("prefix.ssl.truststore.location.number")); |
| assertEquals("service name", config.originals().get("sasl.kerberos.service.name")); |
| MockFileConfigProvider.assertClosed(id); |
| } |
| |
| @Test |
| public void testOriginalsWithConfigProvidersPropsExcluded() { |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockVaultConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName()); |
| Properties props = new Properties(); |
| |
| // Test Case: Config provider that is not an allowed class |
| props.put("config.providers", "file"); |
| props.put("config.providers.file.class", MockFileConfigProvider.class.getName()); |
| String id = UUID.randomUUID().toString(); |
| props.put("config.providers.file.param.testId", id); |
| props.put("prefix.ssl.truststore.location.number", 5); |
| props.put("sasl.kerberos.service.name", "service name"); |
| props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); |
| props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); |
| assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap())); |
| } |
| |
| @Test |
| public void testOriginalsWithConfigProvidersPropsIncluded() { |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName()); |
| Properties props = new Properties(); |
| |
| // Test Case: Config provider that is an allowed class |
| props.put("config.providers", "file"); |
| props.put("config.providers.file.class", MockFileConfigProvider.class.getName()); |
| String id = UUID.randomUUID().toString(); |
| props.put("config.providers.file.param.testId", id); |
| props.put("prefix.ssl.truststore.location.number", 5); |
| props.put("sasl.kerberos.service.name", "service name"); |
| props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); |
| props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap()); |
| assertEquals("testKey", config.originals().get("sasl.kerberos.key")); |
| assertEquals("randomPassword", config.originals().get("sasl.kerberos.password")); |
| MockFileConfigProvider.assertClosed(id); |
| } |
| |
| @Test |
| public void testConfigProvidersPropsAsParam() { |
| // Test Case: Valid Test Case for ConfigProviders as a separate variable |
| Properties providers = new Properties(); |
| providers.put("config.providers", "file"); |
| providers.put("config.providers.file.class", MockFileConfigProvider.class.getName()); |
| String id = UUID.randomUUID().toString(); |
| providers.put("config.providers.file.param.testId", id); |
| Properties props = new Properties(); |
| props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); |
| props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers)); |
| assertEquals("testKey", config.originals().get("sasl.kerberos.key")); |
| assertEquals("randomPassword", config.originals().get("sasl.kerberos.password")); |
| MockFileConfigProvider.assertClosed(id); |
| } |
| |
| @Test |
| public void testAutomaticConfigProvidersWithFullClassName() { |
| // case0: MockFileConfigProvider is disallowed by org.apache.kafka.automatic.config.providers |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file"); |
| assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(Map.of("config.providers", "file", |
| "config.providers.file.class", MockFileConfigProvider.class.getName()), |
| Map.of())); |
| |
| // case1: MockFileConfigProvider is allowed by org.apache.kafka.automatic.config.providers |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName()); |
| Map<String, String> props = Map.of("config.providers", "file", |
| "config.providers.file.class", MockFileConfigProvider.class.getName(), |
| "config.providers.file.param.testId", UUID.randomUUID().toString(), |
| "test.key", "${file:/path:key}"); |
| assertEquals("testKey", new TestIndirectConfigResolution(props, Map.of()).originals().get("test.key")); |
| |
| // case2: MockFileConfigProvider and EnvVarConfigProvider are allowed by org.apache.kafka.automatic.config.providers |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, |
| MockFileConfigProvider.class.getName() + "," + EnvVarConfigProvider.class.getName()); |
| assertEquals("testKey", new TestIndirectConfigResolution(props, Map.of()).originals().get("test.key")); |
| } |
| |
| @Test |
| public void testImmutableOriginalsWithConfigProvidersProps() { |
| // Test Case: Valid Test Case for ConfigProviders as a separate variable |
| Properties providers = new Properties(); |
| providers.put("config.providers", "file"); |
| providers.put("config.providers.file.class", MockFileConfigProvider.class.getName()); |
| String id = UUID.randomUUID().toString(); |
| providers.put("config.providers.file.param.testId", id); |
| Properties props = new Properties(); |
| props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); |
| Map<?, ?> immutableMap = Collections.unmodifiableMap(props); |
| Map<String, ?> provMap = Utils.castToStringObjectMap(providers); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap); |
| assertEquals("testKey", config.originals().get("sasl.kerberos.key")); |
| MockFileConfigProvider.assertClosed(id); |
| } |
| |
| @Test |
| public void testAutoConfigResolutionWithMultipleConfigProviders() { |
| // Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable |
| Properties providers = new Properties(); |
| providers.put("config.providers", "file,vault"); |
| providers.put("config.providers.file.class", MockFileConfigProvider.class.getName()); |
| String id = UUID.randomUUID().toString(); |
| providers.put("config.providers.file.param.testId", id); |
| providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName()); |
| Properties props = new Properties(); |
| props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); |
| props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); |
| props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}"); |
| props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}"); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers)); |
| assertEquals("testKey", config.originals().get("sasl.kerberos.key")); |
| assertEquals("randomPassword", config.originals().get("sasl.kerberos.password")); |
| assertEquals("testTruststoreKey", config.originals().get("sasl.truststore.key")); |
| assertEquals("randomtruststorePassword", config.originals().get("sasl.truststore.password")); |
| MockFileConfigProvider.assertClosed(id); |
| } |
| |
| @Test |
| public void testAutoConfigResolutionWithInvalidConfigProviderClass() { |
| // Test Case: Invalid class for Config Provider |
| Properties props = new Properties(); |
| props.put("config.providers", "file"); |
| props.put("config.providers.file.class", |
| "org.apache.kafka.common.config.provider.InvalidConfigProvider"); |
| props.put("testKey", "${test:/foo/bar/testpath:testKey}"); |
| assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props)); |
| } |
| |
| @Test |
| public void testAutoConfigResolutionWithInvalidConfigProviderClassExcluded() { |
| String invalidConfigProvider = "org.apache.kafka.common.config.provider.InvalidConfigProvider"; |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, ""); |
| // Test Case: Any config provider specified while the system property is empty |
| Properties props = new Properties(); |
| props.put("config.providers", "file"); |
| props.put("config.providers.file.class", invalidConfigProvider); |
| props.put("testKey", "${test:/foo/bar/testpath:testKey}"); |
| KafkaException e = assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap())); |
| assertTrue(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY)); |
| } |
| |
| @Test |
| public void testAutoConfigResolutionWithInvalidConfigProviderClassIncluded() { |
| String invalidConfigProvider = "org.apache.kafka.common.config.provider.InvalidConfigProvider"; |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, invalidConfigProvider); |
| // Test Case: Invalid config provider specified, but is also included in the system property |
| Properties props = new Properties(); |
| props.put("config.providers", "file"); |
| props.put("config.providers.file.class", invalidConfigProvider); |
| props.put("testKey", "${test:/foo/bar/testpath:testKey}"); |
| KafkaException e = assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap())); |
| assertFalse(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY)); |
| } |
| |
| @Test |
| public void testAutoConfigResolutionWithMissingConfigProvider() { |
| // Test Case: Config Provider for a variable missing in config file. |
| Properties props = new Properties(); |
| props.put("testKey", "${test:/foo/bar/testpath:testKey}"); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); |
| assertEquals("${test:/foo/bar/testpath:testKey}", config.originals().get("testKey")); |
| } |
| |
| @Test |
| public void testAutoConfigResolutionWithMissingConfigKey() { |
| // Test Case: Config Provider fails to resolve the config (key not present) |
| Properties props = new Properties(); |
| props.put("config.providers", "test"); |
| props.put("config.providers.test.class", MockFileConfigProvider.class.getName()); |
| String id = UUID.randomUUID().toString(); |
| props.put("config.providers.test.param.testId", id); |
| props.put("random", "${test:/foo/bar/testpath:random}"); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); |
| assertEquals("${test:/foo/bar/testpath:random}", config.originals().get("random")); |
| MockFileConfigProvider.assertClosed(id); |
| } |
| |
| @Test |
| public void testAutoConfigResolutionWithDuplicateConfigProvider() { |
| // Test Case: If ConfigProvider is provided in both originals and provider. Only the ones in provider should be used. |
| Properties providers = new Properties(); |
| providers.put("config.providers", "test"); |
| providers.put("config.providers.test.class", MockVaultConfigProvider.class.getName()); |
| |
| Properties props = new Properties(); |
| props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); |
| props.put("config.providers", "file"); |
| props.put("config.providers.file.class", MockVaultConfigProvider.class.getName()); |
| |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers)); |
| assertEquals("${file:/usr/kerberos:key}", config.originals().get("sasl.kerberos.key")); |
| } |
| |
| @Test |
| public void testConfigProviderConfigurationWithConfigParams() { |
| // should have no effect |
| System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName()); |
| // Test Case: Specify a config provider not allowed, but passed via the trusted providers argument |
| Properties providers = new Properties(); |
| providers.put("config.providers", "vault"); |
| providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName()); |
| providers.put("config.providers.vault.param.key", "randomKey"); |
| providers.put("config.providers.vault.param.location", "/usr/vault"); |
| Properties props = new Properties(); |
| props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}"); |
| props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}"); |
| props.put("sasl.truststore.location", "${vault:/usr/truststore:truststoreLocation}"); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers)); |
| assertEquals("/usr/vault", config.originals().get("sasl.truststore.location")); |
| } |
| |
| @Test |
| public void testDocumentationOf() { |
| Properties props = new Properties(); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); |
| |
| assertEquals( |
| TestIndirectConfigResolution.INDIRECT_CONFIGS_DOC, |
| config.documentationOf(TestIndirectConfigResolution.INDIRECT_CONFIGS) |
| ); |
| } |
| |
| @Test |
| public void testDocumentationOfExpectNull() { |
| Properties props = new Properties(); |
| TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); |
| |
| assertNull(config.documentationOf("xyz")); |
| } |
| |
| private static class TestIndirectConfigResolution extends AbstractConfig { |
| |
| private static final ConfigDef CONFIG; |
| |
| public static final String INDIRECT_CONFIGS = "indirect.variables"; |
| private static final String INDIRECT_CONFIGS_DOC = "Variables whose values can be obtained from ConfigProviders"; |
| |
| static { |
| CONFIG = new ConfigDef().define(INDIRECT_CONFIGS, |
| Type.LIST, |
| "", |
| Importance.LOW, |
| INDIRECT_CONFIGS_DOC); |
| } |
| |
| public TestIndirectConfigResolution(Map<?, ?> props) { |
| super(CONFIG, props, true); |
| } |
| |
| public TestIndirectConfigResolution(Map<?, ?> props, Map<String, ?> providers) { |
| super(CONFIG, props, providers, true); |
| } |
| } |
| |
| private static class ClassTestConfig extends AbstractConfig { |
| static final Class<?> DEFAULT_CLASS = FakeMetricsReporter.class; |
| static final Class<?> VISIBLE_CLASS = JmxReporter.class; |
| static final Class<?> RESTRICTED_CLASS = ConfiguredFakeMetricsReporter.class; |
| |
| private static final ConfigDef CONFIG; |
| static { |
| CONFIG = new ConfigDef().define("class.prop", Type.CLASS, DEFAULT_CLASS, Importance.HIGH, "docs") |
| .define("list.prop", Type.LIST, Collections.singletonList(DEFAULT_CLASS), Importance.HIGH, "docs"); |
| } |
| |
| public ClassTestConfig() { |
| super(CONFIG, new Properties()); |
| } |
| |
| public ClassTestConfig(Object classPropOverride, Object listPropOverride) { |
| super(CONFIG, overrideProps(classPropOverride, listPropOverride)); |
| } |
| |
| void checkInstances(Class<?> expectedClassPropClass, Class<?>... expectedListPropClasses) { |
| assertEquals(expectedClassPropClass, getConfiguredInstance("class.prop", MetricsReporter.class).getClass()); |
| List<?> list = getConfiguredInstances("list.prop", MetricsReporter.class); |
| for (int i = 0; i < list.size(); i++) |
| assertEquals(expectedListPropClasses[i], list.get(i).getClass()); |
| } |
| |
| static void testOverrides() { |
| ClassTestConfig testConfig1 = new ClassTestConfig(RESTRICTED_CLASS, Arrays.asList(VISIBLE_CLASS, RESTRICTED_CLASS)); |
| testConfig1.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS); |
| |
| ClassTestConfig testConfig2 = new ClassTestConfig(RESTRICTED_CLASS.getName(), Arrays.asList(VISIBLE_CLASS.getName(), RESTRICTED_CLASS.getName())); |
| testConfig2.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS); |
| |
| ClassTestConfig testConfig3 = new ClassTestConfig(RESTRICTED_CLASS.getName(), VISIBLE_CLASS.getName() + "," + RESTRICTED_CLASS.getName()); |
| testConfig3.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS); |
| } |
| |
| private static Map<String, Object> overrideProps(Object classProp, Object listProp) { |
| Map<String, Object> props = new HashMap<>(); |
| if (classProp != null) |
| props.put("class.prop", classProp); |
| if (listProp != null) |
| props.put("list.prop", listProp); |
| return props; |
| } |
| } |
| |
| private static class TestConfig extends AbstractConfig { |
| |
| private static final ConfigDef CONFIG; |
| |
| public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; |
| private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters."; |
| public static final String PREPROCESSOR_CONFIG = "preprocess"; |
| private static final String PREPROCESSOR_CONFIG_DOC = "Override from preprocess step."; |
| |
| static { |
| CONFIG = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG, |
| Type.LIST, |
| "", |
| Importance.LOW, |
| METRIC_REPORTER_CLASSES_DOC) |
| .define(PREPROCESSOR_CONFIG, |
| Type.STRING, |
| "", |
| Importance.LOW, |
| PREPROCESSOR_CONFIG_DOC); |
| } |
| |
| public TestConfig(Map<?, ?> props) { |
| super(CONFIG, props); |
| } |
| |
| @Override |
| protected Map<String, Object> preProcessParsedConfig(Map<String, Object> parsedValues) { |
| Map<String, Object> ret = new HashMap<>(parsedValues); |
| ret.put("preprocess", "success"); |
| return ret; |
| } |
| } |
| |
| public static class ConfiguredFakeMetricsReporter extends FakeMetricsReporter { |
| public static final String EXTRA_CONFIG = "metric.extra_config"; |
| @Override |
| public void configure(Map<String, ?> configs) { |
| // Calling get() should have the side effect of marking that config as used. |
| // this is required by testUnusedConfigs |
| configs.get(EXTRA_CONFIG); |
| } |
| } |
| } |