blob: fafb1bc66306ed278f40db801ae53cf53b8e71c1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.compat.kafka;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
public class KafkaApiTest extends PulsarStandaloneTestSuite {
/**
 * Plain data holder used as the record value in the Avro and JSON schema tests of this class.
 * The tests populate it through {@code setField1}/{@code setField2}/{@code setField3} and read it
 * back through the matching getters.
 */
@Data
public static class Foo {
// Optional string field (annotated with Avro's reflect @Nullable).
@Nullable
private String field1;
// Optional string field (annotated with Avro's reflect @Nullable).
@Nullable
private String field2;
// The schema tests store the message index here and assert on it after consuming.
private int field3;
}
/**
 * Plain data holder used as the record key in the Avro and JSON producer/consumer tests of this
 * class; those tests set {@code field1} to {@code true} before sending.
 */
@Data
public static class Bar {
// Single boolean payload; read back via isField1() in the consumer-side assertions.
private boolean field1;
}
/**
 * Returns the plain-text service URL reported by {@code container}. The tests in this class pass
 * it both as the Kafka {@code bootstrap.servers} value and as the Pulsar client service URL.
 *
 * @return the value of {@code container.getPlainTextServiceUrl()}
 */
private static String getPlainTextServiceUrl() {
    final String plainTextUrl = container.getPlainTextServiceUrl();
    return plainTextUrl;
}
/**
 * Returns the HTTP service URL reported by {@code container}. The tests in this class use it to
 * build a {@code PulsarAdmin} client.
 *
 * @return the value of {@code container.getHttpServiceUrl()}
 */
private static String getHttpServiceUrl() {
    final String httpUrl = container.getHttpServiceUrl();
    return httpUrl;
}
/**
 * Publishes ten records through the Kafka producer API and reads them back through the Kafka
 * consumer API, checking the key, value and offset of every record in publish order.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test(timeOut = 30000)
public void testSimpleProducerConsumer() throws Exception {
    String topic = "persistent://public/default/testSimpleProducerConsumer";

    Properties producerProperties = new Properties();
    producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
    producerProperties.put("key.serializer", IntegerSerializer.class.getName());
    producerProperties.put("value.serializer", StringSerializer.class.getName());
    // @Cleanup guarantees close() even when an assertion below throws.
    @Cleanup
    Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);

    Properties consumerProperties = new Properties();
    consumerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
    consumerProperties.put("group.id", "my-subscription-name");
    consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
    consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
    consumerProperties.put("enable.auto.commit", "true");
    @Cleanup
    Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProperties);
    // Subscribe before publishing, as the original test did.
    consumer.subscribe(Arrays.asList(topic));

    // Remember the offset reported for each published record so it can be compared on receipt.
    List<Long> offsets = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        // Block on the returned future so the record metadata (and offset) is available.
        RecordMetadata md = producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i)).get();
        offsets.add(md.offset());
        log.info("Published message at {}", Long.toHexString(md.offset()));
    }
    producer.flush();

    // The counter doubles as the index of the next expected record.
    AtomicInteger received = new AtomicInteger();
    while (received.get() < 10) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        records.forEach(record -> {
            assertEquals(record.key().intValue(), received.get());
            assertEquals(record.value(), "hello-" + received.get());
            assertEquals(record.offset(), offsets.get(received.get()).longValue());
            received.incrementAndGet();
        });
        consumer.commitSync();
    }
}
/**
 * Publishes ten keyed messages with the native Pulsar producer and checks that a Kafka consumer
 * with auto-commit disabled receives them in order, committing after every non-empty batch.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testSimpleConsumer() throws Exception {
    final String topicName = "testSimpleConsumer";
    final int messageCount = 10;

    Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", getPlainTextServiceUrl());
    consumerConfig.put("group.id", "my-subscription-name");
    consumerConfig.put("enable.auto.commit", "false");
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
    kafkaConsumer.subscribe(Collections.singletonList(topicName));

    @Cleanup
    PulsarClient client = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    @Cleanup
    org.apache.pulsar.client.api.Producer<byte[]> nativeProducer =
            client.newProducer().topic(topicName).create();

    for (int n = 0; n < messageCount; n++) {
        String suffix = Integer.toString(n);
        nativeProducer.newMessage().key(suffix).value(("hello-" + suffix).getBytes()).send();
    }

    // Number of records verified so far; also the index of the next expected record.
    AtomicInteger seen = new AtomicInteger();
    while (seen.get() < messageCount) {
        ConsumerRecords<String, String> batch = kafkaConsumer.poll(100);
        if (batch.isEmpty()) {
            continue;
        }
        batch.forEach(rec -> {
            int index = seen.get();
            String expectedKey = Integer.toString(index);
            String expectedValue = "hello-" + index;
            // Logs the expected key/value next to the record's actual topic and partition.
            log.info("Receive record : key = {}, value = {}, topic = {}, ptn = {}",
                    expectedKey, expectedValue, rec.topic(), rec.partition());
            assertEquals(rec.key(), expectedKey);
            assertEquals(rec.value(), expectedValue);
            seen.incrementAndGet();
        });
        kafkaConsumer.commitSync();
    }
}
/**
 * Consumes ten messages with {@code enable.auto.commit=true}, closes the consumer, then opens a
 * second consumer with the same properties and checks that a poll returns no records.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testConsumerAutoCommit() throws Exception {
    String topic = "testConsumerAutoCommit";
    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    // The first consumer must be closed before the second one is opened, so it is managed with
    // try/finally (rather than @Cleanup) to close it at exactly that point, even on failure.
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.subscribe(Arrays.asList(topic));

        @Cleanup
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
        @Cleanup
        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer =
                pulsarClient.newProducer().topic(topic).create();
        for (int i = 0; i < 10; i++) {
            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
        }

        // The counter doubles as the index of the next expected record.
        AtomicInteger received = new AtomicInteger();
        while (received.get() < 10) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> {
                assertEquals(record.key(), Integer.toString(received.get()));
                assertEquals(record.value(), "hello-" + received.get());
                received.incrementAndGet();
            });
        }
    } finally {
        consumer.close();
    }

    // Re-open consumer and verify every message was acknowledged
    @Cleanup
    Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
    consumer2.subscribe(Arrays.asList(topic));
    ConsumerRecords<String, String> records = consumer2.poll(100);
    assertEquals(records.count(), 0);
}
/**
 * Consumes ten messages with auto-commit disabled, synchronously committing the offset of each
 * record as it is processed, then opens a second consumer with the same properties and checks
 * that a poll returns no records.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testConsumerManualOffsetCommit() throws Exception {
    String topic = "testConsumerManualOffsetCommit";
    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    // Closed in finally so the consumer is released even when an assertion throws; it has to be
    // closed before the second consumer is created, which rules out a method-scoped @Cleanup.
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.subscribe(Arrays.asList(topic));

        @Cleanup
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
        @Cleanup
        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer =
                pulsarClient.newProducer().topic(topic).create();
        for (int i = 0; i < 10; i++) {
            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
        }

        // The counter doubles as the index of the next expected record.
        AtomicInteger received = new AtomicInteger();
        while (received.get() < 10) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> {
                assertEquals(record.key(), Integer.toString(received.get()));
                assertEquals(record.value(), "hello-" + received.get());
                // Commit exactly the record that was just verified.
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset()));
                consumer.commitSync(offsets);
                received.incrementAndGet();
            });
        }
    } finally {
        consumer.close();
    }

    // Re-open consumer and verify every message was acknowledged
    @Cleanup
    Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
    consumer2.subscribe(Arrays.asList(topic));
    ConsumerRecords<String, String> records = consumer2.poll(100);
    assertEquals(records.count(), 0);
}
/**
 * Creates an 8-partition topic, subscribes two Kafka consumers with the same {@code group.id},
 * publishes 24 messages round-robin with the native Pulsar producer and checks that each consumer
 * receives half of them and nothing more.
 *
 * @throws Exception if topic creation, publishing or consuming fails
 */
@Test
public void testPartitions() throws Exception {
    String topic = "testPartitions";

    // Create 8 partitions in topic
    @Cleanup
    PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
    admin.topics().createPartitionedTopic(topic, 8);

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    @Cleanup
    org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic)
            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition).create();

    // Create 2 Kafka consumers and verify each gets half of the messages
    List<Consumer<String, String>> consumers = new ArrayList<>();
    try {
        for (int c = 0; c < 2; c++) {
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            // Register before subscribing so the finally block closes it even if subscribe throws.
            consumers.add(consumer);
            consumer.subscribe(Arrays.asList(topic));
        }

        int messageCount = 8 * 3;
        for (int i = 0; i < messageCount; i++) {
            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
        }

        consumers.forEach(consumer -> {
            int expectedMessages = messageCount / consumers.size();
            // Keep polling until this consumer has seen its share.
            for (int i = 0; i < expectedMessages;) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                i += records.count();
            }
            // No more messages for this consumer
            ConsumerRecords<String, String> records = consumer.poll(100);
            assertEquals(records.count(), 0);
        });
    } finally {
        // Close every consumer that was created, whether or not the assertions passed.
        consumers.forEach(Consumer::close);
    }
}
/**
 * Publishes 24 records with an explicit partition index through the Kafka producer API and checks
 * that every record read back from the 8-partition topic reports that same partition.
 *
 * @throws Exception if topic creation, publishing or consuming fails
 */
@Test
public void testExplicitPartitions() throws Exception {
    String topic = "testExplicitPartitions";

    // Create 8 partitions in topic
    @Cleanup
    PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
    admin.topics().createPartitionedTopic(topic, 8);

    Properties producerProperties = new Properties();
    producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
    producerProperties.put("key.serializer", IntegerSerializer.class.getName());
    producerProperties.put("value.serializer", StringSerializer.class.getName());
    @Cleanup
    Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);

    // The consumer's key type mirrors the producer's (Integer), as in testCustomRouter;
    // previously the keys were written as Integer but declared as String on the consumer side.
    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", IntegerDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    // Create Kafka consumer and verify all messages came from intended partition
    @Cleanup
    Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    int messageCount = 8 * 3;
    final int chosenPartition = 5;
    for (int i = 0; i < messageCount; i++) {
        producer.send(new ProducerRecord<>(topic, chosenPartition, i, "hello-" + i));
    }
    producer.flush();

    // Poll until all published records have been seen.
    for (int i = 0; i < messageCount;) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        i += records.count();
        records.forEach(record -> {
            assertEquals(record.partition(), chosenPartition);
        });
    }

    // No more messages for this consumer
    ConsumerRecords<Integer, String> records = consumer.poll(100);
    assertEquals(records.count(), 0);
}
/**
 * Kafka {@link Partitioner} registered by {@code testCustomRouter} through the
 * {@code partitioner.class} producer property. It ignores its arguments and always answers
 * {@link #USED_PARTITION}.
 */
public static class MyCustomPartitioner implements Partitioner {
    /** Partition index returned for every record; final so it cannot be reassigned by accident. */
    static final int USED_PARTITION = 3;

    @Override
    public void configure(Map<String, ?> conf) {
        // Do nothing
    }

    @Override
    public void close() {
        // Do nothing
    }

    /**
     * Dummy implementation that always returns the same partition.
     *
     * @return {@link #USED_PARTITION}, regardless of topic, key, value or cluster
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
            Cluster cluster) {
        return USED_PARTITION;
    }
}
/**
 * Publishes 24 records through a Kafka producer configured with {@code MyCustomPartitioner} and
 * checks that every record read back from the 8-partition topic reports
 * {@code MyCustomPartitioner.USED_PARTITION}.
 *
 * @throws Exception if topic creation, publishing or consuming fails
 */
@Test
public void testCustomRouter() throws Exception {
    final String topicName = "testCustomRouter";
    final int partitionCount = 8;
    final int totalMessages = partitionCount * 3;

    // Create the partitioned topic up front.
    @Cleanup
    PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
    pulsarAdmin.topics().createPartitionedTopic(topicName, partitionCount);

    Properties producerConfig = new Properties();
    producerConfig.put("bootstrap.servers", getPlainTextServiceUrl());
    producerConfig.put("key.serializer", IntegerSerializer.class.getName());
    producerConfig.put("value.serializer", StringSerializer.class.getName());
    producerConfig.put("partitioner.class", MyCustomPartitioner.class.getName());
    @Cleanup
    Producer<Integer, String> kafkaProducer = new KafkaProducer<>(producerConfig);

    Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", getPlainTextServiceUrl());
    consumerConfig.put("group.id", "my-subscription-name");
    consumerConfig.put("enable.auto.commit", "true");
    consumerConfig.put("key.deserializer", IntegerDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    // Kafka consumer used to verify that all messages came from the intended partition.
    @Cleanup
    Consumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
    kafkaConsumer.subscribe(Collections.singletonList(topicName));

    for (int n = 0; n < totalMessages; n++) {
        kafkaProducer.send(new ProducerRecord<>(topicName, n, "hello-" + n));
    }
    kafkaProducer.flush();

    int consumed = 0;
    while (consumed < totalMessages) {
        ConsumerRecords<Integer, String> batch = kafkaConsumer.poll(100);
        consumed += batch.count();
        batch.forEach(rec -> assertEquals(rec.partition(), MyCustomPartitioner.USED_PARTITION));
    }

    // Nothing further should be delivered to this consumer.
    ConsumerRecords<Integer, String> leftover = kafkaConsumer.poll(100);
    assertEquals(leftover.count(), 0);
}
/**
 * Consumes ten messages, calls {@code seekToBeginning} and checks that the same ten messages are
 * delivered again in the same order.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testConsumerSeek() throws Exception {
    String topic = "testConsumerSeek";
    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");

    // @Cleanup closes the consumer even when an assertion below throws.
    @Cleanup
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    @Cleanup
    org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer =
            pulsarClient.newProducer().topic(topic).create();
    for (int i = 0; i < 10; i++) {
        pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
    }

    // First pass: the counter doubles as the index of the next expected record.
    AtomicInteger received = new AtomicInteger();
    while (received.get() < 10) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        records.forEach(record -> {
            assertEquals(record.key(), Integer.toString(received.get()));
            assertEquals(record.value(), "hello-" + received.get());
            received.incrementAndGet();
        });
        consumer.commitSync();
    }

    // No explicit partitions are passed; see the Kafka Consumer contract for empty collections.
    consumer.seekToBeginning(Collections.emptyList());
    // Pause before polling again, as the original test did.
    Thread.sleep(500);

    // Messages should be available again
    received.set(0);
    while (received.get() < 10) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        records.forEach(record -> {
            assertEquals(record.key(), Integer.toString(received.get()));
            assertEquals(record.value(), "hello-" + received.get());
            received.incrementAndGet();
        });
        consumer.commitSync();
    }
}
/**
 * Consumes ten messages, calls {@code seekToEnd}, closes the consumer, then recreates it with the
 * same properties and checks that a poll returns no records.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testConsumerSeekToEnd() throws Exception {
    String topic = "testConsumerSeekToEnd";
    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");

    // The first consumer has to be closed before the second is created, so it is released in a
    // finally block; this also covers the case where an assertion throws.
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.subscribe(Arrays.asList(topic));

        @Cleanup
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
        @Cleanup
        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer =
                pulsarClient.newProducer().topic(topic).create();
        for (int i = 0; i < 10; i++) {
            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
        }

        // The counter doubles as the index of the next expected record.
        AtomicInteger received = new AtomicInteger();
        while (received.get() < 10) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> {
                assertEquals(record.key(), Integer.toString(received.get()));
                assertEquals(record.value(), "hello-" + received.get());
                received.incrementAndGet();
            });
            consumer.commitSync();
        }

        // No explicit partitions are passed; see the Kafka Consumer contract for empty collections.
        consumer.seekToEnd(Collections.emptyList());
        // Pause before closing, as the original test did.
        Thread.sleep(500);
    } finally {
        consumer.close();
    }

    // Recreate the consumer
    @Cleanup
    Consumer<String, String> recreatedConsumer = new KafkaConsumer<>(props);
    recreatedConsumer.subscribe(Arrays.asList(topic));
    ConsumerRecords<String, String> records = recreatedConsumer.poll(100);
    // Since we are at the end of the topic, there should be no messages
    assertEquals(records.count(), 0);
}
/**
 * Publishes ten records through the Kafka producer API and checks that a native Pulsar consumer
 * receives their values in order.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testSimpleProducer() throws Exception {
    String topic = "testSimpleProducer";

    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    // Subscribe before anything is published.
    @Cleanup
    org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
            .subscriptionName("my-subscription")
            .subscribe();

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    // @Cleanup closes the producer even if send() or an assertion below throws.
    @Cleanup
    Producer<Integer, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
    }
    producer.flush();

    for (int i = 0; i < 10; i++) {
        Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
        // A timed receive yields null on timeout; fail with a clear message instead of an NPE.
        Assert.assertNotNull(msg, "Timed out waiting for message " + i);
        assertEquals(new String(msg.getData()), "hello-" + i);
        pulsarConsumer.acknowledge(msg);
    }
}
/**
 * Publishes ten records through the Kafka producer API with a completion callback, checks that
 * every callback reported success for the expected topic, and then checks that a native Pulsar
 * consumer receives the values in order.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test(timeOut = 10000)
public void testProducerCallback() throws Exception {
    String topic = "testProducerCallback";

    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    @Cleanup
    org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("my-subscription")
            .subscribe();

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    @Cleanup
    Producer<Integer, String> producer = new KafkaProducer<>(props);

    // Callbacks may run on a thread other than the test thread, where a thrown assertion would
    // never reach TestNG. Failures are collected here and asserted on the test thread instead.
    List<Throwable> callbackFailures = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch counter = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i), (metadata, exception) -> {
            try {
                // Check the exception first: metadata is only dereferenced once it is known
                // that the send succeeded.
                assertNull(exception);
                assertEquals(metadata.topic(), topic);
            } catch (AssertionError | RuntimeException failure) {
                callbackFailures.add(failure);
            } finally {
                // Always count down so a failed callback cannot hang the test until timeout.
                counter.countDown();
            }
        });
    }
    counter.await();
    Assert.assertTrue(callbackFailures.isEmpty(), "Producer callback failures: " + callbackFailures);

    for (int i = 0; i < 10; i++) {
        Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
        // A timed receive yields null on timeout; fail with a clear message instead of an NPE.
        Assert.assertNotNull(msg, "Timed out waiting for message " + i);
        assertEquals(new String(msg.getData()), "hello-" + i);
        pulsarConsumer.acknowledge(msg);
    }
}
/**
 * Publishes ten {@code Bar}/{@code Foo} records through a Kafka producer built with Avro key and
 * value schemas, then reads the raw bytes with a native Pulsar consumer and decodes each value
 * with the same {@code Foo} Avro schema.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testProducerAvroSchemaWithPulsarKafkaClient() throws Exception {
    String topic = "testProducerAvroSchemaWithPulsarKafkaClient";
    AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    // Subscribe before anything is published.
    @Cleanup
    org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer =
            pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("my-subscription")
                    .subscribe();

    Properties props = new Properties();
    props.put("bootstrap.servers", getPlainTextServiceUrl());
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    // @Cleanup closes the producer even if send() or an assertion below throws.
    @Cleanup
    Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
    for (int i = 0; i < 10; i++) {
        Bar bar = new Bar();
        bar.setField1(true);
        Foo foo = new Foo();
        foo.setField1("field1");
        foo.setField2("field2");
        // field3 carries the message index so ordering can be verified on receipt.
        foo.setField3(i);
        producer.send(new ProducerRecord<Bar, Foo>(topic, bar, foo));
    }
    producer.flush();

    for (int i = 0; i < 10; i++) {
        Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
        // A timed receive yields null on timeout; fail with a clear message instead of an NPE.
        Assert.assertNotNull(msg, "Timed out waiting for message " + i);
        Foo value = fooSchema.decode(msg.getValue());
        Assert.assertEquals(value.getField1(), "field1");
        Assert.assertEquals(value.getField2(), "field2");
        Assert.assertEquals(value.getField3(), i);
        pulsarConsumer.acknowledge(msg);
    }
}
/**
 * Publishes ten {@code Foo} messages with a native Pulsar producer using the Avro schema, keyed
 * by the string-encoded message index, and checks that a Kafka consumer built with a
 * {@code StringSchema} key and the same Avro value schema receives them in order.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
    final String topicName = "testConsumerAvroSchemaWithPulsarKafkaClient";
    final int messageCount = 10;
    // Used only to encode the message keys on the producing side.
    StringSchema keyEncoder = new StringSchema();
    AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", getPlainTextServiceUrl());
    consumerConfig.put("group.id", "my-subscription-name");
    consumerConfig.put("enable.auto.commit", "false");
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<String, Foo> kafkaConsumer =
            new KafkaConsumer<String, Foo>(consumerConfig, new StringSchema(), fooSchema);
    kafkaConsumer.subscribe(Collections.singletonList(topicName));

    @Cleanup
    PulsarClient client = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
    org.apache.pulsar.client.api.Producer<Foo> nativeProducer =
            client.newProducer(fooSchema).topic(topicName).create();
    for (int n = 0; n < messageCount; n++) {
        Foo payload = new Foo();
        payload.setField1("field1");
        payload.setField2("field2");
        payload.setField3(n);
        byte[] encodedKey = keyEncoder.encode(Integer.toString(n));
        nativeProducer.newMessage().keyBytes(encodedKey).value(payload).send();
    }

    // Number of records verified so far; also the index of the next expected record.
    AtomicInteger seen = new AtomicInteger();
    while (seen.get() < messageCount) {
        ConsumerRecords<String, Foo> batch = kafkaConsumer.poll(100);
        if (batch.isEmpty()) {
            continue;
        }
        batch.forEach(rec -> {
            int index = seen.get();
            Assert.assertEquals(rec.key(), Integer.toString(index));
            Foo payload = rec.value();
            Assert.assertEquals(payload.getField1(), "field1");
            Assert.assertEquals(payload.getField2(), "field2");
            Assert.assertEquals(payload.getField3(), index);
            seen.incrementAndGet();
        });
        kafkaConsumer.commitSync();
    }
}
/**
 * Round-trips ten {@code Bar}/{@code Foo} records through a Kafka producer and a Kafka consumer
 * that are both built with Avro key and value schemas, checking key and value of every record in
 * publish order.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testProducerConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
    final String topicName = "testProducerConsumerAvroSchemaWithPulsarKafkaClient";
    final int messageCount = 10;
    AvroSchema<Bar> keySchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    AvroSchema<Foo> valueSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    // One property set is shared by the consumer and the producer.
    Properties clientConfig = new Properties();
    clientConfig.put("bootstrap.servers", getPlainTextServiceUrl());
    clientConfig.put("group.id", "my-subscription-name");
    clientConfig.put("enable.auto.commit", "false");
    clientConfig.put("key.serializer", IntegerSerializer.class.getName());
    clientConfig.put("value.serializer", StringSerializer.class.getName());
    clientConfig.put("key.deserializer", StringDeserializer.class.getName());
    clientConfig.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<Bar, Foo> kafkaConsumer = new KafkaConsumer<>(clientConfig, keySchema, valueSchema);
    kafkaConsumer.subscribe(Collections.singletonList(topicName));

    Producer<Bar, Foo> kafkaProducer = new KafkaProducer<>(clientConfig, keySchema, valueSchema);
    for (int n = 0; n < messageCount; n++) {
        Bar recordKey = new Bar();
        recordKey.setField1(true);
        Foo payload = new Foo();
        payload.setField1("field1");
        payload.setField2("field2");
        payload.setField3(n);
        kafkaProducer.send(new ProducerRecord<>(topicName, recordKey, payload));
    }
    kafkaProducer.flush();
    kafkaProducer.close();

    // Number of records verified so far; also the expected field3 of the next record.
    AtomicInteger seen = new AtomicInteger();
    while (seen.get() < messageCount) {
        ConsumerRecords<Bar, Foo> batch = kafkaConsumer.poll(100);
        if (batch.isEmpty()) {
            continue;
        }
        batch.forEach(rec -> {
            Bar recordKey = rec.key();
            Assert.assertTrue(recordKey.isField1());
            Foo payload = rec.value();
            Assert.assertEquals(payload.getField1(), "field1");
            Assert.assertEquals(payload.getField2(), "field2");
            Assert.assertEquals(payload.getField3(), seen.get());
            seen.incrementAndGet();
        });
        kafkaConsumer.commitSync();
    }
}
/**
 * Round-trips ten {@code Bar}/{@code Foo} records through a Kafka producer and a Kafka consumer
 * that are both built with JSON key and value schemas, checking key and value of every record in
 * publish order.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testProducerConsumerJsonSchemaWithPulsarKafkaClient() throws Exception {
    final String topicName = "testProducerConsumerJsonSchemaWithPulsarKafkaClient";
    final int messageCount = 10;
    JSONSchema<Bar> keySchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    // One property set is shared by the consumer and the producer.
    Properties clientConfig = new Properties();
    clientConfig.put("bootstrap.servers", getPlainTextServiceUrl());
    clientConfig.put("group.id", "my-subscription-name");
    clientConfig.put("enable.auto.commit", "false");
    clientConfig.put("key.serializer", IntegerSerializer.class.getName());
    clientConfig.put("value.serializer", StringSerializer.class.getName());
    clientConfig.put("key.deserializer", StringDeserializer.class.getName());
    clientConfig.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<Bar, Foo> kafkaConsumer = new KafkaConsumer<>(clientConfig, keySchema, valueSchema);
    kafkaConsumer.subscribe(Collections.singletonList(topicName));

    Producer<Bar, Foo> kafkaProducer = new KafkaProducer<>(clientConfig, keySchema, valueSchema);
    for (int n = 0; n < messageCount; n++) {
        Bar recordKey = new Bar();
        recordKey.setField1(true);
        Foo payload = new Foo();
        payload.setField1("field1");
        payload.setField2("field2");
        payload.setField3(n);
        kafkaProducer.send(new ProducerRecord<>(topicName, recordKey, payload));
    }
    kafkaProducer.flush();
    kafkaProducer.close();

    // Number of records verified so far; also the expected field3 of the next record.
    AtomicInteger seen = new AtomicInteger();
    while (seen.get() < messageCount) {
        ConsumerRecords<Bar, Foo> batch = kafkaConsumer.poll(100);
        if (batch.isEmpty()) {
            continue;
        }
        batch.forEach(rec -> {
            Bar recordKey = rec.key();
            Assert.assertTrue(recordKey.isField1());
            Foo payload = rec.value();
            Assert.assertEquals(payload.getField1(), "field1");
            Assert.assertEquals(payload.getField2(), "field2");
            Assert.assertEquals(payload.getField3(), seen.get());
            seen.incrementAndGet();
        });
        kafkaConsumer.commitSync();
    }
}
/**
 * Round-trips ten records through a Kafka producer and a Kafka consumer that combine a
 * {@code PulsarKafkaSchema} wrapping the Kafka string serializer/deserializer for keys with a
 * JSON schema for {@code Foo} values, checking key and value of every record in publish order.
 *
 * @throws Exception if publishing or consuming fails
 */
@Test
public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
    final String topicName = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";
    final int messageCount = 10;
    Schema<String> stringKeySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
    JSONSchema<Foo> jsonValueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

    // One property set is shared by the consumer and the producer.
    Properties clientConfig = new Properties();
    clientConfig.put("bootstrap.servers", getPlainTextServiceUrl());
    clientConfig.put("group.id", "my-subscription-name");
    clientConfig.put("enable.auto.commit", "false");
    clientConfig.put("key.serializer", IntegerSerializer.class.getName());
    clientConfig.put("value.serializer", StringSerializer.class.getName());
    clientConfig.put("key.deserializer", StringDeserializer.class.getName());
    clientConfig.put("value.deserializer", StringDeserializer.class.getName());

    @Cleanup
    Consumer<String, Foo> kafkaConsumer = new KafkaConsumer<>(clientConfig, stringKeySchema, jsonValueSchema);
    kafkaConsumer.subscribe(Collections.singletonList(topicName));

    Producer<String, Foo> kafkaProducer = new KafkaProducer<>(clientConfig, stringKeySchema, jsonValueSchema);
    for (int n = 0; n < messageCount; n++) {
        Foo payload = new Foo();
        payload.setField1("field1");
        payload.setField2("field2");
        payload.setField3(n);
        kafkaProducer.send(new ProducerRecord<>(topicName, "hello" + n, payload));
    }
    kafkaProducer.flush();
    kafkaProducer.close();

    // Number of records verified so far; also the index of the next expected record.
    AtomicInteger seen = new AtomicInteger();
    while (seen.get() < messageCount) {
        ConsumerRecords<String, Foo> batch = kafkaConsumer.poll(100);
        if (batch.isEmpty()) {
            continue;
        }
        batch.forEach(rec -> {
            String recordKey = rec.key();
            Assert.assertEquals(recordKey, "hello" + seen.get());
            Foo payload = rec.value();
            Assert.assertEquals(payload.getField1(), "field1");
            Assert.assertEquals(payload.getField2(), "field2");
            Assert.assertEquals(payload.getField3(), seen.get());
            seen.incrementAndGet();
        });
        kafkaConsumer.commitSync();
    }
}
}