/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.config

import java.util.Properties
import java.util.concurrent.TimeUnit

import org.junit.Assert._
import org.junit.After
import org.junit.Before
import org.junit.Test

import scala.collection.JavaConverters._

import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.clients.producer.ProducerConfig

class TestKafkaConfig {
  var props: Properties = new Properties
  val SYSTEM_NAME = "kafka"
  val KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."
  val TEST_CLIENT_ID = "TestClientId"
  val TEST_GROUP_ID = "TestGroupId"
  @Before
  def setupProperties(): Unit = {
    props = new Properties
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
    props.setProperty(JobConfig.JOB_NAME, "jobName")
  }

  @After
  def clearUpProperties(): Unit = {
    props.clear()
  }
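
  // Fetch-size settings resolve in order of increasing specificity: the
  // consumer default (1 MB), a system-level override, then a per-stream
  // override. samza.fetch.threshold.bytes is only present when set explicitly.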
  @Test
  def testStreamLevelFetchSizeOverride(): Unit = {
    val mapConfig = new MapConfig(props.asScala.asJava)
    val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(mapConfig, SYSTEM_NAME, TEST_CLIENT_ID)
    // default fetch size (1024 * 1024 = 1 MB)
    assertEquals(1024 * 1024, consumerConfig.fetchMessageMaxBytes)

    props.setProperty("systems." + SYSTEM_NAME + ".consumer.fetch.message.max.bytes", "262144")
    val mapConfig1 = new MapConfig(props.asScala.asJava)
    val consumerConfig1 = KafkaConsumerConfig.getKafkaSystemConsumerConfig(mapConfig1, SYSTEM_NAME, TEST_CLIENT_ID)
    // system-level fetch size override (512 * 512 = 262144)
    assertEquals(512 * 512, consumerConfig1.fetchMessageMaxBytes)

    props.setProperty("systems." + SYSTEM_NAME + ".streams.topic1.consumer.fetch.message.max.bytes", "65536")
    val mapConfig2 = new MapConfig(props.asScala.asJava)
    val kafkaConfig2 = new KafkaConfig(mapConfig2)
    val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics(SYSTEM_NAME)
    // topic-level fetch size override (256 * 256 = 65536)
    assertEquals(256 * 256, consumerConfig2.getOrElse("topic1", 1024 * 1024))

    // samza.fetch.threshold.bytes is unset by default
    val mapConfig3 = new MapConfig(props.asScala.asJava)
    val kafkaConfig3 = new KafkaConfig(mapConfig3)
    assertTrue(kafkaConfig3.getConsumerFetchThresholdBytes("kafka").isEmpty)

    props.setProperty("systems.kafka.samza.fetch.threshold.bytes", "65536")
    val mapConfig4 = new MapConfig(props.asScala.asJava)
    val kafkaConfig4 = new KafkaConfig(mapConfig4)
    assertEquals("65536", kafkaConfig4.getConsumerFetchThresholdBytes("kafka").get)
  }
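
  // rocksdb.ttl.ms of -1 means entries never expire, so the changelog should be
  // log-compacted with the defaults: 512 MB segments, Kafka's 1000012-byte max
  // message size, and 24 h tombstone retention.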
  @Test
  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForInfiniteTTLStores(): Unit = {
    val props = new Properties
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
    props.setProperty(JobConfig.JOB_NAME, "jobName")
    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
    props.setProperty("stores.test1.rocksdb.ttl.ms", "-1")

    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
    assertEquals("compact", kafkaProperties.getProperty("cleanup.policy"))
    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
    assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes"))
    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
  }
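
  // A finite TTL (28 days here) should switch the changelog to time-based
  // deletion rather than compaction.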
  @Test
  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForLargeTTLStores(): Unit = {
    val props = new Properties
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
    props.setProperty(JobConfig.JOB_NAME, "jobName")
    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
    // Set the RocksDB TTL to be 28 days.
    props.setProperty("stores.test1.rocksdb.ttl.ms", "2419200000")

    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
    assertEquals("delete", kafkaProperties.getProperty("cleanup.policy"))
    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
  }
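
  // No rocksdb.ttl.ms behaves like an infinite TTL: the changelog stays
  // log-compacted.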
  @Test
  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = {
    val props = new Properties
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
    props.setProperty(JobConfig.JOB_NAME, "jobName")
    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")

    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
    assertEquals("compact", kafkaProperties.getProperty("cleanup.policy"))
    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
    assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes"))
  }
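
  // Exercises the full changelog property pipeline: explicit per-store
  // overrides, TTL-derived retention, batch-mode retention, and
  // transactional-state compaction lag.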
  @Test
  def testChangeLogProperties(): Unit = {
    props.setProperty("job.changelog.system", SYSTEM_NAME)
    props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
    props.setProperty("stores.test2.changelog", "kafka.mychangelog2")
    props.setProperty("stores.test2.changelog.max.message.bytes", "1024000")
    props.setProperty("stores.test3.changelog", "otherstream")
    props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
    props.setProperty("stores.test4.rocksdb.ttl.ms", "3600")
    props.setProperty("stores.test5.rocksdb.ttl.ms", "3600")
    props.setProperty("stores.test5.changelog.kafka.retention.ms", "1000")
    props.setProperty("stores.test6.rocksdb.ttl.ms", "3600")
    props.setProperty("stores.test6.changelog.kafka.cleanup.policy", "compact")

    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    assertEquals("delete", kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"))
    assertEquals("compact", kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"))
    assertEquals("1024000", kafkaConfig.getChangelogKafkaProperties("test2").getProperty("max.message.bytes"))
    assertEquals("compact", kafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"))

    val storeToChangelog = kafkaConfig.getKafkaChangelogEnabledStores()
    assertEquals("mychangelog1", storeToChangelog.getOrDefault("test1", ""))
    assertEquals("mychangelog2", storeToChangelog.getOrDefault("test2", ""))
    assertEquals("otherstream", storeToChangelog.getOrDefault("test3", ""))
    assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
    assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
    assertNotNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()
assertEquals("mychangelog1", storeToChangelog1.getOrDefault("test1", ""))
assertEquals("mychangelog2", storeToChangelog1.getOrDefault("test2", ""))
assertEquals("otherstream", storeToChangelog1.getOrDefault("test3", ""))
    assertEquals("delete", kafkaConfig.getChangelogKafkaProperties("test4").getProperty("cleanup.policy"))
    assertEquals("3600", kafkaConfig.getChangelogKafkaProperties("test4").getProperty("retention.ms"))
    assertNull(kafkaConfig.getChangelogKafkaProperties("test4").getProperty("max.message.bytes"))
    assertEquals("delete", kafkaConfig.getChangelogKafkaProperties("test5").getProperty("cleanup.policy"))
    assertEquals("1000", kafkaConfig.getChangelogKafkaProperties("test5").getProperty("retention.ms"))
    assertEquals("compact", kafkaConfig.getChangelogKafkaProperties("test6").getProperty("cleanup.policy"))
    assertNull(kafkaConfig.getChangelogKafkaProperties("test6").getProperty("retention.ms"))
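
    // In batch mode, every changelog additionally gets the bounded batch
    // default for delete.retention.ms.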
    props.setProperty(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name())
    val batchMapConfig = new MapConfig(props.asScala.asJava)
    val batchKafkaConfig = new KafkaConfig(batchMapConfig)
    assertEquals("delete", batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"))
    assertEquals(String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
      batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("delete.retention.ms"))
    assertEquals("compact", batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"))
    assertEquals(String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
      batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("delete.retention.ms"))
    assertEquals("compact", batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"))
    assertEquals(String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
      batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("delete.retention.ms"))
    assertEquals(KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES,
      batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("max.message.bytes"))

    // test compaction config for transactional state
    val lagOverride = String.valueOf(TimeUnit.HOURS.toMillis(6))
    props.setProperty(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true")
    props.setProperty("stores.test2.changelog.min.compaction.lag.ms", lagOverride)
    val tsMapConfig = new MapConfig(props.asScala.asJava)
    val tsKafkaConfig = new KafkaConfig(tsMapConfig)
    assertEquals(String.valueOf(StorageConfig.DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS),
      tsKafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
    assertEquals(lagOverride,
      tsKafkaConfig.getChangelogKafkaProperties("test2").getProperty("min.compaction.lag.ms"))
  }
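
  // Producer defaults: byte-array serializers plus Samza's own defaults for
  // max.in.flight.requests.per.connection and retries.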
  @Test
  def testDefaultValuesForProducerProperties(): Unit = {
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
    val producerProperties = kafkaProducerConfig.getProducerProperties

    assertEquals(classOf[ByteArraySerializer].getCanonicalName, producerProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
    assertEquals(classOf[ByteArraySerializer].getCanonicalName, producerProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
    assertEquals(kafkaProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT, producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION))
    assertEquals(kafkaProducerConfig.RETRIES_DEFAULT, producerProperties.get(ProducerConfig.RETRIES_CONFIG))
  }
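
  // Values under systems.<system>.producer.* are handed to the Kafka producer
  // as-is; non-numeric values for numeric settings surface as a
  // NumberFormatException when the producer properties are built.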
  @Test
  def testMaxInFlightRequestsPerConnectionOverride(): Unit = {
    val expectedValue = "200"
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue)
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
    val producerProperties = kafkaProducerConfig.getProducerProperties
    assertEquals(expectedValue, producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION))
  }

  @Test
  def testRetriesOverride(): Unit = {
    val expectedValue = "200"
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue)
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
    val producerProperties = kafkaProducerConfig.getProducerProperties
    assertEquals(expectedValue, producerProperties.get(ProducerConfig.RETRIES_CONFIG))
  }

  @Test(expected = classOf[NumberFormatException])
  def testMaxInFlightRequestsPerConnectionWrongNumberFormat(): Unit = {
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
    kafkaProducerConfig.getProducerProperties
  }

  @Test(expected = classOf[NumberFormatException])
  def testRetriesWrongNumberFormat(): Unit = {
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
    kafkaProducerConfig.getProducerProperties
  }

  @Test(expected = classOf[NumberFormatException])
  def testLingerWrongNumberFormat(): Unit = {
    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.LINGER_MS_CONFIG, "Samza")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
    kafkaProducerConfig.getProducerProperties
  }
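
  // systems.<system>.default.stream.replication.factor takes precedence over
  // the fallback value passed by the caller.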
  @Test
  def testGetSystemDefaultReplicationFactor(): Unit = {
    assertEquals(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR,
      new KafkaConfig(new MapConfig()).getSystemDefaultReplicationFactor("kafka-system", KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR))

    props.setProperty("systems.kafka-system.default.stream.replication.factor", "8")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    assertEquals("8", kafkaConfig.getSystemDefaultReplicationFactor("kafka-system", "2"))
  }
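
  // Changelog replication factor: a per-store override beats the
  // stores.default.changelog value.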
  @Test
  def testChangeLogReplicationFactor(): Unit = {
    props.setProperty("stores.store-with-override.changelog", "kafka-system.changelog-topic")
    props.setProperty("stores.store-with-override.changelog.replication.factor", "3")
    props.setProperty("stores.default.changelog.replication.factor", "2")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    assertEquals("3", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
    assertEquals("2", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
  }
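
  // Same precedence holds when the changelog lives on the job default system
  // and the "default" default is itself overridden.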
  @Test
  def testChangeLogReplicationFactorWithOverriddenDefault(): Unit = {
    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system")
    props.setProperty("stores.store-with-override.changelog", "changelog-topic")
    props.setProperty("stores.store-with-override.changelog.replication.factor", "4")
    // Override the "default" default value
    props.setProperty("stores.default.changelog.replication.factor", "5")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    assertEquals("4", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
    assertEquals("5", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
  }
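
  // The changelog system's default.stream.replication.factor fills in for
  // stores that have no explicit override.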
  @Test
  def testChangeLogReplicationFactorWithSystemOverriddenDefault(): Unit = {
    props.setProperty(StorageConfig.CHANGELOG_SYSTEM, "kafka-system")
    props.setProperty("systems.kafka-system.default.stream.replication.factor", "8")
    props.setProperty("stores.store-with-override.changelog.replication.factor", "4")
    props.setProperty("stores.store-without-override.changelog", "change-for-store-without-override")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    assertEquals("4", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
    assertEquals("8", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
  }
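
  // Checkpoint topic settings fall back to "3" replicas and a 1000012-byte max
  // message size when nothing is configured.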
  @Test
  def testCheckpointConfigs(): Unit = {
    val emptyConfig = new KafkaConfig(new MapConfig())
    assertEquals("3", emptyConfig.getCheckpointReplicationFactor.orNull)
    assertEquals(1000012, emptyConfig.getCheckpointMaxMessageBytes())
    assertNull(emptyConfig.getCheckpointSystem.orNull)

    props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system")
    props.setProperty("task.checkpoint.replication.factor", "4")
    props.setProperty("task.checkpoint.max.message.bytes", "2048000")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull)
    assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull)
    assertEquals(2048000, kafkaConfig.getCheckpointMaxMessageBytes())
  }
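
  // With only job.default.system set, the checkpoint system and its topic
  // settings are inherited from that system's default.stream.* values.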
  @Test
  def testCheckpointConfigsWithSystemDefault(): Unit = {
    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
    props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
    props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309")
    props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "4096000")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    assertEquals("other-kafka-system", kafkaConfig.getCheckpointSystem.orNull)
    assertEquals("8", kafkaConfig.getCheckpointReplicationFactor.orNull)
    assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes)
    assertEquals(4096000, kafkaConfig.getCheckpointMaxMessageBytes())
  }
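
  // Explicit task.checkpoint.* settings beat the job default system's values,
  // while segment.bytes still comes from the checkpoint system's own defaults.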
  @Test
  def testCheckpointConfigsWithSystemOverriddenDefault(): Unit = {
    // Defaults
    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
    props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
    props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "4096000")
    props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309")
    // Overrides
    props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system")
    props.setProperty(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, "4")
    props.setProperty(KafkaConfig.CHECKPOINT_MAX_MESSAGE_BYTES, "8192000")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull)
    assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull)
    assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes)
    assertEquals(8192000, kafkaConfig.getCheckpointMaxMessageBytes())
  }
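
  // Coordinator topic settings default to "3" replicas and a 1000012-byte max
  // message size.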
  @Test
  def testCoordinatorConfigs(): Unit = {
    val emptyConfig = new KafkaConfig(new MapConfig())
    assertEquals("3", emptyConfig.getCoordinatorReplicationFactor)
    assertEquals("1000012", emptyConfig.getCoordinatorMaxMessageByte)
    assertNull(new JobConfig(new MapConfig()).getCoordinatorSystemNameOrNull)

    props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system")
    props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4")
    props.setProperty(KafkaConfig.JOB_COORDINATOR_MAX_MESSAGE_BYTES, "1024000")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val jobConfig = new JobConfig(mapConfig)
    assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
    assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor)
    assertEquals("1024000", kafkaConfig.getCoordinatorMaxMessageByte)
  }
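
  // The coordinator system falls back to job.default.system and inherits its
  // default.stream.* values.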
  @Test
  def testCoordinatorConfigsWithSystemDefault(): Unit = {
    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
    props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
    props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309")
    props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "2048000")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val jobConfig = new JobConfig(mapConfig)
    assertEquals("other-kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
    assertEquals("8", kafkaConfig.getCoordinatorReplicationFactor)
    assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes)
    assertEquals("2048000", kafkaConfig.getCoordinatorMaxMessageByte)
  }
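
  // Explicit job.coordinator.* settings beat the job default system's values;
  // segment.bytes still comes from the coordinator system's defaults.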
  @Test
  def testCoordinatorConfigsWithSystemOverriddenDefault(): Unit = {
    // Defaults
    props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
    props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
    props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "2048000")
    props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309")
    // Overrides
    props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system")
    props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4")
    props.setProperty(KafkaConfig.JOB_COORDINATOR_MAX_MESSAGE_BYTES, "4096000")
    val mapConfig = new MapConfig(props.asScala.asJava)
    val kafkaConfig = new KafkaConfig(mapConfig)
    val jobConfig = new JobConfig(mapConfig)
    assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
    assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor)
    assertEquals("4096000", kafkaConfig.getCoordinatorMaxMessageByte)
    assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes)
  }
}