FINERACT-1971: Fix marking external events sent in parallel
diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
index 7277cfa..91746f1 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -34,7 +34,9 @@
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.domain.FineractContext;
import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
@@ -104,11 +106,14 @@
// Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
final int partitionSize = fineractProperties.getEvents().getExternal().getPartitionSize();
List<List<Long>> partitions = Lists.partition(eventIds, partitionSize);
+ FineractContext context = ThreadLocalContextUtil.getContext();
partitions.stream() //
.parallel() //
.forEach(partitionedEventIds -> {
measure(() -> {
+ ThreadLocalContextUtil.init(context);
repository.markEventsSent(partitionedEventIds, sentAt);
+ ThreadLocalContextUtil.reset();
}, timeTaken -> {
log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
});
diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
index 9f7ccbd..49f772b 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
@@ -18,6 +18,7 @@
*/
package org.apache.fineract.infrastructure.event.external.repository;
+import jakarta.transaction.Transactional;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.List;
@@ -40,6 +41,7 @@
void deleteOlderEventsWithSentStatus(@Param("status") ExternalEventStatus status,
@Param("dateForPurgeCriteria") LocalDate dateForPurgeCriteria);
+ @Transactional
@Modifying
@Query("UPDATE ExternalEvent e SET e.status = org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus.SENT, e.sentAt = :sentAt WHERE e.id IN :ids")
void markEventsSent(@Param("ids") List<Long> ids, @Param("sentAt") OffsetDateTime sentAt);