/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.writer.kafka;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
import org.json.simple.JSONObject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
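
/**
 * Unit tests for {@code KafkaWriter}: producer configuration handling,
 * Kafka topic resolution, and the per-message responses returned from {@code write}.
 */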
public class KafkaWriterTest {
@Mock
private KafkaProducer<String, String> kafkaProducer;
public static final String SENSOR_TYPE = "test";
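
/**
 * Builds a {@link WriterConfiguration} for {@link #SENSOR_TYPE} backed by the
 * supplied parser config map.
 */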
public WriterConfiguration createConfiguration(final Map<String, Object> parserConfig) {
ParserConfigurations configurations = new ParserConfigurations();
configurations.updateSensorParserConfig( SENSOR_TYPE
, new SensorParserConfig() {{
setParserConfig(parserConfig);
}}
);
return new ParserWriterConfiguration(configurations);
}
@BeforeEach
public void setup() {
MockitoAnnotations.initMocks(this);
}
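
/**
 * The broker URL, topic, and extra producer configs supplied through the sensor's
 * parser config should all be reflected in the generated producer configs.
 */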
@Test
public void testHappyPathGlobalConfig() throws Exception {
KafkaWriter writer = new KafkaWriter();
WriterConfiguration configuration = createConfiguration(
new HashMap<String, Object>() {{
put("kafka.brokerUrl" , "localhost:6667");
put("kafka.topic" , SENSOR_TYPE);
put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
}}
);
writer.configure(SENSOR_TYPE, configuration);
Map<String, Object> producerConfigs = writer.createProducerConfigs();
assertEquals("localhost:6667", producerConfigs.get("bootstrap.servers"));
assertEquals("org.apache.kafka.common.serialization.StringSerializer", producerConfigs.get("key.serializer"));
assertEquals("org.apache.kafka.common.serialization.StringSerializer", producerConfigs.get("value.serializer"));
assertEquals(1, producerConfigs.get("request.required.acks"));
assertEquals(1, producerConfigs.get("key1"));
assertEquals("value2", producerConfigs.get("key2"));
}
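
/**
 * Same as the global-config happy path above, but every kafka.* setting is read
 * under the configured prefix.
 */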
@Test
public void testHappyPathGlobalConfigWithPrefix() throws Exception {
KafkaWriter writer = new KafkaWriter();
writer.withConfigPrefix("prefix");
WriterConfiguration configuration = createConfiguration(
new HashMap<String, Object>() {{
put("prefix.kafka.brokerUrl" , "localhost:6667");
put("prefix.kafka.topic" , SENSOR_TYPE);
put("prefix.kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
}}
);
writer.configure(SENSOR_TYPE, configuration);
Map<String, Object> producerConfigs = writer.createProducerConfigs();
assertEquals("localhost:6667", producerConfigs.get("bootstrap.servers"));
assertEquals("org.apache.kafka.common.serialization.StringSerializer", producerConfigs.get("key.serializer"));
assertEquals("org.apache.kafka.common.serialization.StringSerializer", producerConfigs.get("value.serializer"));
assertEquals(1, producerConfigs.get("request.required.acks"));
assertEquals(1, producerConfigs.get("key1"));
assertEquals("value2", producerConfigs.get("key2"));
}
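
/**
 * When both a static topic and a topic field are configured, the topic field wins:
 * a message carrying the field is routed to that topic, and a message without it
 * resolves to no topic at all.
 */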
@Test
public void testTopicField_bothTopicAndFieldSpecified() throws Exception {
KafkaWriter writer = new KafkaWriter();
WriterConfiguration configuration = createConfiguration(
new HashMap<String, Object>() {{
put("kafka.brokerUrl" , "localhost:6667");
put("kafka.topic" , SENSOR_TYPE);
put("kafka.topicField" , "kafka_topic");
put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
}}
);
writer.configure(SENSOR_TYPE, configuration);
assertEquals( "metron"
, writer.getKafkaTopic(new JSONObject() {{
put("kafka_topic", "metron");
}}).get()
);
assertFalse( writer.getKafkaTopic(new JSONObject()).isPresent() );
}
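
/**
 * When only a topic field is configured, the topic is taken from the message, and
 * a message missing the field resolves to no topic.
 */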
@Test
public void testTopicField_onlyFieldSpecified() throws Exception {
KafkaWriter writer = new KafkaWriter();
WriterConfiguration configuration = createConfiguration(
new HashMap<String, Object>() {{
put("kafka.brokerUrl" , "localhost:6667");
put("kafka.topicField" , "kafka_topic");
put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
}}
);
writer.configure(SENSOR_TYPE, configuration);
assertEquals( "metron"
, writer.getKafkaTopic(new JSONObject() {{
put("kafka_topic", "metron");
}}).get()
);
assertFalse( writer.getKafkaTopic(new JSONObject()).isPresent() );
}
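
/**
 * When neither a topic nor a topic field is configured, every message falls back
 * to the default enrichment topic, regardless of its contents.
 */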
@Test
public void testTopicField_neitherSpecified() throws Exception {
KafkaWriter writer = new KafkaWriter();
WriterConfiguration configuration = createConfiguration(
new HashMap<String, Object>() {{
put("kafka.brokerUrl" , "localhost:6667");
put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
}}
);
writer.configure(SENSOR_TYPE, configuration);
assertEquals(Constants.ENRICHMENT_TOPIC
, writer.getKafkaTopic(new JSONObject() {{
put("kafka_topic", "metron");
}}).get()
);
assertTrue( writer.getKafkaTopic(new JSONObject()).isPresent() );
}
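
/**
 * write() should report per-message results: a message whose send succeeds is a
 * success, a message whose send future fails is an error, and a message that
 * resolves to no topic is never sent and does not appear in the response.
 */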
@Test
public void testWriterShouldReturnResponse() throws Exception {
KafkaWriter writer = spy(new KafkaWriter());
writer.setKafkaProducer(kafkaProducer);
List<BulkMessage<JSONObject>> messages = new ArrayList<>();
JSONObject successMessage = new JSONObject();
successMessage.put("value", "success");
JSONObject errorMessage = new JSONObject();
errorMessage.put("value", "error");
JSONObject droppedMessage = new JSONObject();
droppedMessage.put("value", "dropped");
messages.add(new BulkMessage<>("successId", successMessage));
messages.add(new BulkMessage<>("errorId", errorMessage));
messages.add(new BulkMessage<>("droppedId", droppedMessage));
doReturn(Optional.of("successTopic")).when(writer).getKafkaTopic(successMessage);
doReturn(Optional.of("errorTopic")).when(writer).getKafkaTopic(errorMessage);
doReturn(Optional.empty()).when(writer).getKafkaTopic(droppedMessage);
Future successFuture = mock(Future.class);
Future errorFuture = mock(Future.class);
ExecutionException throwable = new ExecutionException(new Exception("kafka error"));
when(kafkaProducer.send(new ProducerRecord<String, String>("errorTopic", "{\"value\":\"error\"}"))).thenReturn(errorFuture);
when(kafkaProducer.send(new ProducerRecord<String, String>("successTopic", "{\"value\":\"success\"}"))).thenReturn(successFuture);
when(errorFuture.get()).thenThrow(throwable);
BulkWriterResponse response = new BulkWriterResponse();
response.addSuccess(new MessageId("successId"));
response.addError(throwable, new MessageId("errorId"));
assertEquals(response, writer.write(SENSOR_TYPE, createConfiguration(new HashMap<>()), messages));
verify(kafkaProducer, times(1)).flush();
verify(kafkaProducer, times(1)).send(new ProducerRecord<String, String>("successTopic", "{\"value\":\"success\"}"));
verify(kafkaProducer, times(1)).send(new ProducerRecord<String, String>("errorTopic", "{\"value\":\"error\"}"));
verifyNoMoreInteractions(kafkaProducer);
}
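
/**
 * If the producer flush throws, write() should report the failure as an error for
 * every message in the batch.
 */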
@Test
public void testWriteShouldReturnErrorsOnFailedFlush() throws Exception {
KafkaWriter writer = spy(new KafkaWriter());
writer.setKafkaProducer(kafkaProducer);
List<BulkMessage<JSONObject>> messages = new ArrayList<>();
JSONObject message1 = new JSONObject();
message1.put("value", "message1");
JSONObject message2 = new JSONObject();
message2.put("value", "message2");
messages.add(new BulkMessage<>("messageId1", message1));
messages.add(new BulkMessage<>("messageId2", message2));
doReturn(Optional.of("topic1")).when(writer).getKafkaTopic(message1);
doReturn(Optional.of("topic2")).when(writer).getKafkaTopic(message2);
Future future1 = mock(Future.class);
Future future2 = mock(Future.class);
when(kafkaProducer.send(new ProducerRecord<String, String>("topic1", "{\"value\":\"message1\"}"))).thenReturn(future1);
when(kafkaProducer.send(new ProducerRecord<String, String>("topic2", "{\"value\":\"message2\"}"))).thenReturn(future2);
InterruptException throwable = new InterruptException("kafka flush exception");
doThrow(throwable).when(kafkaProducer).flush();
BulkWriterResponse response = new BulkWriterResponse();
response.addAllErrors(throwable, Arrays.asList(new MessageId("messageId1"), new MessageId("messageId2")));
assertEquals(response, writer.write(SENSOR_TYPE, createConfiguration(new HashMap<>()), messages));
verify(kafkaProducer, times(1)).flush();
verify(kafkaProducer, times(1)).send(new ProducerRecord<String, String>("topic1", "{\"value\":\"message1\"}"));
verify(kafkaProducer, times(1)).send(new ProducerRecord<String, String>("topic2", "{\"value\":\"message2\"}"));
verifyNoMoreInteractions(kafkaProducer);
}
}