| package org.apache.solr.crossdc.consumer; |
| |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.consumer.ConsumerRecords; |
| import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.errors.WakeupException; |
| import org.apache.solr.client.solrj.impl.CloudSolrClient; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.crossdc.common.IQueueHandler; |
| import org.apache.solr.crossdc.common.KafkaCrossDcConf; |
| import org.apache.solr.crossdc.common.KafkaMirroringSink; |
| import org.apache.solr.crossdc.common.MirroredSolrRequest; |
| import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.util.*; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.junit.Assert.*; |
| import static org.mockito.Mockito.*; |
| |
| public class KafkaCrossDcConsumerTest { |
| |
| private KafkaCrossDcConsumer kafkaCrossDcConsumer; |
| private KafkaConsumer<String, MirroredSolrRequest> kafkaConsumerMock; |
| private CloudSolrClient solrClientMock; |
| private KafkaMirroringSink kafkaMirroringSinkMock; |
| |
| private SolrMessageProcessor messageProcessorMock; |
| |
| private KafkaCrossDcConf conf; |
| |
| |
| @Before |
| public void setUp() { |
| kafkaConsumerMock = mock(KafkaConsumer.class); |
| solrClientMock = mock(CloudSolrClient.class); |
| kafkaMirroringSinkMock = mock(KafkaMirroringSink.class); |
| messageProcessorMock = mock(SolrMessageProcessor.class); |
| conf = testCrossDCConf(); |
| // Set necessary configurations |
| |
| kafkaCrossDcConsumer = |
| new KafkaCrossDcConsumer(conf, new CountDownLatch(0)) { |
| @Override |
| public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer( |
| Properties properties) { |
| return kafkaConsumerMock; |
| } |
| |
| @Override |
| public SolrMessageProcessor createSolrMessageProcessor() { |
| return messageProcessorMock; |
| } |
| |
| @Override |
| protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) { |
| return solrClientMock; |
| } |
| |
| @Override |
| protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) { |
| return kafkaMirroringSinkMock; |
| } |
| }; |
| } |
| |
| private static KafkaCrossDcConf testCrossDCConf() { |
| Map config = new HashMap<>(); |
| config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1"); |
| config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092"); |
| return new KafkaCrossDcConf(config); |
| } |
| |
| @After |
| public void tearDown() { |
| kafkaCrossDcConsumer.shutdown(); |
| } |
| |
| private ConsumerRecord<String, MirroredSolrRequest> createSampleConsumerRecord() { |
| return new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest()); |
| } |
| |
| private ConsumerRecords<String, MirroredSolrRequest> createSampleConsumerRecords() { |
| TopicPartition topicPartition = new TopicPartition("sample-topic", 0); |
| List<ConsumerRecord<String, MirroredSolrRequest>> recordsList = new ArrayList<>(); |
| recordsList.add( |
| new ConsumerRecord<>( |
| "sample-topic", 0, 0, "key", createSampleMirroredSolrRequest())); |
| return new ConsumerRecords<>(Collections.singletonMap(topicPartition, recordsList)); |
| } |
| |
| private MirroredSolrRequest createSampleMirroredSolrRequest() { |
| // Create a sample MirroredSolrRequest for testing |
| SolrInputDocument solrInputDocument = new SolrInputDocument(); |
| solrInputDocument.addField("id", "1"); |
| solrInputDocument.addField("title", "Sample title"); |
| solrInputDocument.addField("content", "Sample content"); |
| UpdateRequest updateRequest = new UpdateRequest(); |
| updateRequest.add(solrInputDocument); |
| return new MirroredSolrRequest(updateRequest); |
| } |
| |
| /** |
| * Should create a KafkaCrossDcConsumer with the given configuration and startLatch |
| */ |
| @Test |
| public void kafkaCrossDcConsumerCreationWithConfigurationAndStartLatch() { |
| KafkaCrossDcConf conf = testCrossDCConf(); |
| CountDownLatch startLatch = new CountDownLatch(1); |
| |
| KafkaCrossDcConsumer kafkaCrossDcConsumer = new KafkaCrossDcConsumer(conf, startLatch); |
| |
| assertNotNull(kafkaCrossDcConsumer); |
| assertEquals(1, startLatch.getCount()); |
| } |
| |
| @Test |
| public void testRunAndShutdown() throws Exception { |
| // Define the expected behavior of the mocks and set up the test scenario |
| |
| // Use a CountDownLatch to wait for the KafkaConsumer.subscribe method to be called |
| CountDownLatch subscribeLatch = new CountDownLatch(1); |
| doAnswer(invocation -> { |
| subscribeLatch.countDown(); |
| return null; |
| }).when(kafkaConsumerMock).subscribe(anyList()); |
| |
| when(kafkaConsumerMock.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); |
| |
| ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor(); |
| |
| |
| // Run the test |
| consumerThreadExecutor.submit(kafkaCrossDcConsumer); |
| |
| // Wait for the KafkaConsumer.subscribe method to be called |
| assertTrue(subscribeLatch.await(10, TimeUnit.SECONDS)); |
| |
| // Run the shutdown method |
| kafkaCrossDcConsumer.shutdown(); |
| |
| // Verify that the consumer was subscribed with the correct topic names |
| verify(kafkaConsumerMock).subscribe(anyList()); |
| |
| // Verify that the appropriate methods were called on the mocks |
| verify(kafkaConsumerMock).wakeup(); |
| verify(solrClientMock).close(); |
| |
| consumerThreadExecutor.shutdown(); |
| consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS); |
| } |
| |
| @Test |
| public void testHandleFailedResubmit() throws Exception { |
| // Set up the KafkaCrossDcConsumer |
| KafkaCrossDcConf testConf = testCrossDCConf(); |
| KafkaCrossDcConsumer consumer = spy(new KafkaCrossDcConsumer(testConf, new CountDownLatch(0))); |
| doNothing().when(consumer).sendBatch(any(UpdateRequest.class), any(ConsumerRecord.class), any(PartitionManager.WorkUnit.class)); |
| |
| // Set up the SolrMessageProcessor mock |
| SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class); |
| IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null); |
| when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult); |
| |
| // Mock the KafkaMirroringSink |
| KafkaMirroringSink mockKafkaMirroringSink = mock(KafkaMirroringSink.class); |
| doNothing().when(mockKafkaMirroringSink).submit(any(MirroredSolrRequest.class)); |
| consumer.kafkaMirroringSink = mockKafkaMirroringSink; |
| |
| // Call the method to test |
| ConsumerRecord<String, MirroredSolrRequest> record = createSampleConsumerRecord(); |
| consumer.processResult(record, failedResubmitResult); |
| |
| // Verify that the KafkaMirroringSink.submit() method was called |
| verify(consumer.kafkaMirroringSink, times(1)).submit(record.value()); |
| } |
| |
| |
| @Test |
| public void testCreateKafkaCrossDcConsumer() { |
| KafkaCrossDcConsumer consumer = new KafkaCrossDcConsumer(conf, new CountDownLatch(1)); |
| assertNotNull(consumer); |
| } |
| |
| @Test |
| public void testHandleValidMirroredSolrRequest() { |
| KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class); |
| KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) { |
| @Override |
| public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) { |
| return mockConsumer; |
| } |
| |
| @Override |
| public SolrMessageProcessor createSolrMessageProcessor() { |
| return messageProcessorMock; |
| } |
| |
| @Override |
| protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) { |
| return kafkaMirroringSinkMock; |
| } |
| }); |
| |
| SolrInputDocument doc = new SolrInputDocument(); |
| doc.addField("id", "1"); |
| UpdateRequest validRequest = new UpdateRequest(); |
| validRequest.add(doc); |
| validRequest.setParams(new ModifiableSolrParams().add("commit", "true")); |
| // Create a valid MirroredSolrRequest |
| ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(validRequest)); |
| ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record))); |
| |
| when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException()); |
| |
| spyConsumer.run(); |
| |
| // Verify that the valid MirroredSolrRequest was processed. |
| verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> { |
| // Check if the UpdateRequest has the same content as the original validRequest |
| return updateRequest.getDocuments().equals(validRequest.getDocuments()) && |
| updateRequest.getParams().equals(validRequest.getParams()); |
| }), eq(record), any()); |
| } |
| |
| @Test |
| public void testHandleInvalidMirroredSolrRequest() { |
| KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class); |
| SolrMessageProcessor mockSolrMessageProcessor = mock(SolrMessageProcessor.class); |
| KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) { |
| @Override |
| public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) { |
| return mockConsumer; |
| } |
| |
| @Override |
| public SolrMessageProcessor createSolrMessageProcessor() { |
| return mockSolrMessageProcessor; |
| } |
| |
| @Override |
| protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) { |
| return kafkaMirroringSinkMock; |
| } |
| }); |
| doReturn(mockConsumer).when(spyConsumer).createKafkaConsumer(any()); |
| |
| UpdateRequest invalidRequest = new UpdateRequest(); |
| // no updates on request |
| invalidRequest.setParams(new ModifiableSolrParams().add("invalid_param", "invalid_value")); |
| |
| ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(invalidRequest)); |
| ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record))); |
| |
| when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException()); |
| |
| spyConsumer.run(); |
| |
| // Verify that the valid MirroredSolrRequest was processed. |
| verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> { |
| // Check if the UpdateRequest has the same content as the original invalidRequest |
| return updateRequest.getDocuments() == null && |
| updateRequest.getParams().equals(invalidRequest.getParams()); |
| }), eq(record), any()); |
| } |
| |
| @Test |
| public void testHandleWakeupException() { |
| KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class); |
| KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) { |
| @Override |
| public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) { |
| return mockConsumer; |
| } |
| |
| @Override |
| public SolrMessageProcessor createSolrMessageProcessor() { |
| return messageProcessorMock; |
| } |
| |
| @Override |
| protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) { |
| return kafkaMirroringSinkMock; |
| } |
| }); |
| |
| when(mockConsumer.poll(any())).thenThrow(new WakeupException()); |
| |
| // Run the consumer in a separate thread to avoid blocking the test |
| ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| executorService.submit(spyConsumer); |
| |
| // Wait for a short period to allow the consumer to start and then trigger the shutdown |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| spyConsumer.shutdown(); |
| |
| // Verify that the WakeupException was caught and handled |
| verify(mockConsumer, atLeastOnce()).poll(any()); |
| verify(mockConsumer, times(1)).wakeup(); |
| |
| // Shutdown the executor service |
| executorService.shutdown(); |
| try { |
| executorService.awaitTermination(10, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| @Test |
| public void testShutdown() { |
| KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class); |
| KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) { |
| @Override |
| public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) { |
| return mockConsumer; |
| } |
| |
| @Override |
| public SolrMessageProcessor createSolrMessageProcessor() { |
| return messageProcessorMock; |
| } |
| |
| @Override |
| protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) { |
| return kafkaMirroringSinkMock; |
| } |
| }); |
| |
| |
| spyConsumer.shutdown(); |
| |
| verify(mockConsumer, times(1)).wakeup(); |
| } |
| } |