blob: 4bec4c046170d6bf329cae02bd44fc992ee194a3 [file] [log] [blame]
package org.apache.nifi.pql.results;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.nifi.pql.evaluation.Accumulator;
import org.apache.nifi.pql.evaluation.RecordEvaluator;
import org.apache.nifi.pql.evaluation.order.RowSorter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.StoredProvenanceEvent;
import org.apache.nifi.provenance.query.ProvenanceResultSet;
public class StandardOrderedResultSet implements ProvenanceResultSet {
private final List<String> labels;
private final List<Class<?>> returnTypes;
private final Iterator<? extends StoredProvenanceEvent> recordItr;
private final List<Accumulator<?>> selectAccumulators;
private final RecordEvaluator<Boolean> sourceEvaluator;
private final RecordEvaluator<Boolean> conditionEvaluator;
private final RowSorter sorter;
private final Long limit;
private Iterator<ResultRow> resultRowItr;
public StandardOrderedResultSet(final Iterator<? extends StoredProvenanceEvent> recordItr,
final List<Accumulator<?>> selectAccumulators,
final RecordEvaluator<Boolean> sourceEvaluator,
final RecordEvaluator<Boolean> conditionEvaluator,
final List<String> labels,
final List<Class<?>> returnTypes,
final RowSorter sorter,
final Long limit)
{
this.labels = labels;
this.returnTypes = returnTypes;
this.recordItr = recordItr;
this.selectAccumulators = selectAccumulators;
this.sourceEvaluator = sourceEvaluator;
this.conditionEvaluator = conditionEvaluator;
this.sorter = sorter;
this.limit = limit;
}
@Override
public List<String> getLabels() {
return labels;
}
@Override
public List<Class<?>> getReturnType() {
return returnTypes;
}
private void createResultRowItr() {
final List<ResultRow> rows = new ArrayList<>();
int idx = 0;
while (recordItr.hasNext()) {
final ProvenanceEventRecord record = recordItr.next();
if ( sourceEvaluator != null && !sourceEvaluator.evaluate(record) ) {
continue;
}
final boolean meetsConditions = conditionEvaluator == null ? true : conditionEvaluator.evaluate(record);
if ( meetsConditions ) {
final List<Object> values = new ArrayList<>(selectAccumulators.size());
for ( final Accumulator<?> accumulator : selectAccumulators ) {
final Object value = accumulator.accumulate(record, null);
accumulator.reset();
values.add(value);
}
rows.add(new ResultRow(values));
sorter.add(record, null, idx++);
}
}
final List<ResultRow> sortedRows = new ArrayList<>();
for ( final Integer unsortedIndex : sorter.sort() ) {
final ResultRow row = rows.get(unsortedIndex.intValue());
sortedRows.add(row);
if ( limit != null && sortedRows.size() >= limit.intValue() ) {
break;
}
}
resultRowItr = sortedRows.iterator();
}
@Override
public boolean hasNext() {
if ( resultRowItr == null ) {
createResultRowItr();
}
return resultRowItr.hasNext();
}
@Override
public List<?> next() {
if ( resultRowItr == null ) {
createResultRowItr();
}
return resultRowItr.next().getValues();
}
}