| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.fineract.infrastructure.event.external.jobs; |
| |
| import static java.util.stream.Collectors.groupingBy; |
| import static java.util.stream.Collectors.toMap; |
| import static org.apache.fineract.infrastructure.core.diagnostics.performance.MeasuringUtil.measure; |
| |
| import com.google.common.collect.Lists; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.time.OffsetDateTime; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import lombok.RequiredArgsConstructor; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.fineract.avro.MessageV1; |
| import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService; |
| import org.apache.fineract.infrastructure.core.config.FineractProperties; |
| import org.apache.fineract.infrastructure.core.domain.FineractContext; |
| import org.apache.fineract.infrastructure.core.service.DateUtils; |
| import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; |
| import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer; |
| import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository; |
| import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus; |
| import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView; |
| import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory; |
| import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter; |
| import org.springframework.batch.core.StepContribution; |
| import org.springframework.batch.core.scope.context.ChunkContext; |
| import org.springframework.batch.core.step.tasklet.Tasklet; |
| import org.springframework.batch.repeat.RepeatStatus; |
| import org.springframework.data.domain.PageRequest; |
| import org.springframework.data.domain.Pageable; |
| import org.springframework.stereotype.Component; |
| |
| @Slf4j |
| @RequiredArgsConstructor |
| @Component |
| public class SendAsynchronousEventsTasklet implements Tasklet { |
| |
| private final FineractProperties fineractProperties; |
| private final ExternalEventRepository repository; |
| private final ExternalEventProducer eventProducer; |
| private final MessageFactory messageFactory; |
| private final ByteBufferConverter byteBufferConverter; |
| private final ConfigurationDomainService configurationDomainService; |
| |
| @Override |
| public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { |
| try { |
| if (isDownstreamChannelEnabled()) { |
| List<ExternalEventView> events = getQueuedEventsBatch(); |
| log.debug("Queued events size: {}", events.size()); |
| sendEvents(events); |
| } |
| } catch (Exception e) { |
| log.error("Error occurred while processing events: ", e); |
| } |
| return RepeatStatus.FINISHED; |
| } |
| |
| private boolean isDownstreamChannelEnabled() { |
| return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled() |
| || fineractProperties.getEvents().getExternal().getProducer().getKafka().isEnabled(); |
| } |
| |
| private List<ExternalEventView> getQueuedEventsBatch() { |
| int readBatchSize = getBatchSize(); |
| Pageable batchSize = PageRequest.ofSize(readBatchSize); |
| return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize), |
| (events, timeTaken) -> log.debug("Loaded {} events in {}ms", events.size(), timeTaken.toMillis())); |
| } |
| |
| private void sendEvents(List<ExternalEventView> queuedEvents) { |
| Map<Long, List<byte[]>> partitions = generatePartitions(queuedEvents); |
| List<Long> eventIds = queuedEvents.stream().map(ExternalEventView::getId).toList(); |
| sendEventsToProducer(partitions); |
| markEventsAsSent(eventIds); |
| } |
| |
| private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) { |
| eventProducer.sendEvents(partitions); |
| } |
| |
| private void markEventsAsSent(List<Long> eventIds) { |
| OffsetDateTime sentAt = DateUtils.getAuditOffsetDateTime(); |
| |
| // Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters |
| final int partitionSize = fineractProperties.getEvents().getExternal().getPartitionSize(); |
| List<List<Long>> partitions = Lists.partition(eventIds, partitionSize); |
| FineractContext context = ThreadLocalContextUtil.getContext(); |
| partitions.stream() // |
| .parallel() // |
| .forEach(partitionedEventIds -> { |
| measure(() -> { |
| ThreadLocalContextUtil.init(context); |
| repository.markEventsSent(partitionedEventIds, sentAt); |
| ThreadLocalContextUtil.reset(); |
| }, timeTaken -> { |
| log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size()); |
| }); |
| }); |
| } |
| |
| private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) { |
| Map<Long, List<ExternalEventView>> initialPartitions = queuedEvents.stream().collect(groupingBy(externalEvent -> { |
| Long aggregateRootId = externalEvent.getAggregateRootId(); |
| if (aggregateRootId == null) { |
| aggregateRootId = -1L; |
| } |
| return aggregateRootId; |
| })); |
| Map<Long, List<byte[]>> partitions = measure( |
| () -> initialPartitions.entrySet().stream().collect(toMap(Map.Entry::getKey, e -> createMessages(e.getValue()))), |
| timeTaken -> { |
| log.debug("Took {}ms to create message partitions", timeTaken.toMillis()); |
| }); |
| return partitions; |
| } |
| |
| private List<byte[]> createMessages(List<ExternalEventView> events) { |
| try { |
| List<byte[]> messages = new ArrayList<>(); |
| for (ExternalEventView event : events) { |
| MessageV1 message = messageFactory.createMessage(event); |
| ByteBuffer toByteBuffer = message.toByteBuffer(); |
| byte[] convert = byteBufferConverter.convert(toByteBuffer); |
| messages.add(convert); |
| log.trace("Created message to send with id: [{}], type: [{}], idempotency key: [{}]", message.getId(), message.getType(), |
| message.getIdempotencyKey()); |
| } |
| return messages; |
| } catch (IOException e) { |
| throw new RuntimeException("Error while serializing the message", e); |
| } |
| } |
| |
| private int getBatchSize() { |
| Long externalEventBatchSize = configurationDomainService.retrieveExternalEventBatchSize(); |
| return externalEventBatchSize.intValue(); |
| } |
| |
| } |