blob: 0caaf3c4e9ce01695a8dec76c39650ce273c9fd5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.infrastructure.event.external.jobs;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.domain.ActionContext;
import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.data.domain.Pageable;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class SendAsynchronousEventsTaskletTest {
@Mock
private FineractProperties fineractProperties;
@Mock
private ExternalEventRepository repository;
@Mock
private ExternalEventProducer eventProducer;
@Mock
private MessageFactory messageFactory;
@Mock
private StepContribution stepContribution;
@Mock
private ChunkContext chunkContext;
@Mock
private ByteBufferConverter byteBufferConverter;
@Mock
private ConfigurationDomainService configurationDomainService;
private SendAsynchronousEventsTasklet underTest;
private RepeatStatus resultStatus;
private Random rnd = new Random();
@BeforeEach
public void setUp() {
ThreadLocalContextUtil.setTenant(new FineractPlatformTenant(1L, "default", "Default", "Asia/Kolkata", null));
ThreadLocalContextUtil.setActionContext(ActionContext.DEFAULT);
ThreadLocalContextUtil
.setBusinessDates(new HashMap<>(Map.of(BusinessDateType.BUSINESS_DATE, LocalDate.now(ZoneId.systemDefault()))));
configureExternalEventsProducerReadBatchSizeProperty();
underTest = new SendAsynchronousEventsTasklet(fineractProperties, repository, eventProducer, messageFactory, byteBufferConverter,
configurationDomainService);
}
@AfterEach
public void tearDown() {
ThreadLocalContextUtil.reset();
}
private void configureExternalEventsProducerReadBatchSizeProperty() {
FineractProperties.FineractEventsProperties eventsProperties = new FineractProperties.FineractEventsProperties();
FineractProperties.FineractExternalEventsProperties externalProperties = new FineractProperties.FineractExternalEventsProperties();
FineractProperties.FineractExternalEventsProducerProperties externalEventsProducerProperties = new FineractProperties.FineractExternalEventsProducerProperties();
FineractProperties.FineractExternalEventsProducerJmsProperties externalEventsProducerJMSProperties = new FineractProperties.FineractExternalEventsProducerJmsProperties();
externalEventsProducerJMSProperties.setEnabled(true);
externalProperties.setEnabled(true);
externalProperties.setPartitionSize(5000);
externalEventsProducerProperties.setJms(externalEventsProducerJMSProperties);
externalProperties.setProducer(externalEventsProducerProperties);
eventsProperties.setExternal(externalProperties);
when(fineractProperties.getEvents()).thenReturn(eventsProperties);
when(configurationDomainService.retrieveExternalEventBatchSize()).thenReturn(10L);
}
@Test
public void givenBatchSize2WhenTaskExecutionThenSend2Events() throws Exception {
// given
List<ExternalEventView> events = Arrays.asList(
createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L),
createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
// Dummy Message
MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
"anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
// when
resultStatus = underTest.execute(stepContribution, chunkContext);
// then
verify(eventProducer).sendEvents(Mockito.any());
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
assertEquals(RepeatStatus.FINISHED, resultStatus);
}
@Test
public void givenBatchSize2WhenEventSendFailsThenExecutionStops() throws Exception {
// given
List<ExternalEventView> events = Arrays.asList(
createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L),
createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
"anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
doThrow(new AcknowledgementTimeoutException("Event Send Exception", new RuntimeException())).when(eventProducer)
.sendEvents(Mockito.any());
// when
resultStatus = underTest.execute(stepContribution, chunkContext);
// then
verify(repository, times(0)).markEventsSent(Mockito.any(), Mockito.any());
assertEquals(RepeatStatus.FINISHED, resultStatus);
}
@Test
public void givenOneEventWhenEventSentThenEventStatusUpdates() throws Exception {
// given
List<ExternalEventView> events = Arrays
.asList(createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
"anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
// when
resultStatus = underTest.execute(stepContribution, chunkContext);
// then
verify(messageFactory).createMessage(Mockito.any());
verify(eventProducer).sendEvents(Mockito.any());
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
assertEquals(RepeatStatus.FINISHED, resultStatus);
}
@Test
public void testExecuteShouldHandleNullAggregateId() throws Exception {
// given
List<ExternalEventView> events = Arrays
.asList(createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", null));
MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
"anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
byte[] byteMsg = new byte[0];
when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(byteMsg);
// when
resultStatus = underTest.execute(stepContribution, chunkContext);
// then
verify(messageFactory).createMessage(Mockito.any());
verify(eventProducer).sendEvents(Map.of(-1L, List.of(byteMsg)));
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
assertEquals(RepeatStatus.FINISHED, resultStatus);
}
@Test
public void givenEventBatchSizeIsConfiguredAs10WhenTaskExecutionThenEventReadPageSizeIsCorrect() {
ArgumentCaptor<Pageable> externalEventPageSizeArgumentCaptor = ArgumentCaptor.forClass(Pageable.class);
List<ExternalEventView> events = new ArrayList<>();
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
// when
resultStatus = underTest.execute(stepContribution, chunkContext);
// then
verify(repository).findByStatusOrderById(Mockito.any(), externalEventPageSizeArgumentCaptor.capture());
assertThat(externalEventPageSizeArgumentCaptor.getValue().getPageSize()).isEqualTo(10);
}
private ExternalEventView createExternalEventView(String type, String category, String schema, byte[] data, String idempotencyKey,
Long aggregateRootId) {
ExternalEventView result = Mockito.mock(ExternalEventView.class);
Mockito.when(result.getId()).thenReturn(rnd.nextLong());
Mockito.when(result.getType()).thenReturn(type);
Mockito.when(result.getCategory()).thenReturn(category);
Mockito.when(result.getSchema()).thenReturn(schema);
Mockito.when(result.getData()).thenReturn(data);
Mockito.when(result.getIdempotencyKey()).thenReturn(idempotencyKey);
Mockito.when(result.getAggregateRootId()).thenReturn(aggregateRootId);
return result;
}
}