blob: c02232bc0c65dccc0ae8aa3913731328586ed69c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.kafka.source;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.TestLogger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/** Tests for {@link KafkaSourceBuilder}. */
public class KafkaSourceBuilderTest extends TestLogger {
@Test
public void testBuildSourceWithGroupId() {
final KafkaSource<String> kafkaSource = getBasicBuilder().setGroupId("groupId").build();
// Commit on checkpoint should be enabled by default
Assertions.assertTrue(
kafkaSource
.getConfiguration()
.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
// Auto commit should be disabled by default
Assertions.assertFalse(
kafkaSource
.getConfiguration()
.get(
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
.booleanType()
.noDefaultValue()));
}
@Test
public void testBuildSourceWithoutGroupId() {
final KafkaSource<String> kafkaSource = getBasicBuilder().build();
// Commit on checkpoint and auto commit should be disabled because group.id is not specified
Assertions.assertFalse(
kafkaSource
.getConfiguration()
.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
Assertions.assertFalse(
kafkaSource
.getConfiguration()
.get(
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
.booleanType()
.noDefaultValue()));
}
@Test
public void testEnableCommitOnCheckpointWithoutGroupId() {
final IllegalStateException exception =
Assert.assertThrows(
IllegalStateException.class,
() ->
getBasicBuilder()
.setProperty(
KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
.key(),
"true")
.build());
MatcherAssert.assertThat(
exception.getMessage(),
CoreMatchers.containsString(
"Property group.id is required when offset commit is enabled"));
}
@Test
public void testEnableAutoCommitWithoutGroupId() {
final IllegalStateException exception =
Assert.assertThrows(
IllegalStateException.class,
() ->
getBasicBuilder()
.setProperty(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.build());
MatcherAssert.assertThat(
exception.getMessage(),
CoreMatchers.containsString(
"Property group.id is required when offset commit is enabled"));
}
@Test
public void testDisableOffsetCommitWithoutGroupId() {
getBasicBuilder()
.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
.build();
getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build();
}
private KafkaSourceBuilder<String> getBasicBuilder() {
return new KafkaSourceBuilder<String>()
.setBootstrapServers("testServer")
.setTopics("topic")
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
}
}