/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.samza.system.kafka
import java.time.Duration
import java.util.Collections
import java.util.concurrent.TimeUnit
import com.google.common.collect.ImmutableSet
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.security.JaasUtils
import org.apache.samza.Partition
import org.apache.samza.config._
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
import org.junit.Assert._
import org.junit._
import scala.collection.JavaConverters._
/**
* README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava
*/
object TestKafkaSystemAdmin extends KafkaServerTestHarness {
val SYSTEM = "kafka"
val TOPIC = "input"
val TOPIC2 = "input2"
val TOTAL_PARTITIONS = 50
val REPLICATION_FACTOR = 2
val zkSecure = JaasUtils.isZkSecurityEnabled()
val KAFKA_CONSUMER_PROPERTY_PREFIX: String = "systems." + SYSTEM + ".consumer."
val KAFKA_PRODUCER_PROPERTY_PREFIX: String = "systems." + SYSTEM + ".producer."
protected def numBrokers: Int = 3
var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
var producerConfig: KafkaProducerConfig = null
var systemAdmin: KafkaSystemAdmin = null
var adminClient: AdminClient = null
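/**
* Builds the broker configs for the embedded cluster; topic deletion is enabled
* so that tests can clean up after themselves.
*/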
override def generateConfigs(): Seq[KafkaConfig] = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableDeleteTopic = true)
props.map(KafkaConfig.fromProps)
}
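/**
* Brings up the embedded ZK/Kafka cluster, then creates the shared producer,
* admin client, and Samza system admin used by all of the tests below.
*/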
@BeforeClass
override def setUp() {
super.setUp()
val map = new java.util.HashMap[String, String]()
map.put("bootstrap.servers", brokerList)
map.put(KAFKA_CONSUMER_PROPERTY_PREFIX +
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
map.put("acks", "all")
map.put("serializer.class", "kafka.serializer.StringEncoder")
producerConfig = new KafkaProducerConfig("kafka", "i001", map)
producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
systemAdmin = createSystemAdmin(SYSTEM, map)
adminClient = AdminClient.create(map.asInstanceOf[java.util.Map[String, Object]])
systemAdmin.start()
}
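/**
* Closes the shared clients before shutting down the embedded cluster.
*/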
@AfterClass
override def tearDown() {
adminClient.close()
systemAdmin.stop()
producer.close()
super.tearDown()
}
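/**
* Creates a topic with the given partition count and the shared REPLICATION_FACTOR,
* blocking until the broker acknowledges the creation.
*/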
def createTopic(topicName: String, partitionCount: Int) {
val topicCreationFutures = adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, partitionCount, REPLICATION_FACTOR.shortValue())))
topicCreationFutures.all().get() // wait on the future to make sure create topic call finishes
}
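/**
* Polls describeTopics until the topic reports the expected partition count,
* failing the test after 10 attempts.
*/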
def validateTopic(topic: String, expectedPartitionCount: Int) {
var done = false
var attempts = 0
while (!done && attempts < 10) {
try {
attempts += 1
val topicDescriptionFutures = adminClient.describeTopics(Set(topic).asJava).all()
val topicDescription = topicDescriptionFutures.get(500, TimeUnit.MILLISECONDS)
.get(topic)
done = expectedPartitionCount == topicDescription.partitions().size()
} catch {
case e: Exception =>
System.err.println("Exception while validating test topic " + topic + ": " + e)
}
}
if (!done) {
fail("Unable to successfully create topics. Tried to validate %s times." format attempts)
}
}
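/**
* Builds a consumer that starts from the earliest offset, with string keys and
* byte-array values.
*/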
def getKafkaConsumer(): KafkaConsumer[Any, Any] = {
val props = new java.util.Properties
props.put("bootstrap.servers", brokerList)
props.put("group.id", "test")
props.put("auto.offset.reset", "earliest")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
new KafkaConsumer[Any, Any](props)
}
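/**
* Populates the minimum config a KafkaSystemAdmin needs (consumer/producer bootstrap
* servers, ZooKeeper connect, and a job name) and constructs the admin with its own
* Kafka consumer instance.
*/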
def createSystemAdmin(system: String, map: java.util.Map[String, String]) = {
// Required configs: consumer/producer bootstrap servers, ZooKeeper connect, and job name.
map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerList)
map.put(KAFKA_PRODUCER_PROPERTY_PREFIX + org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerList)
map.put(JobConfig.JOB_NAME, "job.name")
map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + KafkaConsumerConfig.ZOOKEEPER_CONNECT, zkConnect)
val config: Config = new MapConfig(map)
val clientId = KafkaConsumerConfig.createClientId("clientPrefix", config)
// extract kafka client configs
val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, system, clientId)
new KafkaSystemAdmin(system, config, KafkaSystemConsumer.createKafkaConsumerImpl(system, consumerConfig))
}
}
/**
* These tests bring up a local ZK and Kafka cluster, and use it to create and test
* topics, verifying that the offset APIs in SystemAdmin work as expected.
*/
class TestKafkaSystemAdmin {
import TestKafkaSystemAdmin._
@Test
def testShouldGetOldestNewestAndNextOffsets {
// The partition count is reduced from 50 to 25 to prevent tests from failing: the
// embedded setup brings up only 3 brokers, and the replication fetcher runs into
// issues with a higher partition count.
// Create an empty topic with 25 partitions, but with no offsets.
createTopic(TOPIC, 25)
validateTopic(TOPIC, 25)
// Verify the empty topic behaves as expected.
var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
assertEquals(1, metadata.size)
assertNotNull(metadata.get(TOPIC))
// Verify partition count.
var sspMetadata = metadata.get(TOPIC).getSystemStreamPartitionMetadata
assertEquals(25, sspMetadata.size)
// Empty topics should have null for latest offset and 0 for earliest offset
assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset)
assertNull(sspMetadata.get(new Partition(0)).getNewestOffset)
// Empty Kafka topics should have a next offset of 0.
assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset)
// Add a new message to one of the partitions, and verify that it works as
// expected.
producer.send(new ProducerRecord(TOPIC, 24, "key1".getBytes, "val1".getBytes)).get()
metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
assertEquals(1, metadata.size)
val streamName = metadata.keySet.asScala.head
assertEquals(TOPIC, streamName)
sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
// The message was sent explicitly to partition 24.
assertEquals("0", sspMetadata.get(new Partition(24)).getOldestOffset)
assertEquals("0", sspMetadata.get(new Partition(24)).getNewestOffset)
assertEquals("1", sspMetadata.get(new Partition(24)).getUpcomingOffset)
// Some other partition should be empty.
assertEquals("0", sspMetadata.get(new Partition(3)).getOldestOffset)
assertNull(sspMetadata.get(new Partition(3)).getNewestOffset)
assertEquals("0", sspMetadata.get(new Partition(3)).getUpcomingOffset)
// Add a second message to the same partition.
producer.send(new ProducerRecord(TOPIC, 24, "key1".getBytes, "val2".getBytes)).get()
metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
assertEquals(1, metadata.size)
assertEquals(TOPIC, streamName)
sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
// The message was sent explicitly to partition 24.
assertEquals("0", sspMetadata.get(new Partition(24)).getOldestOffset)
assertEquals("1", sspMetadata.get(new Partition(24)).getNewestOffset)
assertEquals("2", sspMetadata.get(new Partition(24)).getUpcomingOffset)
// Validate that a fetch will return both messages.
val consumer = getKafkaConsumer
consumer.subscribe(Set(TOPIC).asJava)
val records = consumer.poll(Duration.ofMillis(10000))
consumer.close()
assertEquals(2, records.count())
val stream = records.iterator()
var message = stream.next()
var text = new String(message.value().asInstanceOf[Array[Byte]], "UTF-8")
// First message should match the earliest expected offset.
assertEquals(sspMetadata.get(new Partition(24)).getOldestOffset, message.offset.toString)
assertEquals("val1", text)
// Second message should match the newest expected offset.
message = stream.next()
text = new String(message.value().asInstanceOf[Array[Byte]], "UTF-8")
assertEquals(sspMetadata.get(new Partition(24)).getNewestOffset, message.offset.toString)
assertEquals("val2", text)
}
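// Metadata for a topic that does not exist should report a single partition with
// oldest/upcoming offsets of 0 and a null newest offset.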
@Test
def testNonExistentTopic {
val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic").asJava)
val metadata = initialOffsets.asScala.getOrElse("non-existent-topic", fail("missing metadata"))
assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0")).asJava))
}
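// getOffsetsAfter should return offset + 1 for every SSP passed in.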
@Test
def testOffsetsAfter {
val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
ssp1 -> "1",
ssp2 -> "2").asJava)
assertEquals("2", offsetsAfter.get(ssp1))
assertEquals("3", offsetsAfter.get(ssp2))
}
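// The coordinator stream should be created with the replication factor from
// JOB_COORDINATOR_REPLICATION_FACTOR and a single partition.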
@Test
def testShouldCreateCoordinatorStream {
val topic = "test-coordinator-stream"
val expectedReplicationFactor = 3
val map = new java.util.HashMap[String, String]()
map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, expectedReplicationFactor.toString)
val systemAdmin = createSystemAdmin(SYSTEM, map)
val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
val actualReplicationFactor = systemAdmin.toKafkaSpec(spec).getReplicationFactor
// Ensure we respect the replication factor passed to system admin
assertEquals(expectedReplicationFactor, actualReplicationFactor)
systemAdmin.createStream(spec)
validateTopic(topic, 1)
}
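// The checkpoint stream should honor CHECKPOINT_REPLICATION_FACTOR in the same way.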
@Test
def testShouldCreateCheckpointStream {
val topic = "test-checkpoint-stream"
val expectedReplicationFactor = 1
val map = new java.util.HashMap[String, String]()
map.put(org.apache.samza.config.KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, expectedReplicationFactor.toString)
val systemAdmin = createSystemAdmin(SYSTEM, map)
val spec = StreamSpec.createCheckpointStreamSpec(topic, "kafka")
val actualReplicationFactor = systemAdmin.toKafkaSpec(spec).getReplicationFactor
// Ensure we respect the replication factor passed to system admin
assertEquals(expectedReplicationFactor, actualReplicationFactor)
systemAdmin.createStream(spec)
validateTopic(topic, 1)
}
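// getSSPMetadata should track the newest offset per partition independently:
// writes to partition 4 must not affect the reported offsets of partition 13.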
@Test
def testGetNewestOffset {
createTopic(TOPIC2, 16)
validateTopic(TOPIC2, 16)
val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
// Add a new message to one of the partitions, and verify that it works as expected.
assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString)
assertEquals("0", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
// Again
assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString)
assertEquals("1", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
// Add a message to both partitions
assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString)
assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString)
assertEquals("2", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
assertEquals("0", systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
}
}