blob: b35c1d2dd701cc6a721bbc9311bb6375e849889d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.infra.job.archive;
import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY;
import static org.apache.ambari.infra.job.archive.SolrQueryBuilder.computeEnd;
import static org.apache.commons.lang.StringUtils.isBlank;
import java.io.File;
import javax.inject.Inject;
import org.apache.ambari.infra.conf.InfraManagerDataConfig;
import org.apache.ambari.infra.conf.security.S3Secrets;
import org.apache.ambari.infra.job.AbstractJobsConfiguration;
import org.apache.ambari.infra.job.JobContextRepository;
import org.apache.ambari.infra.job.JobScheduler;
import org.apache.ambari.infra.job.ObjectSource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<ArchivingProperties, ArchivingProperties> {
  private static final Logger logger = LogManager.getLogger(DocumentArchivingConfiguration.class);

  // No-op wiper used when no delete query is configured: archived documents stay in Solr.
  private static final DocumentWiper NOT_DELETE = (firstDocument, lastDocument) -> { };

  private final StepBuilderFactory steps;
  private final Step exportStep;

  @Inject
  public DocumentArchivingConfiguration(
          DocumentArchivingPropertyMap jobsPropertyMap,
          JobScheduler scheduler,
          StepBuilderFactory steps,
          JobBuilderFactory jobs,
          @Qualifier("exportStep") Step exportStep,
          JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) {
    super(jobsPropertyMap.getSolrDataArchiving(), scheduler, jobs, jobRegistryBeanPostProcessor);
    this.exportStep = exportStep;
    this.steps = steps;
  }

  /** Builds the archiving job: a single step that exports (and optionally wipes) documents. */
  @Override
  protected Job buildJob(JobBuilder jobBuilder) {
    return jobBuilder.start(exportStep).build();
  }

  @Bean
  @JobScope
  public Step exportStep(DocumentExporter documentExporter) {
    return steps.get("export")
            .tasklet(documentExporter)
            .build();
  }

  /**
   * Assembles the {@link DocumentExporter} for one step execution.
   * Job parameters are read from the job execution context (stored there under
   * {@code PARAMETERS_CONTEXT_KEY}); {@code intervalEnd} comes from the {@code end}
   * job parameter and may be blank.
   */
  @Bean
  @StepScope
  public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
                                           @Value("#{stepExecution.jobExecution.jobId}") String jobId,
                                           @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingProperties parameters,
                                           InfraManagerDataConfig infraManagerDataConfig,
                                           @Value("#{jobParameters[end]}") String intervalEnd,
                                           DocumentWiper documentWiper,
                                           JobContextRepository jobContextRepository,
                                           S3Secrets s3Secrets) {
    File defaultBaseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting");
    CompositeFileAction fileAction = new CompositeFileAction(new BZip2Compressor());
    File baseDir = applyDestination(parameters, s3Secrets, fileAction, defaultBaseDir);

    FileNameSuffixFormatter fileNameSuffixFormatter = FileNameSuffixFormatter.from(parameters);
    LocalItemWriterListener itemWriterListener = new LocalItemWriterListener(fileAction, documentWiper);
    File destinationDirectory =
            createDestinationDirectory(baseDir, parameters, jobId, intervalEnd, fileNameSuffixFormatter);

    return new DocumentExporter(
            documentItemReader,
            firstDocument -> new LocalDocumentItemWriter(
                    outFile(parameters.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener),
            parameters.getWriteBlockSize(), jobContextRepository);
  }

  /**
   * Registers the upload action matching the configured destination on {@code fileAction}
   * and returns the base directory exported files are written under.
   * S3/HDFS keep the default local staging directory; LOCAL redirects it entirely.
   *
   * @throws IllegalStateException if the destination's required properties are missing,
   *         or the destination value is not one of S3, HDFS, LOCAL (fail fast instead of
   *         silently exporting to the default directory)
   */
  private File applyDestination(ArchivingProperties parameters,
                                S3Secrets s3Secrets,
                                CompositeFileAction fileAction,
                                File defaultBaseDir) {
    switch (parameters.getDestination()) {
      case S3:
        fileAction.add(new S3Uploader(
                parameters.s3Properties().orElseThrow(() -> new IllegalStateException("S3 properties are not provided!")),
                s3Secrets));
        return defaultBaseDir;
      case HDFS:
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        fileAction.add(new HdfsUploader(conf,
                parameters.hdfsProperties().orElseThrow(() -> new IllegalStateException("HDFS properties are not provided!"))));
        return defaultBaseDir;
      case LOCAL:
        return new File(parameters.getLocalDestinationDirectory());
      default:
        throw new IllegalStateException("Unsupported destination: " + parameters.getDestination());
    }
  }

  /**
   * Resolves the per-job destination directory ({@code <collection>_<jobId>_<intervalSuffix>})
   * and creates it if needed. Creation failure is only logged (a warning) — the subsequent
   * write will surface the real error.
   */
  private File createDestinationDirectory(File baseDir,
                                          ArchivingProperties parameters,
                                          String jobId,
                                          String intervalEnd,
                                          FileNameSuffixFormatter fileNameSuffixFormatter) {
    File destinationDirectory = new File(
            baseDir,
            String.format("%s_%s_%s",
                    parameters.getSolr().getCollection(),
                    jobId,
                    isBlank(intervalEnd) ? "" : fileNameSuffixFormatter.format(intervalEnd)));
    logger.info("Destination directory path={}", destinationDirectory);
    if (!destinationDirectory.exists() && !destinationDirectory.mkdirs()) {
      logger.warn("Unable to create directory {}", destinationDirectory);
    }
    return destinationDirectory;
  }

  /**
   * Returns the configured {@link SolrDAO} as wiper, or a no-op wiper when no delete
   * query text is configured for the collection.
   */
  @Bean
  @StepScope
  public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingProperties parameters,
                                     SolrDAO solrDAO) {
    if (isBlank(parameters.getSolr().getDeleteQueryText()))
      return NOT_DELETE;
    return solrDAO;
  }

  @Bean
  @StepScope
  public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingProperties parameters) {
    return new SolrDAO(parameters.getSolr());
  }

  /** Builds the export temp-file path: {@code <collection>_-_<suffix>.json} under {@code directoryPath}. */
  private File outFile(String collection, File directoryPath, String suffix) {
    File file = new File(directoryPath, String.format("%s_-_%s.json", collection, suffix));
    logger.info("Exporting to temp file {}", file.getAbsolutePath());
    return file;
  }

  @Bean
  @StepScope
  public DocumentItemReader reader(ObjectSource<Document> documentSource,
                                   @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingProperties properties) {
    return new DocumentItemReader(documentSource, properties.getReadBlockSize());
  }

  /** Source of documents: queries Solr over [start, end] where end is derived from TTL when absent. */
  @Bean
  @StepScope
  public ObjectSource<Document> documentSource(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingProperties parameters,
                                               SolrDAO solrDAO) {
    return new SolrDocumentSource(solrDAO, parameters.getStart(), computeEnd(parameters.getEnd(), parameters.getTtl()));
  }
}