Add integration test for protobuf (#11126)
* Build the Docker test image with a `CONFLUENT_VERSION` build arg and download the Confluent `kafka-protobuf-provider` jar into the Druid lib directory
* Add `druid-protobuf-extensions`, `kafka-protobuf-provider`, `protobuf-java`, and `protobuf-dynamic` dependencies to the integration-tests pom and pass `CONFLUENT_VERSION` through to the Docker build scripts
* Copy the `wikipedia.desc` protobuf descriptor into the shared test-data directory
* Register `ProtobufEventSerializer` and `ProtobufSchemaRegistryEventSerializer` with the streaming test harness
* Add protobuf input format, parser, and serializer test resources for both the file-descriptor and schema-registry decoders
Co-authored-by: yuanyi <yuanyi@freewheel.tv>
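
For reviewers, a minimal sketch (not part of this patch) of how the new serializer is expected to be exercised: the `type` string from the `serializer/serializer.json` resources below selects an `EventSerializer` implementation through its Jackson subtype registration, and each generated event row is handed to `serialize` as column/value pairs. The class name and sample event values here are hypothetical.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.testing.utils.EventSerializer;

import java.util.Arrays;
import java.util.List;

public class ProtobufSerializerSketch
{
  public static void main(String[] args) throws Exception
  {
    // The EventSerializer interface carries the Jackson type info, so the plain
    // "protobuf" serializer can be resolved straight from the serializer.json content.
    ObjectMapper mapper = new ObjectMapper();
    EventSerializer serializer = mapper.readValue("{\"type\": \"protobuf\"}", EventSerializer.class);

    // One generated wikipedia event, expressed as the column/value pairs the
    // stream data generator passes to serialize(). Values are made up.
    List<Pair<String, Object>> event = Arrays.asList(
        new Pair<>("timestamp", "2015-09-12T00:46:58.771Z"),
        new Pair<>("page", "Striker Eureka"),
        new Pair<>("language", "en"),
        new Pair<>("added", 459)
    );

    byte[] kafkaPayload = serializer.serialize(event);
    System.out.println("protobuf payload is " + kafkaPayload.length + " bytes");
    serializer.close();
  }
}
```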
diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index f56ff76..b3af96b 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -26,6 +26,7 @@
FROM druidbase
ARG MYSQL_VERSION
+ARG CONFLUENT_VERSION
# Verify Java version
RUN java -version
@@ -46,6 +47,9 @@
RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \
-O /usr/local/druid/lib/mysql-connector-java.jar
+RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \
+ -O /usr/local/druid/lib/kafka-protobuf-provider.jar
+
# Add sample data
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
diff --git a/integration-tests/docker/test-data/wikipedia.desc b/integration-tests/docker/test-data/wikipedia.desc
new file mode 100644
index 0000000..d1a95df
--- /dev/null
+++ b/integration-tests/docker/test-data/wikipedia.desc
Binary files differ
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 8f3f088..edabb27 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -122,6 +122,12 @@
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
+ <artifactId>druid-protobuf-extensions</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-s3-extensions</artifactId>
<version>${project.parent.version}</version>
<scope>runtime</scope>
@@ -366,6 +372,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-protobuf-provider</artifactId>
+ <version>5.5.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.11.0</version>
+ </dependency>
<!-- Tests -->
<dependency>
@@ -387,6 +404,12 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protobuf-dynamic</artifactId>
+ <version>0.9.3</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
@@ -470,6 +493,7 @@
<DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
<DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
<MYSQL_VERSION>${mysql.version}</MYSQL_VERSION>
+ <CONFLUENT_VERSION>5.5.1</CONFLUENT_VERSION>
<KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION>
<ZK_VERSION>${zk.version}</ZK_VERSION>
</environmentVariables>
diff --git a/integration-tests/script/copy_resources.sh b/integration-tests/script/copy_resources.sh
index 6495dad..4dd1f0a 100755
--- a/integration-tests/script/copy_resources.sh
+++ b/integration-tests/script/copy_resources.sh
@@ -80,6 +80,7 @@
mkdir -p $SHARED_DIR/wikiticker-it
cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
cp docker/wiki-simple-lookup.json $SHARED_DIR/wikiticker-it/wiki-simple-lookup.json
+cp docker/test-data/wikipedia.desc $SHARED_DIR/wikiticker-it/wikipedia.desc
# copy other files if needed
if [ -n "$DRUID_INTEGRATION_TEST_RESOURCE_FILE_DIR_PATH" ]
diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh
index 6a3867a..0819e3a 100755
--- a/integration-tests/script/docker_build_containers.sh
+++ b/integration-tests/script/docker_build_containers.sh
@@ -22,17 +22,17 @@
if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ]
then
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version"
- docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
+ docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
else
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}"
case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
8)
echo "Build druid-cluster with Java 8"
- docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+ docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;;
11)
echo "Build druid-cluster with Java 11"
- docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+ docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;;
*)
echo "Invalid JVM Runtime given. Stopping"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
index cad5acf..d7de555 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
@@ -43,7 +43,9 @@
@Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
@Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
@Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
- @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
+ @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class),
+ @Type(name = ProtobufEventSerializer.TYPE, value = ProtobufEventSerializer.class),
+ @Type(name = ProtobufSchemaRegistryEventSerializer.TYPE, value = ProtobufSchemaRegistryEventSerializer.class)
})
public interface EventSerializer extends Closeable
{
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java
new file mode 100644
index 0000000..cc9b779
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.github.os72.protobuf.dynamic.MessageDefinition;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.List;
+
+public class ProtobufEventSerializer implements EventSerializer
+{
+ public static final String TYPE = "protobuf";
+
+ private static final Logger LOGGER = new Logger(ProtobufEventSerializer.class);
+
+ public static final DynamicSchema SCHEMA;
+
+ static {
+ DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
+ MessageDefinition wikiDef = MessageDefinition.newBuilder("Wikipedia")
+ .addField("optional", "string", "timestamp", 1)
+ .addField("optional", "string", "page", 2)
+ .addField("optional", "string", "language", 3)
+ .addField("optional", "string", "user", 4)
+ .addField("optional", "string", "unpatrolled", 5)
+ .addField("optional", "string", "newPage", 6)
+ .addField("optional", "string", "robot", 7)
+ .addField("optional", "string", "anonymous", 8)
+ .addField("optional", "string", "namespace", 9)
+ .addField("optional", "string", "continent", 10)
+ .addField("optional", "string", "country", 11)
+ .addField("optional", "string", "region", 12)
+ .addField("optional", "string", "city", 13)
+ .addField("optional", "int32", "added", 14)
+ .addField("optional", "int32", "deleted", 15)
+ .addField("optional", "int32", "delta", 16)
+ .build();
+ schemaBuilder.addMessageDefinition(wikiDef);
+ DynamicSchema schema = null;
+ try {
+ schema = schemaBuilder.build();
+ }
+ catch (Descriptors.DescriptorValidationException e) {
+ LOGGER.error("Could not init protobuf schema.");
+ }
+ SCHEMA = schema;
+ }
+
+ @Override
+ public byte[] serialize(List<Pair<String, Object>> event)
+ {
+ DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
+ Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
+ for (Pair<String, Object> pair : event) {
+ builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
+ }
+ return builder.build().toByteArray();
+ }
+
+ @Override
+ public void close()
+ {
+ }
+}
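
A round-trip sketch (again not part of the patch) showing that the bytes produced by `ProtobufEventSerializer` parse back with the same dynamically built `Wikipedia` descriptor, which is also what the file-based `protoBytesDecoder` pointed at `wikipedia.desc` relies on. Class name and field values are illustrative.

```java
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.testing.utils.ProtobufEventSerializer;

import java.util.Arrays;
import java.util.List;

public class ProtobufRoundTripSketch
{
  public static void main(String[] args) throws Exception
  {
    ProtobufEventSerializer serializer = new ProtobufEventSerializer();
    List<Pair<String, Object>> event = Arrays.asList(
        new Pair<>("page", "Gypsy Danger"),
        new Pair<>("added", 57)
    );
    byte[] bytes = serializer.serialize(event);

    // Parse the payload back with the same dynamically built "Wikipedia" descriptor.
    Descriptors.Descriptor wikiDesc =
        ProtobufEventSerializer.SCHEMA.newMessageBuilder("Wikipedia").getDescriptorForType();
    DynamicMessage message = DynamicMessage.parseFrom(wikiDesc, bytes);

    System.out.println(message.getField(wikiDesc.findFieldByName("page")));   // Gypsy Danger
    System.out.println(message.getField(wikiDesc.findFieldByName("added")));  // 57
    serializer.close();
  }
}
```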
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java
new file mode 100644
index 0000000..8d80b22
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class ProtobufSchemaRegistryEventSerializer extends ProtobufEventSerializer
+{
+ private static final int MAX_INITIALIZE_RETRIES = 10;
+ public static final String TYPE = "protobuf-schema-registry";
+
+ private final IntegrationTestingConfig config;
+ private final CachedSchemaRegistryClient client;
+ private int schemaId = -1;
+
+
+ @JsonCreator
+ public ProtobufSchemaRegistryEventSerializer(
+ @JacksonInject IntegrationTestingConfig config
+ )
+ {
+ this.config = config;
+ this.client = new CachedSchemaRegistryClient(
+ StringUtils.format("http://%s", config.getSchemaRegistryHost()),
+ Integer.MAX_VALUE,
+ ImmutableMap.of(
+ "basic.auth.credentials.source", "USER_INFO",
+ "basic.auth.user.info", "druid:diurd"
+ ),
+ ImmutableMap.of()
+ );
+
+ }
+
+ @Override
+ public void initialize(String topic)
+ {
+ try {
+ RetryUtils.retry(
+ () -> {
+ schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.SCHEMA.newMessageBuilder("Wikipedia").getDescriptorForType()));
+ return 0;
+ },
+ (e) -> true,
+ MAX_INITIALIZE_RETRIES
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public byte[] serialize(List<Pair<String, Object>> event)
+ {
+ DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
+ Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
+ for (Pair<String, Object> pair : event) {
+ builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
+ }
+ byte[] bytes = builder.build().toByteArray();
+ ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(schemaId).put((byte) 0).put(bytes);
+ bb.rewind();
+ return bb.array();
+ }
+}
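
The registry serializer above prefixes the payload the way the Confluent protobuf serde does: a zero magic byte, a 4-byte schema id, a single `0` message-index byte, then the encoded record. A decoding sketch under that assumption (the helper name is hypothetical):

```java
import java.nio.ByteBuffer;

public class RegistryFramingSketch
{
  /**
   * Strip the header that ProtobufSchemaRegistryEventSerializer.serialize() writes
   * in front of the protobuf payload: magic byte, int32 schema id, message-index byte.
   */
  public static byte[] stripRegistryHeader(byte[] framed)
  {
    ByteBuffer bb = ByteBuffer.wrap(framed);
    byte magic = bb.get();         // always 0
    int schemaId = bb.getInt();    // id returned by client.register(...)
    byte messageIndex = bb.get();  // a single 0 byte -> first message in the schema
    System.out.println("magic=" + magic + " schemaId=" + schemaId + " index=" + messageIndex);

    byte[] protoBytes = new byte[bb.remaining()];
    bb.get(protoBytes);            // the bytes a protobuf parser would consume
    return protoBytes;
  }
}
```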
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json
new file mode 100644
index 0000000..17a9142
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json
@@ -0,0 +1,12 @@
+{
+ "type": "protobuf",
+ "protoBytesDecoder": {
+ "type": "file",
+ "descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
+ "protoMessageType": "Wikipedia"
+ },
+ "flattenSpec": {
+ "useFieldDiscovery": true
+ },
+ "binaryAsString": false
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json
new file mode 100644
index 0000000..e55784d
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json
@@ -0,0 +1,18 @@
+{
+ "type": "protobuf",
+ "protoBytesDecoder": {
+ "type": "file",
+ "descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
+ "protoMessageType": "Wikipedia"
+ },
+ "parseSpec": {
+ "format": "json",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json
new file mode 100644
index 0000000..12d8e57
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+ "type": "protobuf"
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json
new file mode 100644
index 0000000..a0e5993
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json
@@ -0,0 +1,15 @@
+{
+ "type": "protobuf",
+ "protoBytesDecoder": {
+ "type": "schema_registry",
+ "url": "%%SCHEMA_REGISTRY_HOST%%",
+ "config": {
+ "basic.auth.credentials.source": "USER_INFO",
+ "basic.auth.user.info": "druid:diurd"
+ }
+ },
+ "flattenSpec": {
+ "useFieldDiscovery": true
+ },
+ "binaryAsString": false
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json
new file mode 100644
index 0000000..2db6947
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json
@@ -0,0 +1,21 @@
+{
+ "type": "protobuf",
+ "protoBytesDecoder" : {
+ "type": "schema_registry",
+ "url": "%%SCHEMA_REGISTRY_HOST%%",
+ "config": {
+ "basic.auth.credentials.source": "USER_INFO",
+ "basic.auth.user.info": "druid:diurd"
+ }
+ },
+ "parseSpec": {
+ "format": "json",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json
new file mode 100644
index 0000000..9f1ed6a
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+ "type": "protobuf-schema-registry"
+}
\ No newline at end of file