/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kafka;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import org.apache.camel.BindToRegistry;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.support.DefaultHeaderFilterStrategy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
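
/**
 * Integration tests for the camel-kafka producer against an embedded broker.
 * The routes in {@link #createRouteBuilder()} forward from direct: endpoints
 * to the Kafka endpoints under test and then to a mock endpoint, so each test
 * can assert both on what Kafka received and on the
 * {@link KafkaConstants#KAFKA_RECORDMETA} metadata Camel attaches to the
 * outgoing exchange.
 */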
public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
private static final String TOPIC_STRINGS = "test";
private static final String TOPIC_INTERCEPTED = "test";
private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
private static final String TOPIC_BYTES = "testBytes";
private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
private static final String GROUP_BYTES = "groupStrings";
private static final String TOPIC_PROPAGATED_HEADERS = "testPropagatedHeaders";
private static KafkaConsumer<String, String> stringsConsumerConn;
private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
@EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
private Endpoint toStrings;
@EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1&partitionKey=1")
private Endpoint toStrings2;
@EndpointInject("kafka:" + TOPIC_INTERCEPTED + "?requestRequiredAcks=-1"
+ "&interceptorClasses=org.apache.camel.component.kafka.MockProducerInterceptor")
private Endpoint toStringsWithInterceptor;
@EndpointInject("mock:kafkaAck")
private MockEndpoint mockEndpoint;
@EndpointInject("kafka:" + TOPIC_BYTES + "?requestRequiredAcks=-1"
+ "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
+ "keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
private Endpoint toBytes;
@EndpointInject("kafka:" + TOPIC_PROPAGATED_HEADERS + "?requestRequiredAcks=-1")
private Endpoint toPropagatedHeaders;
@Produce("direct:startStrings")
private ProducerTemplate stringsTemplate;
@Produce("direct:startStrings2")
private ProducerTemplate stringsTemplate2;
@Produce("direct:startBytes")
private ProducerTemplate bytesTemplate;
@Produce("direct:startTraced")
private ProducerTemplate interceptedTemplate;
@Produce("direct:propagatedHeaders")
private ProducerTemplate propagatedHeadersTemplate;
@BindToRegistry("myStrategy")
private MyHeaderFilterStrategy strategy = new MyHeaderFilterStrategy();
@BindToRegistry("myHeaderSerializer")
private MyKafkaHeadersSerializer serializer = new MyKafkaHeadersSerializer();
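
/**
 * Creates one shared consumer for String records and one for byte[] records,
 * both pointed at the embedded broker and reading from the earliest offset;
 * they are closed again in {@link #after()}.
 */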
@BeforeClass
public static void before() {
Properties stringsProps = new Properties();
stringsProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
stringsProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
stringsProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
stringsProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
stringsProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
stringsProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
stringsProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
stringsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
stringsConsumerConn = new KafkaConsumer<>(stringsProps);

Properties bytesProps = new Properties();
bytesProps.putAll(stringsProps);
bytesProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_BYTES);
bytesProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
bytesProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
bytesConsumerConn = new KafkaConsumer<>(bytesProps);
}
@AfterClass
public static void after() {
stringsConsumerConn.close();
bytesConsumerConn.close();
}
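
/**
 * Each test feeds a direct: endpoint; the route forwards to the Kafka
 * producer endpoint under test and then to a mock endpoint so the test can
 * inspect the exchanges after they have been sent.
 */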
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:startStrings").to(toStrings).to(mockEndpoint);
from("direct:startStrings2").to(toStrings2).to(mockEndpoint);
from("direct:startBytes").to(toBytes).to(mockEndpoint);
from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);
from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint);
}
};
}
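
/**
 * Sends ten messages to the default topic and five more whose target topic
 * is overridden via the {@code KafkaConstants.TOPIC} header, then checks
 * that all fifteen arrive and that each exchange carries exactly one
 * RecordMetadata entry.
 */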
@Test
public void producedStringMessageIsReceivedByKafka() throws InterruptedException, IOException {
int messageInTopic = 10;
int messageInOtherTopic = 5;
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
List<Exchange> exchangeList = mockEndpoint.getExchanges();
assertEquals("Fifteen Exchanges are expected", exchangeList.size(), 15);
for (Exchange exchange : exchangeList) {
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
assertEquals("One RecordMetadata is expected.", recordMetaData1.size(), 1);
assertTrue("Offset is positive", recordMetaData1.get(0).offset() >= 0);
assertTrue("Topic Name start with 'test'", recordMetaData1.get(0).topic().startsWith("test"));
}
}
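
/**
 * Same checks as above, but the first ten messages rely on the partitionKey
 * option configured on the endpoint instead of a per-message header.
 */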
@Test
public void producedString2MessageIsReceivedByKafka() throws InterruptedException, IOException {
int messageInTopic = 10;
int messageInOtherTopic = 5;
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
sendMessagesInRoute(messageInTopic, stringsTemplate2, "IT test message", (String[]) null);
sendMessagesInRoute(messageInOtherTopic, stringsTemplate2, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
List<Exchange> exchangeList = mockEndpoint.getExchanges();
assertEquals("Fifteen Exchanges are expected", exchangeList.size(), 15);
for (Exchange exchange : exchangeList) {
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
assertEquals("One RecordMetadata is expected.", recordMetaData1.size(), 1);
assertTrue("Offset is positive", recordMetaData1.get(0).offset() >= 0);
assertTrue("Topic Name start with 'test'", recordMetaData1.get(0).topic().startsWith("test"));
}
}
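
/**
 * Verifies that a producer interceptor configured via interceptorClasses
 * sees every record: MockProducerInterceptor (in this package) captures each
 * record it intercepts into a static collection.
 */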
@Test
public void producedStringMessageIsIntercepted() throws InterruptedException, IOException {
int messageInTopic = 10;
int messageInOtherTopic = 5;
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
sendMessagesInRoute(messageInTopic, interceptedTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
sendMessagesInRoute(messageInOtherTopic, interceptedTemplate, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_INTERCEPTED, TOPIC_STRINGS_IN_HEADER, messagesLatch);
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
assertEquals(messageInTopic + messageInOtherTopic, MockProducerInterceptor.recordsCaptured.size());
}
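
/**
 * A single exchange whose body is a List produces one Kafka record per
 * element, so the RecordMetadata list on each exchange must have one entry
 * per element of the sent collection.
 */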
@Test
public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException {
int messageInTopic = 10;
int messageInOtherTopic = 5;
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
List<String> msgs = new ArrayList<>();
for (int x = 0; x < messageInTopic; x++) {
msgs.add("Message " + x);
}
sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1");
msgs = new ArrayList<>();
for (int x = 0; x < messageInOtherTopic; x++) {
msgs.add("Other Message " + x);
}
sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
List<Exchange> exchangeList = mockEndpoint.getExchanges();
assertEquals("Two Exchanges are expected", exchangeList.size(), 2);
Exchange e1 = exchangeList.get(0);
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (e1.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
assertEquals("Ten RecordMetadata is expected.", recordMetaData1.size(), 10);
for (RecordMetadata recordMeta : recordMetaData1) {
assertTrue("Offset is positive", recordMeta.offset() >= 0);
assertTrue("Topic Name start with 'test'", recordMeta.topic().startsWith("test"));
}
Exchange e2 = exchangeList.get(1);
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>) (e2.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
assertEquals("Five RecordMetadata is expected.", recordMetaData2.size(), 5);
for (RecordMetadata recordMeta : recordMetaData2) {
assertTrue("Offset is positive", recordMeta.offset() >= 0);
assertTrue("Topic Name start with 'test'", recordMeta.topic().startsWith("test"));
}
}
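
/**
 * Exercises the ByteArraySerializer configured for both key and value on the
 * endpoint; headers are passed as a Map here because the partition key is a
 * byte[] rather than a String.
 */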
@Test
public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException {
int messageInTopic = 10;
int messageInOtherTopic = 5;
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
Map<String, Object> inTopicHeaders = new HashMap<>();
inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test message".getBytes(), inTopicHeaders);
Map<String, Object> otherTopicHeaders = new HashMap<>();
otherTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test message in other topic".getBytes(), otherTopicHeaders);
createKafkaBytesMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch);
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
List<Exchange> exchangeList = mockEndpoint.getExchanges();
assertEquals("Fifteen Exchanges are expected", exchangeList.size(), 15);
for (Exchange exchange : exchangeList) {
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
assertEquals("One RecordMetadata is expected.", recordMetaData1.size(), 1);
assertTrue("Offset is positive", recordMetaData1.get(0).offset() >= 0);
assertTrue("Topic Name start with 'test'", recordMetaData1.get(0).topic().startsWith("test"));
}
}
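
/**
 * Camel headers with String, Integer, Long, Double, byte[] and Boolean
 * values must be propagated as Kafka record headers, while unserializable
 * values, nulls and Camel-internal (Camel*) headers must be filtered out.
 */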
@Test
public void propagatedHeaderIsReceivedByKafka() throws Exception {
String propagatedStringHeaderKey = "PROPAGATED_STRING_HEADER";
String propagatedStringHeaderValue = "propagated string header value";
String propagatedIntegerHeaderKey = "PROPAGATED_INTEGER_HEADER";
Integer propagatedIntegerHeaderValue = 54545;
String propagatedLongHeaderKey = "PROPAGATED_LONG_HEADER";
Long propagatedLongHeaderValue = 5454545454545L;
String propagatedDoubleHeaderKey = "PROPAGATED_DOUBLE_HEADER";
Double propagatedDoubleHeaderValue = 43434.545D;
String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER";
byte[] propagatedBytesHeaderValue = new byte[]{121, 34, 34, 54, 5, 3, 54, -34};
String propagatedBooleanHeaderKey = "PROPAGATED_BOOLEAN_HEADER";
Boolean propagatedBooleanHeaderValue = Boolean.TRUE;
Map<String, Object> camelHeaders = new HashMap<>();
camelHeaders.put(propagatedStringHeaderKey, propagatedStringHeaderValue);
camelHeaders.put(propagatedIntegerHeaderKey, propagatedIntegerHeaderValue);
camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue);
camelHeaders.put(propagatedDoubleHeaderKey, propagatedDoubleHeaderValue);
camelHeaders.put(propagatedBytesHeaderKey, propagatedBytesHeaderValue);
camelHeaders.put(propagatedBooleanHeaderKey, propagatedBooleanHeaderValue);
camelHeaders.put("CustomObjectHeader", new Object());
camelHeaders.put("CustomNullObjectHeader", null);
camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value");
CountDownLatch messagesLatch = new CountDownLatch(1);
propagatedHeadersTemplate.sendBodyAndHeaders("Some test message", camelHeaders);
List<ConsumerRecord<String, String>> records = pollForRecords(stringsConsumerConn, TOPIC_PROPAGATED_HEADERS, messagesLatch);
boolean allMessagesReceived = messagesLatch.await(10_000, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
ConsumerRecord<String, String> record = records.get(0);
Headers headers = record.headers();
assertNotNull("Kafka Headers should not be null.", headers);
// only the six serializable, non-filtered headers should be propagated
assertEquals("6 propagated headers are expected.", 6, headers.toArray().length);
assertEquals("Propagated string value received", propagatedStringHeaderValue,
new String(getHeaderValue(propagatedStringHeaderKey, headers)));
assertEquals("Propagated integer value received", propagatedIntegerHeaderValue,
Integer.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedIntegerHeaderKey, headers)).getInt()));
assertEquals("Propagated long value received", propagatedLongHeaderValue,
Long.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedLongHeaderKey, headers)).getLong()));
assertEquals("Propagated double value received", propagatedDoubleHeaderValue,
Double.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey, headers)).getDouble()));
assertArrayEquals("Propagated byte array value received", propagatedBytesHeaderValue, getHeaderValue(propagatedBytesHeaderKey, headers));
assertEquals("Propagated boolean value received", propagatedBooleanHeaderValue,
Boolean.valueOf(new String(getHeaderValue(propagatedBooleanHeaderKey, headers))));
}
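
/**
 * The headerFilterStrategy endpoint option must resolve the #myStrategy
 * bean bound to the registry above.
 */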
@Test
public void headerFilterStrategyCouldBeOverridden() {
KafkaEndpoint kafkaEndpoint = context.getEndpoint(
"kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy", KafkaEndpoint.class);
assertIsInstanceOf(MyHeaderFilterStrategy.class, kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());
}
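
/**
 * Likewise, the kafkaHeaderSerializer option must resolve the
 * #myHeaderSerializer bean from the registry.
 */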
@Test
public void headerSerializerCouldBeOverridden() {
KafkaEndpoint kafkaEndpoint = context.getEndpoint(
"kafka:TOPIC_PROPAGATED_HEADERS?kafkaHeaderSerializer=#myHeaderSerializer", KafkaEndpoint.class);
assertIsInstanceOf(MyKafkaHeadersSerializer.class, kafkaEndpoint.getConfiguration().getKafkaHeaderSerializer());
}
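
/** Returns the value of the Kafka header with the given key, failing the test if it is absent. */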
private byte[] getHeaderValue(String headerKey, Headers headers) {
Header foundHeader = StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals(headerKey))
.findFirst()
.orElse(null);
assertNotNull("Header should be sent", foundHeader);
return foundHeader.value();
}
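
/**
 * Subscribes the consumer to the topic and drains records on a background
 * thread until the latch reaches zero; callers await the latch before
 * reading the returned list.
 */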
private List<ConsumerRecord<String, String>> pollForRecords(KafkaConsumer<String, String> consumerConn,
String topic, CountDownLatch messagesLatch) {
List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
consumerConn.subscribe(Collections.singletonList(topic));
new Thread(() -> {
while (messagesLatch.getCount() != 0) {
for (ConsumerRecord<String, String> record : consumerConn.poll(100)) {
consumedRecords.add(record);
messagesLatch.countDown();
}
}
}).start();
return consumedRecords;
}
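
/**
 * Blocks the calling thread, polling both topics until the expected number
 * of records (the initial latch count) has been consumed.
 */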
private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn,
String topic, String topicInHeader, CountDownLatch messagesLatch) {
consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
boolean run = true;
while (run) {
ConsumerRecords<String, String> records = consumerConn.poll(100);
for (int i = 0; i < records.count(); i++) {
messagesLatch.countDown();
if (messagesLatch.getCount() == 0) {
run = false;
}
}
}
}
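
/** Same as above, for the byte[] consumer. */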
private void createKafkaBytesMessageConsumer(KafkaConsumer<byte[], byte[]> consumerConn, String topic,
String topicInHeader, CountDownLatch messagesLatch) {
consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
boolean run = true;
while (run) {
ConsumerRecords<byte[], byte[]> records = consumerConn.poll(100);
for (int i = 0; i < records.count(); i++) {
messagesLatch.countDown();
if (messagesLatch.getCount() == 0) {
run = false;
}
}
}
}
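
/**
 * Sends {@code messages} copies of the body through the given template, with
 * headers supplied either as alternating key/value varargs or as a prebuilt map.
 */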
private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
Map<String, Object> headerMap = new HashMap<>();
if (headersWithValue != null) {
for (int i = 0; i < headersWithValue.length; i = i + 2) {
headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
}
}
sendMessagesInRoute(messages, template, bodyOther, headerMap);
}
private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, Map<String, Object> headerMap) {
for (int k = 0; k < messages; k++) {
template.sendBodyAndHeaders(bodyOther, headerMap);
}
}
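
// Marker subclasses bound to the registry so the override tests can resolve them by name.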
private static class MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
}
private static class MyKafkaHeadersSerializer extends DefaultKafkaHeaderSerializer {
}
}