Add integration test for protobuf (#11126)

* add file test

* test

* for test

* bug fixed

* test

* test

* test

* bug fixed

* delete auto scaler

* add input format

* add extensions

* bug fixed

* bug fixed

* bug fixed

* revert

* add schema registry test

* bug fixed

* bug fixed

* delete desc

* delete change

* add desc

* bug fixed

* test inputformat

* bug fixed

* bug fixed

* bug fixed

* bug fixed

* delete io exception

* change builder not static

* change pom

* bug fixed

Co-authored-by: yuanyi <yuanyi@freewheel.tv>
diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index f56ff76..b3af96b 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -26,6 +26,7 @@
 
 FROM druidbase
 ARG MYSQL_VERSION
+ARG CONFLUENT_VERSION
 
 # Verify Java version
 RUN java -version
@@ -46,6 +47,9 @@
 RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \
     -O /usr/local/druid/lib/mysql-connector-java.jar
 
+RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \
+    -O /usr/local/druid/lib/kafka-protobuf-provider.jar
+
 # Add sample data
 # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
 RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
diff --git a/integration-tests/docker/test-data/wikipedia.desc b/integration-tests/docker/test-data/wikipedia.desc
new file mode 100644
index 0000000..d1a95df
--- /dev/null
+++ b/integration-tests/docker/test-data/wikipedia.desc
Binary files differ
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 8f3f088..edabb27 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -122,6 +122,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-protobuf-extensions</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
             <artifactId>druid-s3-extensions</artifactId>
             <version>${project.parent.version}</version>
             <scope>runtime</scope>
@@ -366,6 +372,17 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-protobuf-provider</artifactId>
+            <version>5.5.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.11.0</version>
+        </dependency>
 
         <!-- Tests -->
         <dependency>
@@ -387,6 +404,12 @@
             <artifactId>easymock</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.os72</groupId>
+            <artifactId>protobuf-dynamic</artifactId>
+            <version>0.9.3</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -470,6 +493,7 @@
                                         <DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
                                         <DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
                                         <MYSQL_VERSION>${mysql.version}</MYSQL_VERSION>
+                                        <CONFLUENT_VERSION>5.5.1</CONFLUENT_VERSION>
                                         <KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION>
                                         <ZK_VERSION>${zk.version}</ZK_VERSION>
                                     </environmentVariables>
diff --git a/integration-tests/script/copy_resources.sh b/integration-tests/script/copy_resources.sh
index 6495dad..4dd1f0a 100755
--- a/integration-tests/script/copy_resources.sh
+++ b/integration-tests/script/copy_resources.sh
@@ -80,6 +80,7 @@
 mkdir -p $SHARED_DIR/wikiticker-it
 cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
 cp docker/wiki-simple-lookup.json $SHARED_DIR/wikiticker-it/wiki-simple-lookup.json
+cp docker/test-data/wikipedia.desc $SHARED_DIR/wikiticker-it/wikipedia.desc
 
 # copy other files if needed
 if [ -n "$DRUID_INTEGRATION_TEST_RESOURCE_FILE_DIR_PATH" ]
diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh
index 6a3867a..0819e3a 100755
--- a/integration-tests/script/docker_build_containers.sh
+++ b/integration-tests/script/docker_build_containers.sh
@@ -22,17 +22,17 @@
 if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ]
 then
   echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version"
-  docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
+  docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
 else
   echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}"
   case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
   8)
     echo "Build druid-cluster with Java 8"
-    docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+    docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
     ;;
   11)
     echo "Build druid-cluster with Java 11"
-    docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+    docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
     ;;
   *)
     echo "Invalid JVM Runtime given. Stopping"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
index cad5acf..d7de555 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
@@ -43,7 +43,9 @@
     @Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
     @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
     @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
-    @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
+    @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class),
+    @Type(name = ProtobufEventSerializer.TYPE, value = ProtobufEventSerializer.class),
+    @Type(name = ProtobufSchemaRegistryEventSerializer.TYPE, value = ProtobufSchemaRegistryEventSerializer.class)
 })
 public interface EventSerializer extends Closeable
 {
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java
new file mode 100644
index 0000000..cc9b779
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.github.os72.protobuf.dynamic.MessageDefinition;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.List;
+
+public class ProtobufEventSerializer implements EventSerializer
+{
+  public static final String TYPE = "protobuf";
+
+  private static final Logger LOGGER = new Logger(ProtobufEventSerializer.class);
+
+  public static final DynamicSchema SCHEMA;
+
+  static {
+    DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
+    MessageDefinition wikiDef = MessageDefinition.newBuilder("Wikipedia")
+        .addField("optional", "string", "timestamp", 1)
+        .addField("optional", "string", "page", 2)
+        .addField("optional", "string", "language", 3)
+        .addField("optional", "string", "user", 4)
+        .addField("optional", "string", "unpatrolled", 5)
+        .addField("optional", "string", "newPage", 6)
+        .addField("optional", "string", "robot", 7)
+        .addField("optional", "string", "anonymous", 8)
+        .addField("optional", "string", "namespace", 9)
+        .addField("optional", "string", "continent", 10)
+        .addField("optional", "string", "country", 11)
+        .addField("optional", "string", "region", 12)
+        .addField("optional", "string", "city", 13)
+        .addField("optional", "int32", "added", 14)
+        .addField("optional", "int32", "deleted", 15)
+        .addField("optional", "int32", "delta", 16)
+        .build();
+    schemaBuilder.addMessageDefinition(wikiDef);
+    DynamicSchema schema = null;
+    try {
+      schema = schemaBuilder.build();
+    }
+    catch (Descriptors.DescriptorValidationException e) {
+      LOGGER.error("Could not init protobuf schema.");
+    }
+    SCHEMA = schema;
+  }
+
+  @Override
+  public byte[] serialize(List<Pair<String, Object>> event)
+  {
+    DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
+    Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
+    for (Pair<String, Object> pair : event) {
+      builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
+    }
+    return builder.build().toByteArray();
+  }
+
+  @Override
+  public void close()
+  {
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java
new file mode 100644
index 0000000..8d80b22
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class ProtobufSchemaRegistryEventSerializer extends ProtobufEventSerializer
+{
+  private static final int MAX_INITIALIZE_RETRIES = 10;
+  public static final String TYPE = "protobuf-schema-registry";
+
+  private final IntegrationTestingConfig config;
+  private final CachedSchemaRegistryClient client;
+  private int schemaId = -1;
+
+
+  @JsonCreator
+  public ProtobufSchemaRegistryEventSerializer(
+      @JacksonInject IntegrationTestingConfig config
+  )
+  {
+    this.config = config;
+    this.client = new CachedSchemaRegistryClient(
+        StringUtils.format("http://%s", config.getSchemaRegistryHost()),
+        Integer.MAX_VALUE,
+        ImmutableMap.of(
+            "basic.auth.credentials.source", "USER_INFO",
+            "basic.auth.user.info", "druid:diurd"
+        ),
+        ImmutableMap.of()
+    );
+
+  }
+
+  @Override
+  public void initialize(String topic)
+  {
+    try {
+      RetryUtils.retry(
+          () -> {
+            schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.SCHEMA.newMessageBuilder("Wikipedia").getDescriptorForType()));
+            return 0;
+          },
+          (e) -> true,
+          MAX_INITIALIZE_RETRIES
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public byte[] serialize(List<Pair<String, Object>> event)
+  {
+    DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
+    Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
+    for (Pair<String, Object> pair : event) {
+      builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
+    }
+    byte[] bytes = builder.build().toByteArray();
+    ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(schemaId).put((byte) 0).put(bytes);
+    bb.rewind();
+    return bb.array();
+  }
+}
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json
new file mode 100644
index 0000000..17a9142
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json
@@ -0,0 +1,12 @@
+{
+  "type": "protobuf",
+  "protoBytesDecoder": {
+    "type": "file",
+    "descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
+    "protoMessageType": "Wikipedia"
+  },
+  "flattenSpec": {
+    "useFieldDiscovery": true
+  },
+  "binaryAsString": false
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json
new file mode 100644
index 0000000..e55784d
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json
@@ -0,0 +1,18 @@
+{
+  "type": "protobuf",
+  "protoBytesDecoder": {
+    "type": "file",
+    "descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
+    "protoMessageType": "Wikipedia"
+  },
+  "parseSpec": {
+    "format": "json",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json
new file mode 100644
index 0000000..12d8e57
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+  "type": "protobuf"
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json
new file mode 100644
index 0000000..a0e5993
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json
@@ -0,0 +1,15 @@
+{
+  "type": "protobuf",
+  "protoBytesDecoder": {
+    "type": "schema_registry",
+    "url": "%%SCHEMA_REGISTRY_HOST%%",
+    "config": {
+      "basic.auth.credentials.source": "USER_INFO",
+      "basic.auth.user.info": "druid:diurd"
+    }
+  },
+  "flattenSpec": {
+    "useFieldDiscovery": true
+  },
+  "binaryAsString": false
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json
new file mode 100644
index 0000000..2db6947
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json
@@ -0,0 +1,21 @@
+{
+  "type": "protobuf",
+  "protoBytesDecoder" : {
+    "type": "schema_registry",
+    "url": "%%SCHEMA_REGISTRY_HOST%%",
+    "config": {
+      "basic.auth.credentials.source": "USER_INFO",
+      "basic.auth.user.info": "druid:diurd"
+    }
+  },
+  "parseSpec": {
+    "format": "json",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json
new file mode 100644
index 0000000..9f1ed6a
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+  "type": "protobuf-schema-registry"
+}
\ No newline at end of file