/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.kafka;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.kafka.containers.SchemaRegistryContainer;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

/** End-to-end test for the SQL client using the Avro Confluent Registry format. */
public class SQLClientSchemaRegistryITCase {

    private static final Logger LOG = LoggerFactory.getLogger(SQLClientSchemaRegistryITCase.class);
    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);

    public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
    public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
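
    // Dependency jars, resolved from the build output by filename pattern, that are shipped
    // with every submitted SQL job.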
private static final Path sqlAvroJar = ResourceTestUtils.getResource(".*avro.jar");
private static final Path sqlAvroRegistryJar =
ResourceTestUtils.getResource(".*avro-confluent.jar");
private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.jar");
private final Path guavaJar = ResourceTestUtils.getResource(".*guava.jar");

    @ClassRule public static final Network NETWORK = Network.newNetwork();

    @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
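
    // Kafka broker shared by both tests; other containers reach it on the Docker network
    // via the "kafka" alias on port 9092.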
@ClassRule
public static final KafkaContainer KAFKA =
new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
.withLogConsumer(LOG_CONSUMER);
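
    // Confluent Schema Registry backed by the Kafka broker, reachable as "registry:8082".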
@ClassRule
public static final SchemaRegistryContainer REGISTRY =
new SchemaRegistryContainer(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY))
.withKafka(INTER_CONTAINER_KAFKA_ALIAS + ":9092")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_REGISTRY_ALIAS)
.dependsOn(KAFKA);

    public final TestcontainersSettings testcontainersSettings =
            TestcontainersSettings.builder().network(NETWORK).logger(LOG).dependsOn(KAFKA).build();

    public final FlinkContainers flink =
            FlinkContainers.builder().withTestcontainersSettings(testcontainersSettings).build();

    private KafkaContainerClient kafkaClient;
    private CachedSchemaRegistryClient registryClient;
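
    /** Starts the Flink containers and creates clients for Kafka and the Schema Registry. */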
@Before
public void setUp() throws Exception {
flink.start();
kafkaClient = new KafkaContainerClient(KAFKA);
registryClient = new CachedSchemaRegistryClient(REGISTRY.getSchemaRegistryUrl(), 10);
}

    @After
public void tearDown() {
flink.stop();
}
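
    /**
     * Reads Avro records through a table whose schema appends a nullable {@code description}
     * column. Records written with the original two-field schema must still be readable, with
     * the new column padded with {@code null}.
     */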
@Test
public void testReading() throws Exception {
String testCategoryTopic = "test-category-" + UUID.randomUUID().toString();
String testResultsTopic = "test-results-" + UUID.randomUUID().toString();
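
        // Create the source topic with a single partition and replica, then register the
        // initial version of the category schema under the topic's value subject.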
kafkaClient.createTopic(1, 1, testCategoryTopic);
Schema categoryRecord =
SchemaBuilder.record("org.apache.flink.avro.generated.record")
.fields()
.requiredLong("category_id")
.optionalString("name")
.endRecord();
String categorySubject = testCategoryTopic + "-value";
registryClient.register(categorySubject, new AvroSchema(categoryRecord));
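
        // Produce a single record that conforms to the registered schema.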
GenericRecordBuilder categoryBuilder = new GenericRecordBuilder(categoryRecord);
KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer(registryClient);
kafkaClient.sendMessages(
testCategoryTopic,
valueSerializer,
categoryBuilder.set("category_id", 1L).set("name", "electronics").build());
List<String> sqlLines =
Arrays.asList(
"CREATE TABLE category (",
" category_id BIGINT,",
" name STRING,",
" description STRING", // new field, should create new schema version, but
// still should
// be able to read old version
") WITH (",
" 'connector' = 'kafka',",
" 'properties.bootstrap.servers' = '"
+ INTER_CONTAINER_KAFKA_ALIAS
+ ":9092',",
" 'topic' = '" + testCategoryTopic + "',",
" 'scan.startup.mode' = 'earliest-offset',",
" 'properties.group.id' = 'test-group',",
" 'format' = 'avro-confluent',",
" 'avro-confluent.url' = 'http://"
+ INTER_CONTAINER_REGISTRY_ALIAS
+ ":8082'",
");",
"",
"CREATE TABLE results (",
" category_id BIGINT,",
" name STRING,",
" description STRING",
") WITH (",
" 'connector' = 'kafka',",
" 'properties.bootstrap.servers' = '"
+ INTER_CONTAINER_KAFKA_ALIAS
+ ":9092',",
" 'properties.group.id' = 'test-group',",
" 'topic' = '" + testResultsTopic + "',",
" 'format' = 'csv',",
" 'csv.null-literal' = 'null'",
");",
"",
"INSERT INTO results SELECT * FROM category;");
executeSqlStatements(sqlLines);
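
        // The job copies the record into the CSV results topic; the "description" column,
        // absent from the written record, is emitted as the configured null literal.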
List<String> categories =
kafkaClient.readMessages(
1, "test-group", testResultsTopic, new StringDeserializer());
assertThat(categories, equalTo(Collections.singletonList("1,electronics,null")));
}
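
    /**
     * Writes a row through an avro-confluent sink, then verifies that exactly one schema
     * version was registered for the topic's value subject and that the record round-trips.
     */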
@Test
public void testWriting() throws Exception {
String testUserBehaviorTopic = "test-user-behavior-" + UUID.randomUUID().toString();
        // Create the user behavior topic with a single partition and replica.
        kafkaClient.createTopic(1, 1, testUserBehaviorTopic);
String behaviourSubject = testUserBehaviorTopic + "-value";
List<String> sqlLines =
Arrays.asList(
"CREATE TABLE user_behavior (",
" user_id BIGINT NOT NULL,",
" item_id BIGINT,",
" category_id BIGINT,",
" behavior STRING,",
" ts TIMESTAMP(3)",
") WITH (",
" 'connector' = 'kafka',",
" 'properties.bootstrap.servers' = '"
+ INTER_CONTAINER_KAFKA_ALIAS
+ ":9092',",
" 'topic' = '" + testUserBehaviorTopic + "',",
" 'format' = 'avro-confluent',",
" 'avro-confluent.url' = 'http://"
+ INTER_CONTAINER_REGISTRY_ALIAS
+ ":8082"
+ "'",
");",
"",
"INSERT INTO user_behavior VALUES (1, 1, 1, 'buy', TO_TIMESTAMP(FROM_UNIXTIME(1234)));");
executeSqlStatements(sqlLines);
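
        // The sink must have registered exactly one schema version for the value subject.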
List<Integer> versions = getAllVersions(behaviourSubject);
assertThat(versions.size(), equalTo(1));
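
        // Read the record back using the registered schema and verify its contents.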
List<Object> userBehaviors =
kafkaClient.readMessages(
1,
"test-group",
testUserBehaviorTopic,
new KafkaAvroDeserializer(registryClient));
String schemaString =
registryClient.getByVersion(behaviourSubject, versions.get(0), false).getSchema();
Schema userBehaviorSchema = new Schema.Parser().parse(schemaString);
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(userBehaviorSchema);
assertThat(
userBehaviors,
equalTo(
Collections.singletonList(
recordBuilder
.set("user_id", 1L)
.set("item_id", 1L)
.set("category_id", 1L)
.set("behavior", "buy")
.set("ts", 1234000L)
.build())));
}
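
    /**
     * Fetches all schema versions registered for the given subject, retrying on REST errors
     * until the deadline expires, since the subject only appears once the job has registered
     * its schema.
     */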
private List<Integer> getAllVersions(String behaviourSubject) throws Exception {
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
Exception ex =
new IllegalStateException(
"Could not query schema registry. Negative deadline provided.");
while (deadline.hasTimeLeft()) {
try {
return registryClient.getAllVersions(behaviourSubject);
} catch (RestClientException e) {
ex = e;
}
}
throw ex;
}
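
    /**
     * Submits the statements through the SQL client, shipping the Avro, Avro-registry, Kafka
     * connector, and Guava jars together with the job.
     */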
private void executeSqlStatements(List<String> sqlLines) throws Exception {
flink.submitSQLJob(
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar, guavaJar)
.build());
}
}