blob: 54d879bab5338ff81406a53ebce61844895b036a [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Mutable holder for provenance repository settings: storage locations, retention limits,
 * event-file rollover thresholds, indexing options and thread-pool sizes.
 * A new instance starts with the defaults declared on the fields below.
 * {@code create(NiFiProperties)} builds an instance from NiFi properties.
 * No synchronization is performed by the accessors.
 * NOTE(review): presumably configured once before use; confirm the thread-safety
 * expectations with callers.
 */
public class RepositoryConfiguration {
private static final Logger logger = LoggerFactory.getLogger(RepositoryConfiguration.class);
// Property key read by create() for the number of concurrent merge threads.
public static final String CONCURRENT_MERGE_THREADS = "nifi.provenance.repository.concurrent.merge.threads";
// Property key read by create(); its value is parsed as a time duration expressed in minutes.
public static final String WARM_CACHE_FREQUENCY = "nifi.provenance.repository.warm.cache.frequency";
// Property key read by create() for the maintenance period. The Java identifier is misspelled
// ("MAINTENACE"), but the property key itself is spelled correctly. The name is kept because
// the constant is public.
public static final String MAINTENACE_FREQUENCY = "nifi.provenance.repository.maintenance.frequency";
// Partition name -> storage directory; LinkedHashMap keeps registration order.
private final Map<String, File> storageDirectories = new LinkedHashMap<>();
// Maximum record lifetime, in milliseconds (default 24 hours).
private long recordLifeMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
private long storageCapacity = 1024L * 1024L * 1024L; // 1 GB
// Maximum time to write to a single event file, in milliseconds (default 5 minutes).
private long eventFileMillis = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
// Note: create() falls back to "100 MB" when the rollover-size property is absent.
private long eventFileBytes = 1024L * 1024L * 5L; // 5 MB
// Integer.MAX_VALUE means no practical per-file event limit.
private int maxFileEvents = Integer.MAX_VALUE;
private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
private int journalCount = 16;
// Compression block size, in bytes (1 MiB).
private int compressionBlockBytes = 1024 * 1024;
// No minimum is enforced by the setter; create() clamps values below 36 up to 36.
private int maxAttributeChars = 65536;
private int debugFrequency = 1_000_000;
// Maintenance period, in milliseconds (default 1 minute).
private long maintenanceFrequencyMillis = TimeUnit.MINUTES.toMillis(1L);
private List<SearchableField> searchableFields = new ArrayList<>();
private List<SearchableField> searchableAttributes = new ArrayList<>();
// Whether event files are compressed on rollover.
private boolean compress = true;
private boolean alwaysSync = false;
private int queryThreadPoolSize = 2;
// Note: create() falls back to 2 when the index-thread property is absent.
private int indexThreadPoolSize = 1;
private boolean allowRollover = true;
// Note: create() falls back to 2 when CONCURRENT_MERGE_THREADS is absent.
private int concurrentMergeThreads = 4;
// null means "not configured"; the getter exposes this as an empty Optional.
private Integer warmCacheFrequencyMinutes = null;
public void setAllowRollover(final boolean allow) {
this.allowRollover = allow;
public boolean isAllowRollover() {
return allowRollover;
public int getCompressionBlockBytes() {
return compressionBlockBytes;
public void setCompressionBlockBytes(int compressionBlockBytes) {
this.compressionBlockBytes = compressionBlockBytes;
* Specifies where the repository will store data
* @return the directories where provenance files will be stored
public Map<String, File> getStorageDirectories() {
return Collections.unmodifiableMap(storageDirectories);
* Specifies where the repository should store data
* @param storageDirectory the directory to store provenance files
public void addStorageDirectory(final String partitionName, final File storageDirectory) {
this.storageDirectories.put(partitionName, storageDirectory);
/**
 * Registers several storage directories at once.
 * NOTE(review): the body of this method is not visible in this view. By analogy with
 * addStorageDirectory it presumably registers every partition-name -> directory entry of
 * the given map; confirm against the full source.
 *
 * @param storageDirectories partition names mapped to the directories to register
 */
public void addStorageDirectories(final Map<String, File> storageDirectories) {
* @param timeUnit the desired time unit
* @return the max amount of time that a given record will stay in the repository
public long getMaxRecordLife(final TimeUnit timeUnit) {
return timeUnit.convert(recordLifeMillis, TimeUnit.MILLISECONDS);
* Specifies how long a record should stay in the repository
* @param maxRecordLife the max amount of time to keep a record in the repo
* @param timeUnit the period of time used by maxRecordLife
public void setMaxRecordLife(final long maxRecordLife, final TimeUnit timeUnit) {
this.recordLifeMillis = TimeUnit.MILLISECONDS.convert(maxRecordLife, timeUnit);
* Returns the maximum amount of data to store in the repository (in bytes)
* @return the maximum amount of disk space to use for the prov repo
public long getMaxStorageCapacity() {
return storageCapacity;
* Sets the maximum amount of data to store in the repository (in bytes)
* @param maxStorageCapacity the maximum amount of disk space to use for the prov repo
public void setMaxStorageCapacity(final long maxStorageCapacity) {
this.storageCapacity = maxStorageCapacity;
* @param timeUnit the desired time unit for the returned value
* @return the maximum amount of time that the repo will write to a single event file
public long getMaxEventFileLife(final TimeUnit timeUnit) {
return timeUnit.convert(eventFileMillis, TimeUnit.MILLISECONDS);
* @param maxEventFileTime the max amount of time to write to a single event file
* @param timeUnit the units for the value supplied by maxEventFileTime
public void setMaxEventFileLife(final long maxEventFileTime, final TimeUnit timeUnit) {
this.eventFileMillis = TimeUnit.MILLISECONDS.convert(maxEventFileTime, timeUnit);
* @return the maximum number of bytes (pre-compression) that will be
* written to a single event file before the file is rolled over
public long getMaxEventFileCapacity() {
return eventFileBytes;
* @param maxEventFileBytes the maximum number of bytes (pre-compression) that will be written
* to a single event file before the file is rolled over
public void setMaxEventFileCapacity(final long maxEventFileBytes) {
this.eventFileBytes = maxEventFileBytes;
* @return the maximum number of events that should be written to a single event file before the file is rolled over
public int getMaxEventFileCount() {
return maxFileEvents;
* @param maxCount the maximum number of events that should be written to a single event file before the file is rolled over
public void setMaxEventFileCount(final int maxCount) {
this.maxFileEvents = maxCount;
* @return the fields that should be indexed
public List<SearchableField> getSearchableFields() {
return Collections.unmodifiableList(searchableFields);
* @param searchableFields the fields to index
public void setSearchableFields(final List<SearchableField> searchableFields) {
this.searchableFields = new ArrayList<>(searchableFields);
* @return the FlowFile attributes that should be indexed
public List<SearchableField> getSearchableAttributes() {
return Collections.unmodifiableList(searchableAttributes);
* @param searchableAttributes the FlowFile attributes to index
public void setSearchableAttributes(final List<SearchableField> searchableAttributes) {
this.searchableAttributes = new ArrayList<>(searchableAttributes);
* @return whether or not event files will be compressed when they are
* rolled over
public boolean isCompressOnRollover() {
return compress;
* @param compress if true, the data will be compressed when rolled over
public void setCompressOnRollover(final boolean compress) {
this.compress = compress;
* @return the number of threads to use to query the repo
public int getQueryThreadPoolSize() {
return queryThreadPoolSize;
public void setQueryThreadPoolSize(final int queryThreadPoolSize) {
if (queryThreadPoolSize < 1) {
throw new IllegalArgumentException();
this.queryThreadPoolSize = queryThreadPoolSize;
* @return the number of threads to use to index provenance events
public int getIndexThreadPoolSize() {
return indexThreadPoolSize;
public void setIndexThreadPoolSize(final int indexThreadPoolSize) {
if (indexThreadPoolSize < 1) {
throw new IllegalArgumentException();
this.indexThreadPoolSize = indexThreadPoolSize;
public void setConcurrentMergeThreads(final int mergeThreads) {
this.concurrentMergeThreads = mergeThreads;
public int getConcurrentMergeThreads() {
return concurrentMergeThreads;
* <p>
* Specifies the desired size of each Provenance Event index shard, in
* bytes. We shard the index for a few reasons:
* </p>
* <ol>
* <li>
* A very large index requires a significant amount of Java heap space to
* search. As the size of the shard increases, the required Java heap space
* also increases.</li>
* <li>
* By having multiple shards, we have the ability to use multiple concurrent
* threads to search the individual shards, resulting in far less latency
* when performing a search across millions or billions of records.</li>
* <li>
* We keep track of which time ranges each index shard spans. As a result,
* we are able to determine which shards need to be searched if a search
* provides a date range. This can greatly increase the speed of a search
* and reduce resource utilization.</li>
* </ol>
* @param bytes
* the number of bytes to write to an index before beginning a
* new shard
public void setDesiredIndexSize(final long bytes) {
this.desiredIndexBytes = bytes;
* @return the desired size of each index shard. See the
* {@link #setDesiredIndexSize} method for an explanation of why we choose
* to shard the index.
public long getDesiredIndexSize() {
return desiredIndexBytes;
* @param numJournals the number of Journal files to use when persisting records.
public void setJournalCount(final int numJournals) {
if (numJournals < 1) {
throw new IllegalArgumentException();
this.journalCount = numJournals;
* @return the number of Journal files that will be used when persisting records.
public int getJournalCount() {
return journalCount;
* @return <code>true</code> if the repository will perform an 'fsync' for all updates to disk
public boolean isAlwaysSync() {
return alwaysSync;
* Configures whether or not the Repository should sync all updates to disk.
* Setting this value to true means that updates are guaranteed to be
* persisted across restarted, even if there is a power failure or a sudden
* Operating System crash, but it can be very expensive.
* @param alwaysSync whether or not to perform an 'fsync' for all updates to disk
public void setAlwaysSync(boolean alwaysSync) {
this.alwaysSync = alwaysSync;
* @return the maximum number of characters to include in any attribute. If an attribute in a Provenance
* Event has more than this number of characters, it will be truncated when the event is retrieved.
public int getMaxAttributeChars() {
return maxAttributeChars;
* Sets the maximum number of characters to include in any attribute. If an attribute in a Provenance
* Event has more than this number of characters, it will be truncated when it is retrieved.
public void setMaxAttributeChars(int maxAttributeChars) {
this.maxAttributeChars = maxAttributeChars;
public void setWarmCacheFrequencyMinutes(Integer frequencyMinutes) {
this.warmCacheFrequencyMinutes = frequencyMinutes;
public Optional<Integer> getWarmCacheFrequencyMinutes() {
return Optional.ofNullable(warmCacheFrequencyMinutes);
public int getDebugFrequency() {
return debugFrequency;
public void setDebugFrequency(int debugFrequency) {
this.debugFrequency = debugFrequency;
public long getMaintenanceFrequency(final TimeUnit timeUnit) {
return timeUnit.convert(maintenanceFrequencyMillis, TimeUnit.MILLISECONDS);
public void setMaintenanceFrequency(final long period, final TimeUnit timeUnit) {
this.maintenanceFrequencyMillis = timeUnit.toMillis(period);
/**
 * Builds a RepositoryConfiguration from NiFi properties.
 * Time strings are parsed with FormatUtils.getTimeDuration and size strings with
 * DataUnit.parseDataSize. Hard-coded defaults are used when a property is absent.
 *
 * @param nifiProperties the NiFi properties to read
 * @return a newly created configuration
 */
public static RepositoryConfiguration create(final NiFiProperties nifiProperties) {
final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths();
// Fall back to a relative "provenance_repository" directory when no path is configured.
// NOTE(review): this mutates the map returned by getProvenanceRepositoryPaths(); it assumes
// that map is mutable. Confirm.
if (storageDirectories.isEmpty()) {
storageDirectories.put("provenance_repository", Paths.get("provenance_repository"));
final String storageTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours");
final String storageSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
final String rolloverTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
final String rolloverSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
final int rolloverEventCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_ROLLOVER_EVENT_COUNT, Integer.MAX_VALUE);
final String shardSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
final int queryThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
final int indexThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 2);
final int journalCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
final int concurrentMergeThreads = nifiProperties.getIntegerProperty(CONCURRENT_MERGE_THREADS, 2);
// These two have no default; null or blank means "leave the configuration default in place".
final String warmCacheFrequency = nifiProperties.getProperty(WARM_CACHE_FREQUENCY);
final String maintenanceFrequency = nifiProperties.getProperty(MAINTENACE_FREQUENCY);
final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue();
final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS);
final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, DataUnit.B).longValue();
// Boolean.parseBoolean yields false for a missing (null) property.
final boolean compressOnRollover = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER));
final String indexedFieldString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
final String indexedAttrString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
final Boolean alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty("nifi.provenance.repository.always.sync", "false"));
final int defaultMaxAttrChars = 65536;
final String maxAttrLength = nifiProperties.getProperty("nifi.provenance.repository.max.attribute.length", String.valueOf(defaultMaxAttrChars));
int maxAttrChars;
try {
maxAttrChars = Integer.parseInt(maxAttrLength);
// must be at least 36 characters because that's the length of the uuid attribute,
// which must be kept intact
if (maxAttrChars < 36) {
maxAttrChars = 36;
logger.warn("Found max attribute length property set to " + maxAttrLength + " but minimum length is 36; using 36 instead");
} catch (final Exception e) {
// An unparseable value silently falls back to the default; nothing is logged here.
maxAttrChars = defaultMaxAttrChars;
// The boolean argument distinguishes event fields (true) from FlowFile attributes (false).
final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
final List<SearchableField> searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false);
// We always want to index the Event Time.
// NOTE(review): the body of this if is not visible in this view; presumably it adds
// SearchableFields.EventTime to searchableFields. Confirm.
if (!searchableFields.contains(SearchableFields.EventTime)) {
final RepositoryConfiguration config = new RepositoryConfiguration();
// Convert each configured Path into the File form stored by the configuration.
for (final Map.Entry<String, Path> entry : storageDirectories.entrySet()) {
config.addStorageDirectory(entry.getKey(), entry.getValue().toFile());
// NOTE(review): several values parsed above (maxStorageBytes, rolloverBytes, rolloverEventCount,
// queryThreads, indexThreads, journalCount, concurrentMergeThreads, compressOnRollover,
// alwaysSync, maxAttrChars, searchableFields, searchableAttributes) are not applied to config
// in the lines visible here. Confirm that the corresponding setters are called in the full source.
config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
// The duration is converted to whole minutes and narrowed to int.
// NOTE(review): unlike the maintenance value below, this string is passed to
// FormatUtils untrimmed. Verify that leading/trailing whitespace is tolerated.
if (warmCacheFrequency != null && !warmCacheFrequency.trim().equals("")) {
config.setWarmCacheFrequencyMinutes((int) FormatUtils.getTimeDuration(warmCacheFrequency, TimeUnit.MINUTES));
// shardSize was read with a "500 MB" default above, so this check is presumably always true.
if (shardSize != null) {
config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
if (maintenanceFrequency != null && !maintenanceFrequency.trim().equals("")) {
final long millis = FormatUtils.getTimeDuration(maintenanceFrequency.trim(), TimeUnit.MILLISECONDS);
config.setMaintenanceFrequency(millis, TimeUnit.MILLISECONDS);
// The current (default) debug frequency serves as the fallback when the property is absent.
config.setDebugFrequency(nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_REPO_DEBUG_FREQUENCY, config.getDebugFrequency()));
return config;