blob: 7e75357e5b932c4520ca6054689709637aaa3a55 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
public class StandardQueryResult implements QueryResult {
public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
private final Query query;
private final long creationNanos;
private final int numSteps;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
// guarded by writeLock
private final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>();
private long totalHitCount;
private int numCompletedSteps = 0;
private Date expirationDate;
private String error;
private long queryTime;
private volatile boolean canceled = false;
public StandardQueryResult(final Query query, final int numSteps) {
this.query = query;
this.numSteps = numSteps;
this.creationNanos = System.nanoTime();
updateExpiration();
}
@Override
public List<StoredProvenanceEvent> getMatchingEvents() {
readLock.lock();
try {
if (matchingRecords.size() <= query.getMaxResults()) {
return new ArrayList<>(matchingRecords);
}
final List<StoredProvenanceEvent> copy = new ArrayList<>(query.getMaxResults());
for (int i = 0; i < query.getMaxResults(); i++) {
copy.add(matchingRecords.get(i));
}
return copy;
} finally {
readLock.unlock();
}
}
@Override
public long getTotalHitCount() {
readLock.lock();
try {
return totalHitCount;
} finally {
readLock.unlock();
}
}
@Override
public long getQueryTime() {
return queryTime;
}
@Override
public Date getExpiration() {
return expirationDate;
}
@Override
public String getError() {
return error;
}
@Override
public int getPercentComplete() {
readLock.lock();
try {
return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
} finally {
readLock.unlock();
}
}
@Override
public boolean isFinished() {
readLock.lock();
try {
return numCompletedSteps >= numSteps || canceled;
} finally {
readLock.unlock();
}
}
void cancel() {
this.canceled = true;
}
public void setError(final String error) {
writeLock.lock();
try {
this.error = error;
numCompletedSteps++;
updateExpiration();
if (numCompletedSteps >= numSteps) {
final long searchNanos = System.nanoTime() - creationNanos;
queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
}
} finally {
writeLock.unlock();
}
}
public void update(final Collection<StoredProvenanceEvent> matchingRecords, final long totalHits) {
writeLock.lock();
try {
this.matchingRecords.addAll(matchingRecords);
this.totalHitCount += totalHits;
numCompletedSteps++;
updateExpiration();
if (numCompletedSteps >= numSteps) {
final long searchNanos = System.nanoTime() - creationNanos;
queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
}
} finally {
writeLock.unlock();
}
}
/**
* Must be called with write lock!
*/
private void updateExpiration() {
expirationDate = new Date(System.currentTimeMillis() + TTL);
}
}