blob: 4b405bf21291f1ff9ad1460fc8f9c0b4f4fb990f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.cob.loan;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.cob.COBBusinessStepService;
import org.apache.fineract.cob.data.BusinessStepNameAndOrder;
import org.apache.fineract.cob.data.LoanCOBParameter;
import org.apache.fineract.cob.data.LoanCOBPartition;
import org.apache.fineract.infrastructure.jobs.service.JobName;
import org.apache.fineract.infrastructure.springbatch.PropertyService;
import org.jetbrains.annotations.NotNull;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.StopWatch;
@Slf4j
@RequiredArgsConstructor
public class LoanCOBPartitioner implements Partitioner {
public static final String PARTITION_PREFIX = "partition_";
private final PropertyService propertyService;
private final COBBusinessStepService cobBusinessStepService;
private final RetrieveLoanIdService retrieveLoanIdService;
private final JobOperator jobOperator;
private final JobExplorer jobExplorer;
private final Long numberOfDays;
@Value("#{jobExecutionContext['BusinessDate']}")
@Setter
private LocalDate businessDate;
@Value("#{jobExecutionContext['IS_CATCH_UP']}")
@Setter
private Boolean isCatchUp;
@NotNull
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int partitionSize = propertyService.getPartitionSize(LoanCOBConstant.JOB_NAME);
Set<BusinessStepNameAndOrder> cobBusinessSteps = cobBusinessStepService.getCOBBusinessSteps(LoanCOBBusinessStep.class,
LoanCOBConstant.LOAN_COB_JOB_NAME);
return getPartitions(partitionSize, cobBusinessSteps);
}
private Map<String, ExecutionContext> getPartitions(int partitionSize, Set<BusinessStepNameAndOrder> cobBusinessSteps) {
if (cobBusinessSteps.isEmpty()) {
stopJobExecution();
return Map.of();
}
StopWatch sw = new StopWatch();
sw.start();
List<LoanCOBPartition> loanCOBPartitions = new ArrayList<>(
retrieveLoanIdService.retrieveLoanCOBPartitions(numberOfDays, businessDate, isCatchUp != null && isCatchUp, partitionSize));
sw.stop();
// if there is no loan to be closed, we still would like to create at least one partition
if (loanCOBPartitions.size() == 0) {
loanCOBPartitions.add(new LoanCOBPartition(0L, 0L, 1L, 0L));
}
log.info(
"LoanCOBPartitioner found {} loans to be processed as part of COB. {} partitions were created using partition size {}. RetrieveLoanCOBPartitions was executed in {} ms.",
getLoanCount(loanCOBPartitions), loanCOBPartitions.size(), partitionSize, sw.getTotalTimeMillis());
return loanCOBPartitions.stream()
.collect(Collectors.toMap(l -> PARTITION_PREFIX + l.getPageNo(), l -> createNewPartition(cobBusinessSteps, l)));
}
private long getLoanCount(List<LoanCOBPartition> loanCOBPartitions) {
return loanCOBPartitions.stream().map(LoanCOBPartition::getCount).reduce(0L, Long::sum);
}
private ExecutionContext createNewPartition(Set<BusinessStepNameAndOrder> cobBusinessSteps, LoanCOBPartition loanCOBPartition) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.put(LoanCOBConstant.BUSINESS_STEPS, cobBusinessSteps);
executionContext.put(LoanCOBConstant.LOAN_COB_PARAMETER,
new LoanCOBParameter(loanCOBPartition.getMinId(), loanCOBPartition.getMaxId()));
executionContext.put("partition", PARTITION_PREFIX + loanCOBPartition.getPageNo());
return executionContext;
}
private void stopJobExecution() {
Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(JobName.LOAN_COB.name());
for (JobExecution jobExecution : runningJobExecutions) {
try {
jobOperator.stop(jobExecution.getId());
} catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
log.error("There is no running execution for the given execution ID. Execution ID: {}", jobExecution.getId());
throw new RuntimeException(e);
}
}
}
}