Add DynamicConfigProvider for Schema Registry (#11362)

* add_DynamicConfigProvider_for_schema_registry

* bug fixed

* add document

* fix document

* fix spot bug

* fix document

* inject ObjectMapper

* add DynamicConfigProviderUtils

* add UT

* bug fixed

Co-authored-by: yuanyi <yuanyi@freewheel.tv>
diff --git a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
new file mode 100644
index 0000000..4c45262
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.metadata.DynamicConfigProvider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DynamicConfigProviderUtils
+{
+  public static Map<String, String> extraConfigAndSetStringMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
+  {
+    HashMap<String, String> newConfig = new HashMap<>();
+    if (config != null) {
+      for (Map.Entry<String, Object> entry : config.entrySet()) {
+        if (!dynamicConfigProviderKey.equals(entry.getKey())) {
+          newConfig.put(entry.getKey(), entry.getValue().toString());
+        }
+      }
+      Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
+      for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
+        newConfig.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return newConfig;
+  }
+
+  public static Map<String, Object> extraConfigAndSetObjectMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
+  {
+    HashMap<String, Object> newConfig = new HashMap<>();
+    if (config != null) {
+      for (Map.Entry<String, Object> entry : config.entrySet()) {
+        if (!dynamicConfigProviderKey.equals(entry.getKey())) {
+          newConfig.put(entry.getKey(), entry.getValue());
+        }
+      }
+      Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
+      for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
+        newConfig.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return newConfig;
+  }
+
+  private static Map<String, String> extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper)
+  {
+    if (dynamicConfigProviderJson != null) {
+      DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
+      return dynamicConfigProvider.getConfig();
+    }
+    return Collections.emptyMap();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java
new file mode 100644
index 0000000..496acfa
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.metadata.DynamicConfigProvider;
+import org.apache.druid.metadata.MapStringDynamicConfigProvider;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import java.util.Map;
+
+@RunWith(Enclosed.class)
+public class DynamicConfigProviderUtilsTest
+{
+  public static class ThrowIfURLHasNotAllowedPropertiesTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private final String DYNAMIC_CONFIG_PROVIDER = "druid.dynamic.config.provider";
+
+    @Test
+    public void testExtraConfigAndSetStringMap()
+    {
+      DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
+          ImmutableMap.of("prop2", "value2")
+      );
+
+      Map<String, Object> properties = ImmutableMap.of(
+          "prop1", "value1",
+          "prop2", "value3",
+          DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class)
+      );
+      Map<String, String> res = DynamicConfigProviderUtils.extraConfigAndSetStringMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER);
+
+      Assert.assertEquals(2, res.size());
+      Assert.assertEquals("value1", res.get("prop1"));
+      Assert.assertEquals("value2", res.get("prop2"));
+    }
+
+    @Test
+    public void testExtraConfigAndSetObjectMap()
+    {
+      DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
+          ImmutableMap.of("prop2", "value2")
+      );
+
+      Map<String, Object> properties = ImmutableMap.of(
+          "prop1", "value1",
+          "prop2", "value3",
+          DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class)
+      );
+      Map<String, Object> res = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER);
+
+      Assert.assertEquals(2, res.size());
+      Assert.assertEquals("value1", res.get("prop1").toString());
+      Assert.assertEquals("value2", res.get("prop2").toString());
+    }
+  }
+}
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index 002d6d2..cb1b3d2 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -380,8 +380,8 @@
 | url | String | Specifies the url endpoint of the Schema Registry. | yes |
 | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
 | urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) |
-| config | Json | To send additional configurations, configured for Schema Registry | no |
-| headers | Json | To send headers to the Schema Registry | no |
+| config | Json | To send additional configurations, configured for Schema Registry.  This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
+| headers | Json | To send headers to the Schema Registry.  This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
 
 For a single schema registry instance, use Field `url` or `urls` for multi instances.
 
@@ -408,12 +408,20 @@
         "schema.registry.ssl.truststore.password": "<password>",
         "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
         "schema.registry.ssl.keystore.password": "<password>",
-        "schema.registry.ssl.key.password": "<password>"
+        "schema.registry.ssl.key.password": "<password>",
+        "schema.registry.ssl.key.password",
        ... 
    },
    "headers": {
        "traceID" : "b29c5de2-0db4-490b-b421",
        "timeStamp" : "1577191871865",
+       "druid.dynamic.config.provider":{
+            "type":"mapString", 
+            "config":{
+                 "registry.header.prop.1":"value.1", 
+                 "registry.header.prop.2":"value.2"
+                 }
+            }
        ...
     }
 }
@@ -1223,8 +1231,8 @@
 | url | String | Specifies the url endpoint of the Schema Registry. | yes |
 | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
 | urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) |
-| config | Json | To send additional configurations, configured for Schema Registry | no |
-| headers | Json | To send headers to the Schema Registry | no |
+| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md).  | no |
+| headers | Json | To send headers to the Schema Registry.  This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
 
 For a single schema registry instance, use Field `url` or `urls` for multi instances.
 
@@ -1259,6 +1267,13 @@
   "headers": {
       "traceID" : "b29c5de2-0db4-490b-b421",
       "timeStamp" : "1577191871865",
+      "druid.dynamic.config.provider":{
+           "type":"mapString", 
+           "config":{
+                "registry.header.prop.1":"value.1", 
+                "registry.header.prop.2":"value.2"
+                }
+           }
       ...
   }
 }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 4b3da38..59cb33e 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -19,8 +19,10 @@
 
 package org.apache.druid.data.input.avro;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@@ -32,8 +34,10 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -48,16 +52,19 @@
   private final String url;
   private final int capacity;
   private final List<String> urls;
-  private final Map<String, ?> config;
-  private final Map<String, String> headers;
+  private final Map<String, Object> config;
+  private final Map<String, Object> headers;
+  private final ObjectMapper jsonMapper;
+  public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
 
   @JsonCreator
   public SchemaRegistryBasedAvroBytesDecoder(
       @JsonProperty("url") @Deprecated String url,
       @JsonProperty("capacity") Integer capacity,
       @JsonProperty("urls") @Nullable List<String> urls,
-      @JsonProperty("config") @Nullable Map<String, ?> config,
-      @JsonProperty("headers") @Nullable Map<String, String> headers
+      @JsonProperty("config") @Nullable Map<String, Object> config,
+      @JsonProperty("headers") @Nullable Map<String, Object> headers,
+      @JacksonInject @Json ObjectMapper jsonMapper
   )
   {
     this.url = url;
@@ -65,10 +72,11 @@
     this.urls = urls;
     this.config = config;
     this.headers = headers;
+    this.jsonMapper = jsonMapper;
     if (url != null && !url.isEmpty()) {
-      this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, this.config, this.headers);
+      this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
     } else {
-      this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, this.config, this.headers);
+      this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
     }
   }
 
@@ -91,13 +99,13 @@
   }
 
   @JsonProperty
-  public Map<String, ?> getConfig()
+  public Map<String, Object> getConfig()
   {
     return config;
   }
 
   @JsonProperty
-  public Map<String, String> getHeaders()
+  public Map<String, Object> getHeaders()
   {
     return headers;
   }
@@ -112,6 +120,7 @@
     this.config = null;
     this.headers = null;
     this.registry = registry;
+    this.jsonMapper = new ObjectMapper();
   }
 
   @Override
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
index 4213008..4a0c2df 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.data.input;
 
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
@@ -108,6 +109,9 @@
     for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
       jsonMapper.registerModule(jacksonModule);
     }
+    jsonMapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
   }
 
   @Test
@@ -151,7 +155,7 @@
   {
     AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
         flattenSpec,
-        new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null),
+        new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null),
         false,
         false
     );
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 3eb6439..ec073c9 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.data.input.avro;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@@ -30,8 +32,10 @@
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.druid.data.input.AvroStreamInputRowParserTest;
 import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +45,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 public class SchemaRegistryBasedAvroBytesDecoderTest
 {
@@ -56,7 +61,10 @@
   public void testMultipleUrls() throws Exception
   {
     String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
-    ObjectMapper mapper = new ObjectMapper();
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
     SchemaRegistryBasedAvroBytesDecoder decoder;
     decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
         .readerFor(AvroBytesDecoder.class)
@@ -70,7 +78,10 @@
   public void testUrl() throws Exception
   {
     String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
-    ObjectMapper mapper = new ObjectMapper();
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
     SchemaRegistryBasedAvroBytesDecoder decoder;
     decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
         .readerFor(AvroBytesDecoder.class)
@@ -84,7 +95,10 @@
   public void testConfig() throws Exception
   {
     String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
-    ObjectMapper mapper = new ObjectMapper();
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
     SchemaRegistryBasedAvroBytesDecoder decoder;
     decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
         .readerFor(AvroBytesDecoder.class)
@@ -163,4 +177,48 @@
     writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
     return out.toByteArray();
   }
+
+  @Test
+  public void testParseHeader() throws JsonProcessingException
+  {
+    String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
+    SchemaRegistryBasedAvroBytesDecoder decoder;
+    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
+        .readerFor(AvroBytesDecoder.class)
+        .readValue(json);
+
+    Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
+
+    // Then
+    Assert.assertEquals(3, header.size());
+    Assert.assertEquals("value.1", header.get("registry.header.prop.1"));
+    Assert.assertEquals("value.2", header.get("registry.header.prop.2"));
+    Assert.assertEquals("value.3", header.get("registry.header.prop.3"));
+  }
+
+  @Test
+  public void testParseConfig() throws JsonProcessingException
+  {
+    String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
+    SchemaRegistryBasedAvroBytesDecoder decoder;
+    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
+        .readerFor(AvroBytesDecoder.class)
+        .readValue(json);
+
+    Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
+
+    // Then
+    Assert.assertEquals(3, config.size());
+    Assert.assertEquals("value.1", config.get("registry.config.prop.1"));
+    Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
+    Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
+  }
 }
diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
index 2d4cc8d..17bb85a 100644
--- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
+++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
@@ -19,8 +19,10 @@
 
 package org.apache.druid.data.input.protobuf;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
@@ -29,8 +31,10 @@
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
 
 import javax.annotation.Nullable;
 
@@ -50,16 +54,19 @@
   private final String url;
   private final int capacity;
   private final List<String> urls;
-  private final Map<String, ?> config;
-  private final Map<String, String> headers;
+  private final Map<String, Object> config;
+  private final Map<String, Object> headers;
+  private final ObjectMapper jsonMapper;
+  public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
 
   @JsonCreator
   public SchemaRegistryBasedProtobufBytesDecoder(
       @JsonProperty("url") @Deprecated String url,
       @JsonProperty("capacity") Integer capacity,
       @JsonProperty("urls") @Nullable List<String> urls,
-      @JsonProperty("config") @Nullable Map<String, ?> config,
-      @JsonProperty("headers") @Nullable Map<String, String> headers
+      @JsonProperty("config") @Nullable Map<String, Object> config,
+      @JsonProperty("headers") @Nullable Map<String, Object> headers,
+      @JacksonInject @Json ObjectMapper jsonMapper
   )
   {
     this.url = url;
@@ -67,10 +74,11 @@
     this.urls = urls;
     this.config = config;
     this.headers = headers;
+    this.jsonMapper = jsonMapper;
     if (url != null && !url.isEmpty()) {
-      this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers);
+      this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
     } else {
-      this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers);
+      this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
     }
   }
 
@@ -93,13 +101,13 @@
   }
 
   @JsonProperty
-  public Map<String, ?> getConfig()
+  public Map<String, Object> getConfig()
   {
     return config;
   }
 
   @JsonProperty
-  public Map<String, String> getHeaders()
+  public Map<String, Object> getHeaders()
   {
     return headers;
   }
@@ -119,6 +127,7 @@
     this.config = null;
     this.headers = null;
     this.registry = registry;
+    this.jsonMapper = new ObjectMapper();
   }
 
   @Override
diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index e9e15ff..0a3e8c7 100644
--- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.data.input.protobuf;
 
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
@@ -77,6 +78,9 @@
     for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) {
       jsonMapper.registerModule(jacksonModule);
     }
+    jsonMapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
   }
 
   @Test
@@ -99,7 +103,7 @@
   {
     ProtobufInputFormat inputFormat = new ProtobufInputFormat(
         flattenSpec,
-        new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null)
+        new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null, null)
     );
     NestedInputFormat inputFormat2 = jsonMapper.readValue(
         jsonMapper.writeValueAsString(inputFormat),
diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
index 0d77b11..009b5a6 100644
--- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
+++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.data.input.protobuf;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.DynamicMessage;
@@ -26,7 +28,9 @@
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import org.apache.commons.io.IOUtils;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
 import org.joda.time.DateTime;
 import org.joda.time.chrono.ISOChronology;
 import org.junit.Assert;
@@ -40,6 +44,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.Map;
 
 public class SchemaRegistryBasedProtobufBytesDecoderTest
 {
@@ -93,7 +98,7 @@
   public void testDefaultCapacity()
   {
     // Given
-    SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null);
+    SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null, null);
     // When
     Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), Integer.MAX_VALUE);
   }
@@ -103,7 +108,7 @@
   {
     int capacity = 100;
     // Given
-    SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null);
+    SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null, null);
     // When
     Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), capacity);
   }
@@ -120,7 +125,10 @@
   public void testMultipleUrls() throws Exception
   {
     String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
-    ObjectMapper mapper = new ObjectMapper();
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
     SchemaRegistryBasedProtobufBytesDecoder decoder;
     decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
         .readerFor(ProtobufBytesDecoder.class)
@@ -134,7 +142,10 @@
   public void testUrl() throws Exception
   {
     String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
-    ObjectMapper mapper = new ObjectMapper();
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
     SchemaRegistryBasedProtobufBytesDecoder decoder;
     decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
         .readerFor(ProtobufBytesDecoder.class)
@@ -148,7 +159,10 @@
   public void testConfig() throws Exception
   {
     String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
-    ObjectMapper mapper = new ObjectMapper();
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
     SchemaRegistryBasedProtobufBytesDecoder decoder;
     decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
         .readerFor(ProtobufBytesDecoder.class)
@@ -158,6 +172,50 @@
     Assert.assertNotEquals(decoder.hashCode(), 0);
   }
 
+  @Test
+  public void testParseHeader() throws JsonProcessingException
+  {
+    String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
+    SchemaRegistryBasedProtobufBytesDecoder decoder;
+    decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
+        .readerFor(ProtobufBytesDecoder.class)
+        .readValue(json);
+
+    Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
+
+    // Then
+    Assert.assertEquals(3, header.size());
+    Assert.assertEquals("value.1", header.get("registry.header.prop.1"));
+    Assert.assertEquals("value.2", header.get("registry.header.prop.2"));
+    Assert.assertEquals("value.3", header.get("registry.header.prop.3"));
+  }
+
+  @Test
+  public void testParseConfig() throws JsonProcessingException
+  {
+    String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
+    SchemaRegistryBasedProtobufBytesDecoder decoder;
+    decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
+        .readerFor(ProtobufBytesDecoder.class)
+        .readValue(json);
+
+    Map<String, ?> heaeder = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(decoder.getConfig(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
+
+    // Then
+    Assert.assertEquals(3, heaeder.size());
+    Assert.assertEquals("value.1", heaeder.get("registry.config.prop.1"));
+    Assert.assertEquals("value.2", heaeder.get("registry.config.prop.2"));
+    Assert.assertEquals("value.3", heaeder.get("registry.config.prop.3"));
+  }
+
   private ProtobufSchema parseProtobufSchema() throws IOException
   {
     // Given