/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.queue;

import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
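
/**
 * Base implementation of {@link FlowFileQueue} that provides FlowFile expiration, back
 * pressure thresholds, load balance configuration, and the asynchronous listing and
 * dropping of queued FlowFiles, leaving the actual storage and polling of FlowFiles
 * to subclasses.
 */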
public abstract class AbstractFlowFileQueue implements FlowFileQueue {
private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class);
private final String identifier;
private final FlowFileRepository flowFileRepository;
private final ProvenanceEventRepository provRepository;
private final ResourceClaimManager resourceClaimManager;
private final ProcessScheduler scheduler;
private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 sec", 0L));
private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));
private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
private String partitioningAttribute = null;
private final ReadWriteLock loadBalanceRWLock = new ReentrantReadWriteLock();
private final Lock loadBalanceReadLock = loadBalanceRWLock.readLock();
private final Lock loadBalanceWriteLock = loadBalanceRWLock.writeLock();
private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS;
public AbstractFlowFileQueue(final String identifier, final ProcessScheduler scheduler,
final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ResourceClaimManager resourceClaimManager) {
this.identifier = identifier;
this.scheduler = scheduler;
this.flowFileRepository = flowFileRepo;
this.provRepository = provRepo;
this.resourceClaimManager = resourceClaimManager;
}
@Override
public String getIdentifier() {
return identifier;
}
protected ProcessScheduler getScheduler() {
return scheduler;
}
@Override
public String getFlowFileExpiration() {
return expirationPeriod.get().getPeriod();
}
@Override
public int getFlowFileExpiration(final TimeUnit timeUnit) {
return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
}
@Override
public void setFlowFileExpiration(final String flowExpirationPeriod) {
final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
if (millis < 0) {
throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
}
expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
}
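/**
 * Updates the object-count back pressure threshold. Both thresholds are held in a single
 * immutable MaxQueueSize, so a compare-and-set loop swaps in a new instance atomically,
 * retrying if another thread updates the queue size settings concurrently.
 */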
@Override
public void setBackPressureObjectThreshold(final long threshold) {
boolean updated = false;
while (!updated) {
MaxQueueSize maxSize = getMaxQueueSize();
final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
}
}
@Override
public long getBackPressureObjectThreshold() {
return getMaxQueueSize().getMaxCount();
}
@Override
public void setBackPressureDataSizeThreshold(final String maxDataSize) {
final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
boolean updated = false;
while (!updated) {
MaxQueueSize maxSize = getMaxQueueSize();
final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount());
updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
}
}
@Override
public String getBackPressureDataSizeThreshold() {
return getMaxQueueSize().getMaxSize();
}
private MaxQueueSize getMaxQueueSize() {
return maxQueueSize.get();
}
@Override
public boolean isFull() {
return isFull(size());
}
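/**
 * @param queueSize the queue size to evaluate against the configured thresholds
 * @return true if either back pressure threshold has been met or exceeded; a threshold
 *         of zero or less disables that particular check
 */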
protected boolean isFull(final QueueSize queueSize) {
final MaxQueueSize maxSize = getMaxQueueSize();
// Check if max size is set
if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
return false;
}
if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
return true;
}
if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
return true;
}
return false;
}
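/**
 * Begins an asynchronous listing of the FlowFiles in this queue. A daemon thread sorts a
 * shallow copy of the queue's contents and populates the returned status object, which is
 * registered under the given request identifier; completed or failed requests are purged
 * five minutes after their last update once more than ten requests have accumulated.
 * A caller would typically poll the status until it finishes, along these lines (a sketch;
 * it assumes the status object exposes getState() and getFlowFileSummaries(), mirroring
 * the setters used on the request below):
 * <pre>{@code
 * final ListFlowFileStatus status = queue.listFlowFiles(requestId, 100);
 * while (status.getState() != ListFlowFileState.COMPLETE
 *         && status.getState() != ListFlowFileState.FAILURE) {
 *     Thread.sleep(100L);
 * }
 * final List<FlowFileSummary> summaries = status.getFlowFileSummaries();
 * }</pre>
 */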
@Override
public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
// purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
if (listRequestMap.size() > 10) {
final List<String> toDrop = new ArrayList<>();
for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) {
final ListFlowFileRequest request = entry.getValue();
final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
toDrop.add(entry.getKey());
}
}
for (final String requestId : toDrop) {
listRequestMap.remove(requestId);
}
}
// Create the request up front so that its state can be tracked while the listing runs on a background thread.
final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size());
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
int position = 0;
final List<FlowFileSummary> summaries = new ArrayList<>();
// Create an ArrayList that contains all of the contents of the active queue.
// We do this so that we don't have to hold the lock any longer than absolutely necessary.
// We cannot simply pull the first 'maxResults' records from the queue, however, because the
// Iterator provided by PriorityQueue does not return records in order. So we would have to either
// use a writeLock and 'pop' the first 'maxResults' records off the queue or use a read lock and
// do a shallow copy of the queue. The shallow copy is generally quicker because it doesn't have to do
// the sorting to put the records back. So even though this comes at the expense of extra Java heap for
// the copied collection, we make that trade-off to avoid locking the queue any longer than required.
final List<FlowFileRecord> allFlowFiles = getListableFlowFiles();
final QueuePrioritizer prioritizer = new QueuePrioritizer(getPriorities());
listRequest.setState(ListFlowFileState.CALCULATING_LIST);
// sort the FlowFileRecords so that we have the list in the same order as on the queue.
allFlowFiles.sort(prioritizer);
for (final FlowFileRecord flowFile : allFlowFiles) {
summaries.add(summarize(flowFile, ++position));
if (summaries.size() >= maxResults) {
break;
}
}
logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results out of {} FlowFiles", this, summaries.size(), allFlowFiles.size());
listRequest.setFlowFileSummaries(summaries);
listRequest.setState(ListFlowFileState.COMPLETE);
}
}, "List FlowFiles for Connection " + getIdentifier());
t.setDaemon(true);
t.start();
listRequestMap.put(requestIdentifier, listRequest);
return listRequest;
}
@Override
public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) {
return listRequestMap.get(requestIdentifier);
}
@Override
public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) {
logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier);
final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier);
if (request != null) {
request.cancel();
}
return request;
}
/**
* @return all FlowFiles that should be listed in response to a List Queue request
*/
protected abstract List<FlowFileRecord> getListableFlowFiles();
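/**
 * Begins an asynchronous drop of all FlowFiles in this queue on behalf of the given
 * requestor. If the queue is empty the request completes immediately; otherwise the work
 * is delegated to the abstract dropFlowFiles(DropFlowFileRequest, String) on a daemon
 * thread while the trackable status is returned right away. Progress can be polled much
 * like a listing request (a sketch; getState() is assumed to mirror the setState() calls
 * made on the request in this class):
 * <pre>{@code
 * final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "admin");
 * while (status.getState() != DropFlowFileState.COMPLETE
 *         && status.getState() != DropFlowFileState.FAILURE) {
 *     Thread.sleep(100L);
 * }
 * }</pre>
 */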
@Override
public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);
// purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
if (dropRequestMap.size() > 10) {
final List<String> toDrop = new ArrayList<>();
for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) {
final DropFlowFileRequest request = entry.getValue();
final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
toDrop.add(entry.getKey());
}
}
for (final String requestId : toDrop) {
dropRequestMap.remove(requestId);
}
}
final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
final QueueSize originalSize = size();
dropRequest.setCurrentSize(originalSize);
dropRequest.setOriginalSize(originalSize);
if (originalSize.getObjectCount() == 0) {
dropRequest.setDroppedSize(originalSize);
dropRequest.setState(DropFlowFileState.COMPLETE);
dropRequestMap.put(requestIdentifier, dropRequest);
return dropRequest;
}
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
dropFlowFiles(dropRequest, requestor);
}
}, "Drop FlowFiles for Connection " + getIdentifier());
t.setDaemon(true);
t.start();
dropRequestMap.put(requestIdentifier, dropRequest);
return dropRequest;
}
@Override
public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) {
final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier);
if (request == null) {
return null;
}
request.cancel();
return request;
}
@Override
public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
return dropRequestMap.get(requestIdentifier);
}
/**
* Synchronously drops all FlowFiles in the queue
*
* @param dropRequest the request
* @param requestor the identity of the user/agent who made the request
*/
protected abstract void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor);
@Override
public void verifyCanList() throws IllegalStateException {
}
protected FlowFileSummary summarize(final FlowFileRecord flowFile, final int position) {
// extract all of the information that we care about into new variables rather than just
// wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to
// be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object,
// we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well,
// which can be problematic if we expect them to be swapped out.
final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final long size = flowFile.getSize();
final Long lastQueuedTime = flowFile.getLastQueueDate();
final long lineageStart = flowFile.getLineageStartDate();
final boolean penalized = flowFile.isPenalized();
final long penaltyExpires = flowFile.getPenaltyExpirationMillis();
return new FlowFileSummary() {
@Override
public String getUuid() {
return uuid;
}
@Override
public String getFilename() {
return filename;
}
@Override
public int getPosition() {
return position;
}
@Override
public long getSize() {
return size;
}
@Override
public long getLastQueuedTime() {
return lastQueuedTime == null ? 0L : lastQueuedTime;
}
@Override
public long getLineageStartDate() {
return lineageStart;
}
@Override
public boolean isPenalized() {
return penalized;
}
@Override
public long getPenaltyExpirationMillis() {
return penaltyExpires;
}
};
}
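/**
 * Synchronously drops the given FlowFiles by registering a DROP provenance event and a
 * delete record in the FlowFile Repository for each, returning the number of FlowFiles
 * and the total size of their content.
 */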
protected QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
// Create a Provenance Event and a FlowFile Repository record for each FlowFile
final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
long dropContentSize = 0L;
for (final FlowFileRecord flowFile : flowFiles) {
provenanceEvents.add(createDropProvenanceEvent(flowFile, requestor));
flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
dropContentSize += flowFile.getSize();
}
provRepository.registerEvents(provenanceEvents);
flowFileRepository.updateRepository(flowFileRepoRecords);
return new QueueSize(flowFiles.size(), dropContentSize);
}
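/**
 * Builds the DROP provenance event for a single FlowFile, attributing the event to this
 * Connection and, when the FlowFile has a content claim, recording which claim held its
 * content.
 */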
private ProvenanceEventRecord createDropProvenanceEvent(final FlowFileRecord flowFile, final String requestor) {
final ProvenanceEventBuilder builder = provRepository.eventBuilder();
builder.fromFlowFile(flowFile);
builder.setEventType(ProvenanceEventType.DROP);
builder.setLineageStartDate(flowFile.getLineageStartDate());
builder.setComponentId(getIdentifier());
builder.setComponentType("Connection");
builder.setAttributes(flowFile.getAttributes(), Collections.emptyMap());
builder.setDetails("FlowFile Queue emptied by " + requestor);
builder.setSourceQueueIdentifier(getIdentifier());
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim != null) {
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
}
return builder.build();
}
private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
return new DropFlowFileRepositoryRecord(this, flowFile);
}
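/**
 * Sets the load balance strategy, requiring a valid partitioning attribute whenever
 * PARTITION_BY_ATTRIBUTE is chosen. The strategy and attribute are guarded by a
 * read/write lock so that they are always read and updated as a consistent pair.
 */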
@Override
public void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
loadBalanceWriteLock.lock();
try {
if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
}
this.loadBalanceStrategy = strategy;
this.partitioningAttribute = partitioningAttribute;
} finally {
loadBalanceWriteLock.unlock();
}
}
@Override
public String getPartitioningAttribute() {
loadBalanceReadLock.lock();
try {
return partitioningAttribute;
} finally {
loadBalanceReadLock.unlock();
}
}
@Override
public LoadBalanceStrategy getLoadBalanceStrategy() {
loadBalanceReadLock.lock();
try {
return loadBalanceStrategy;
} finally {
loadBalanceReadLock.unlock();
}
}
@Override
public void setLoadBalanceCompression(final LoadBalanceCompression compression) {
loadBalanceWriteLock.lock();
try {
this.compression = compression;
} finally {
loadBalanceWriteLock.unlock();
}
}
@Override
public LoadBalanceCompression getLoadBalanceCompression() {
loadBalanceReadLock.lock();
try {
return compression;
} finally {
loadBalanceReadLock.unlock();
}
}
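// These poll variants default to PollStrategy.UNPENALIZED_FLOWFILES, so penalized
// FlowFiles are skipped; subclasses supply the PollStrategy-aware implementations.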
@Override
public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
}