blob: 03f100b9e9ae75c56ef141e82d12f7e79be19945 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.pql.ProvenanceQuery;
import org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.query.ProvenanceQuerySubmission;
import org.apache.nifi.provenance.query.ProvenanceResultSet;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
public class GenerateProvenanceReport extends AbstractReportingTask {
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("Provenance Query")
.description("The Provenance Query to run against the repository")
.required(true)
.expressionLanguageSupported(false)
.addValidator(new ProvenanceQueryLanguageValidator())
.build();
public static final PropertyDescriptor DESTINATION_FILE = new PropertyDescriptor.Builder()
.name("Destination File")
.description("The file to write the results to. If not specified, the results will be written to the log")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUERY);
properties.add(DESTINATION_FILE);
return properties;
}
@Override
public void onTrigger(final ReportingContext context) {
try {
final long startNanos = System.nanoTime();
final ProvenanceQuerySubmission submission = context.getEventAccess().getProvenanceRepository().submitQuery(context.getProperty(QUERY).getValue());
final ProvenanceResultSet rs = submission.getResult().getResultSet();
final List<String> labels = rs.getLabels();
int length = 2;
for ( final String label : labels ) {
length += label.length() + 2;
}
final StringBuilder sb = new StringBuilder("\n");
for (int i=0; i < length; i++) {
sb.append("-");
}
sb.append("\n| ");
for ( final String label : labels ) {
sb.append(label).append(" |");
}
sb.append("\n");
for (int i=0; i < length; i++) {
sb.append("-");
}
sb.append("\n");
int rowCount = 0;
while (rs.hasNext()) {
final List<?> cols = rs.next();
for ( final Object col : cols ) {
sb.append("| ").append(col);
}
sb.append(" |\n");
rowCount++;
}
final String filename = context.getProperty(DESTINATION_FILE).getValue();
if ( filename == null ) {
getLogger().info(sb.toString());
} else {
final File file = new File(filename);
final File directory = file.getParentFile();
if ( !directory.exists() && !directory.mkdirs() ) {
throw new ProcessException("Cannot create directory " + directory + " to write to file");
}
try (final OutputStream fos = new FileOutputStream(file)) {
fos.write(sb.toString().getBytes(StandardCharsets.UTF_8));
}
}
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully generated report with {} rows in the result; report generation took {} millis", new Object[] {rowCount, millis});
} catch (final IOException ioe) {
throw new ProcessException(ioe);
}
}
private static class ProvenanceQueryLanguageValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
try {
ProvenanceQuery.compile(input, null, null);
} catch (final ProvenanceQueryLanguageParsingException e) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation(e.getMessage()).build();
}
return new ValidationResult.Builder().input(input).subject(subject).valid(true).build();
}
}
}