Add DynamicConfigProvider for Schema Registry (#11362)
* add_DynamicConfigProvider_for_schema_registry
* bug fixed
* add document
* fix document
* fix spot bug
* fix document
* inject ObjectMapper
* add DynamicConfigProviderUtils
* add UT
* bug fixed
Co-authored-by: yuanyi <yuanyi@freewheel.tv>
diff --git a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
new file mode 100644
index 0000000..4c45262
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.metadata.DynamicConfigProvider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DynamicConfigProviderUtils
+{
+ public static Map<String, String> extraConfigAndSetStringMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
+ {
+ HashMap<String, String> newConfig = new HashMap<>();
+ if (config != null) {
+ for (Map.Entry<String, Object> entry : config.entrySet()) {
+ if (!dynamicConfigProviderKey.equals(entry.getKey())) {
+ newConfig.put(entry.getKey(), entry.getValue().toString());
+ }
+ }
+ Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
+ for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
+ newConfig.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return newConfig;
+ }
+
+ public static Map<String, Object> extraConfigAndSetObjectMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
+ {
+ HashMap<String, Object> newConfig = new HashMap<>();
+ if (config != null) {
+ for (Map.Entry<String, Object> entry : config.entrySet()) {
+ if (!dynamicConfigProviderKey.equals(entry.getKey())) {
+ newConfig.put(entry.getKey(), entry.getValue());
+ }
+ }
+ Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
+ for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
+ newConfig.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return newConfig;
+ }
+
+ private static Map<String, String> extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper)
+ {
+ if (dynamicConfigProviderJson != null) {
+ DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
+ return dynamicConfigProvider.getConfig();
+ }
+ return Collections.emptyMap();
+ }
+}
diff --git a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java
new file mode 100644
index 0000000..496acfa
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.metadata.DynamicConfigProvider;
+import org.apache.druid.metadata.MapStringDynamicConfigProvider;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import java.util.Map;
+
+@RunWith(Enclosed.class)
+public class DynamicConfigProviderUtilsTest
+{
+ public static class ThrowIfURLHasNotAllowedPropertiesTest
+ {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final String DYNAMIC_CONFIG_PROVIDER = "druid.dynamic.config.provider";
+
+ @Test
+ public void testExtraConfigAndSetStringMap()
+ {
+ DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
+ ImmutableMap.of("prop2", "value2")
+ );
+
+ Map<String, Object> properties = ImmutableMap.of(
+ "prop1", "value1",
+ "prop2", "value3",
+ DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class)
+ );
+ Map<String, String> res = DynamicConfigProviderUtils.extraConfigAndSetStringMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("value1", res.get("prop1"));
+ Assert.assertEquals("value2", res.get("prop2"));
+ }
+
+ @Test
+ public void testExtraConfigAndSetObjectMap()
+ {
+ DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
+ ImmutableMap.of("prop2", "value2")
+ );
+
+ Map<String, Object> properties = ImmutableMap.of(
+ "prop1", "value1",
+ "prop2", "value3",
+ DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class)
+ );
+ Map<String, Object> res = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("value1", res.get("prop1").toString());
+ Assert.assertEquals("value2", res.get("prop2").toString());
+ }
+ }
+}
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index 002d6d2..cb1b3d2 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -380,8 +380,8 @@
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) |
-| config | Json | To send additional configurations, configured for Schema Registry | no |
-| headers | Json | To send headers to the Schema Registry | no |
+| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
+| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
For a single schema registry instance, use Field `url` or `urls` for multi instances.
@@ -408,12 +408,20 @@
"schema.registry.ssl.truststore.password": "<password>",
"schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
"schema.registry.ssl.keystore.password": "<password>",
- "schema.registry.ssl.key.password": "<password>"
+ "schema.registry.ssl.key.password": "<password>",
+ "schema.registry.ssl.key.password",
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
+ "druid.dynamic.config.provider":{
+ "type":"mapString",
+ "config":{
+ "registry.header.prop.1":"value.1",
+ "registry.header.prop.2":"value.2"
+ }
+ }
...
}
}
@@ -1223,8 +1231,8 @@
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) |
-| config | Json | To send additional configurations, configured for Schema Registry | no |
-| headers | Json | To send headers to the Schema Registry | no |
+| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md). | no |
+| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
For a single schema registry instance, use Field `url` or `urls` for multi instances.
@@ -1259,6 +1267,13 @@
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
+ "druid.dynamic.config.provider":{
+ "type":"mapString",
+ "config":{
+ "registry.header.prop.1":"value.1",
+ "registry.header.prop.2":"value.2"
+ }
+ }
...
}
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 4b3da38..59cb33e 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -19,8 +19,10 @@
package org.apache.druid.data.input.avro;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@@ -32,8 +34,10 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -48,16 +52,19 @@
private final String url;
private final int capacity;
private final List<String> urls;
- private final Map<String, ?> config;
- private final Map<String, String> headers;
+ private final Map<String, Object> config;
+ private final Map<String, Object> headers;
+ private final ObjectMapper jsonMapper;
+ public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
@JsonCreator
public SchemaRegistryBasedAvroBytesDecoder(
@JsonProperty("url") @Deprecated String url,
@JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
- @JsonProperty("config") @Nullable Map<String, ?> config,
- @JsonProperty("headers") @Nullable Map<String, String> headers
+ @JsonProperty("config") @Nullable Map<String, Object> config,
+ @JsonProperty("headers") @Nullable Map<String, Object> headers,
+ @JacksonInject @Json ObjectMapper jsonMapper
)
{
this.url = url;
@@ -65,10 +72,11 @@
this.urls = urls;
this.config = config;
this.headers = headers;
+ this.jsonMapper = jsonMapper;
if (url != null && !url.isEmpty()) {
- this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, this.config, this.headers);
+ this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
} else {
- this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, this.config, this.headers);
+ this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
}
}
@@ -91,13 +99,13 @@
}
@JsonProperty
- public Map<String, ?> getConfig()
+ public Map<String, Object> getConfig()
{
return config;
}
@JsonProperty
- public Map<String, String> getHeaders()
+ public Map<String, Object> getHeaders()
{
return headers;
}
@@ -112,6 +120,7 @@
this.config = null;
this.headers = null;
this.registry = registry;
+ this.jsonMapper = new ObjectMapper();
}
@Override
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
index 4213008..4a0c2df 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.data.input;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@@ -108,6 +109,9 @@
for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
+ jsonMapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
}
@Test
@@ -151,7 +155,7 @@
{
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
flattenSpec,
- new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null),
+ new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null),
false,
false
);
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 3eb6439..ec073c9 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.data.input.avro;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@@ -30,8 +32,10 @@
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -41,6 +45,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Map;
public class SchemaRegistryBasedAvroBytesDecoderTest
{
@@ -56,7 +61,10 @@
public void testMultipleUrls() throws Exception
{
String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@@ -70,7 +78,10 @@
public void testUrl() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@@ -84,7 +95,10 @@
public void testConfig() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@@ -163,4 +177,48 @@
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
return out.toByteArray();
}
+
+ @Test
+ public void testParseHeader() throws JsonProcessingException
+ {
+ String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
+ SchemaRegistryBasedAvroBytesDecoder decoder;
+ decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
+ .readerFor(AvroBytesDecoder.class)
+ .readValue(json);
+
+ Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
+
+ // Then
+ Assert.assertEquals(3, header.size());
+ Assert.assertEquals("value.1", header.get("registry.header.prop.1"));
+ Assert.assertEquals("value.2", header.get("registry.header.prop.2"));
+ Assert.assertEquals("value.3", header.get("registry.header.prop.3"));
+ }
+
+ @Test
+ public void testParseConfig() throws JsonProcessingException
+ {
+ String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
+ SchemaRegistryBasedAvroBytesDecoder decoder;
+ decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
+ .readerFor(AvroBytesDecoder.class)
+ .readValue(json);
+
+ Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
+
+ // Then
+ Assert.assertEquals(3, config.size());
+ Assert.assertEquals("value.1", config.get("registry.config.prop.1"));
+ Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
+ Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
+ }
}
diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
index 2d4cc8d..17bb85a 100644
--- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
+++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
@@ -19,8 +19,10 @@
package org.apache.druid.data.input.protobuf;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
@@ -29,8 +31,10 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
import javax.annotation.Nullable;
@@ -50,16 +54,19 @@
private final String url;
private final int capacity;
private final List<String> urls;
- private final Map<String, ?> config;
- private final Map<String, String> headers;
+ private final Map<String, Object> config;
+ private final Map<String, Object> headers;
+ private final ObjectMapper jsonMapper;
+ public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
@JsonCreator
public SchemaRegistryBasedProtobufBytesDecoder(
@JsonProperty("url") @Deprecated String url,
@JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
- @JsonProperty("config") @Nullable Map<String, ?> config,
- @JsonProperty("headers") @Nullable Map<String, String> headers
+ @JsonProperty("config") @Nullable Map<String, Object> config,
+ @JsonProperty("headers") @Nullable Map<String, Object> headers,
+ @JacksonInject @Json ObjectMapper jsonMapper
)
{
this.url = url;
@@ -67,10 +74,11 @@
this.urls = urls;
this.config = config;
this.headers = headers;
+ this.jsonMapper = jsonMapper;
if (url != null && !url.isEmpty()) {
- this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers);
+ this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
} else {
- this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers);
+ this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
}
}
@@ -93,13 +101,13 @@
}
@JsonProperty
- public Map<String, ?> getConfig()
+ public Map<String, Object> getConfig()
{
return config;
}
@JsonProperty
- public Map<String, String> getHeaders()
+ public Map<String, Object> getHeaders()
{
return headers;
}
@@ -119,6 +127,7 @@
this.config = null;
this.headers = null;
this.registry = registry;
+ this.jsonMapper = new ObjectMapper();
}
@Override
diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index e9e15ff..0a3e8c7 100644
--- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.data.input.protobuf;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
@@ -77,6 +78,9 @@
for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
+ jsonMapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
}
@Test
@@ -99,7 +103,7 @@
{
ProtobufInputFormat inputFormat = new ProtobufInputFormat(
flattenSpec,
- new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null)
+ new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null, null)
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(inputFormat),
diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
index 0d77b11..009b5a6 100644
--- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
+++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.data.input.protobuf;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DynamicMessage;
@@ -26,7 +28,9 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.commons.io.IOUtils;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
@@ -40,6 +44,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.Map;
public class SchemaRegistryBasedProtobufBytesDecoderTest
{
@@ -93,7 +98,7 @@
public void testDefaultCapacity()
{
// Given
- SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null);
+ SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null, null);
// When
Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), Integer.MAX_VALUE);
}
@@ -103,7 +108,7 @@
{
int capacity = 100;
// Given
- SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null);
+ SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null, null);
// When
Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), capacity);
}
@@ -120,7 +125,10 @@
public void testMultipleUrls() throws Exception
{
String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@@ -134,7 +142,10 @@
public void testUrl() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@@ -148,7 +159,10 @@
public void testConfig() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@@ -158,6 +172,50 @@
Assert.assertNotEquals(decoder.hashCode(), 0);
}
+ @Test
+ public void testParseHeader() throws JsonProcessingException
+ {
+ String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
+ SchemaRegistryBasedProtobufBytesDecoder decoder;
+ decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
+ .readerFor(ProtobufBytesDecoder.class)
+ .readValue(json);
+
+ Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
+
+ // Then
+ Assert.assertEquals(3, header.size());
+ Assert.assertEquals("value.1", header.get("registry.header.prop.1"));
+ Assert.assertEquals("value.2", header.get("registry.header.prop.2"));
+ Assert.assertEquals("value.3", header.get("registry.header.prop.3"));
+ }
+
+ @Test
+ public void testParseConfig() throws JsonProcessingException
+ {
+ String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+ );
+ SchemaRegistryBasedProtobufBytesDecoder decoder;
+ decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
+ .readerFor(ProtobufBytesDecoder.class)
+ .readValue(json);
+
+ Map<String, ?> heaeder = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(decoder.getConfig(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
+
+ // Then
+ Assert.assertEquals(3, heaeder.size());
+ Assert.assertEquals("value.1", heaeder.get("registry.config.prop.1"));
+ Assert.assertEquals("value.2", heaeder.get("registry.config.prop.2"));
+ Assert.assertEquals("value.3", heaeder.get("registry.config.prop.3"));
+ }
+
private ProtobufSchema parseProtobufSchema() throws IOException
{
// Given