blob: 91746f13ba728d1636e1b6dd5eb65d2b5a180607 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.infrastructure.event.external.jobs;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;
import static org.apache.fineract.infrastructure.core.diagnostics.performance.MeasuringUtil.measure;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.domain.FineractContext;
import org.apache.fineract.infrastructure.core.service.DateUtils;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Component;
@Slf4j
@RequiredArgsConstructor
@Component
public class SendAsynchronousEventsTasklet implements Tasklet {
private final FineractProperties fineractProperties;
private final ExternalEventRepository repository;
private final ExternalEventProducer eventProducer;
private final MessageFactory messageFactory;
private final ByteBufferConverter byteBufferConverter;
private final ConfigurationDomainService configurationDomainService;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
try {
if (isDownstreamChannelEnabled()) {
List<ExternalEventView> events = getQueuedEventsBatch();
log.debug("Queued events size: {}", events.size());
sendEvents(events);
}
} catch (Exception e) {
log.error("Error occurred while processing events: ", e);
}
return RepeatStatus.FINISHED;
}
private boolean isDownstreamChannelEnabled() {
return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled()
|| fineractProperties.getEvents().getExternal().getProducer().getKafka().isEnabled();
}
private List<ExternalEventView> getQueuedEventsBatch() {
int readBatchSize = getBatchSize();
Pageable batchSize = PageRequest.ofSize(readBatchSize);
return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),
(events, timeTaken) -> log.debug("Loaded {} events in {}ms", events.size(), timeTaken.toMillis()));
}
private void sendEvents(List<ExternalEventView> queuedEvents) {
Map<Long, List<byte[]>> partitions = generatePartitions(queuedEvents);
List<Long> eventIds = queuedEvents.stream().map(ExternalEventView::getId).toList();
sendEventsToProducer(partitions);
markEventsAsSent(eventIds);
}
private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) {
eventProducer.sendEvents(partitions);
}
private void markEventsAsSent(List<Long> eventIds) {
OffsetDateTime sentAt = DateUtils.getAuditOffsetDateTime();
// Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
final int partitionSize = fineractProperties.getEvents().getExternal().getPartitionSize();
List<List<Long>> partitions = Lists.partition(eventIds, partitionSize);
FineractContext context = ThreadLocalContextUtil.getContext();
partitions.stream() //
.parallel() //
.forEach(partitionedEventIds -> {
measure(() -> {
ThreadLocalContextUtil.init(context);
repository.markEventsSent(partitionedEventIds, sentAt);
ThreadLocalContextUtil.reset();
}, timeTaken -> {
log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
});
});
}
private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
Map<Long, List<ExternalEventView>> initialPartitions = queuedEvents.stream().collect(groupingBy(externalEvent -> {
Long aggregateRootId = externalEvent.getAggregateRootId();
if (aggregateRootId == null) {
aggregateRootId = -1L;
}
return aggregateRootId;
}));
Map<Long, List<byte[]>> partitions = measure(
() -> initialPartitions.entrySet().stream().collect(toMap(Map.Entry::getKey, e -> createMessages(e.getValue()))),
timeTaken -> {
log.debug("Took {}ms to create message partitions", timeTaken.toMillis());
});
return partitions;
}
private List<byte[]> createMessages(List<ExternalEventView> events) {
try {
List<byte[]> messages = new ArrayList<>();
for (ExternalEventView event : events) {
MessageV1 message = messageFactory.createMessage(event);
ByteBuffer toByteBuffer = message.toByteBuffer();
byte[] convert = byteBufferConverter.convert(toByteBuffer);
messages.add(convert);
log.trace("Created message to send with id: [{}], type: [{}], idempotency key: [{}]", message.getId(), message.getType(),
message.getIdempotencyKey());
}
return messages;
} catch (IOException e) {
throw new RuntimeException("Error while serializing the message", e);
}
}
private int getBatchSize() {
Long externalEventBatchSize = configurationDomainService.retrieveExternalEventBatchSize();
return externalEventBatchSize.intValue();
}
}