blob: 77c080ad187c17df2a2f2399d027148c6b9eb3f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.security.kms.CryptoUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Configuration for a provenance repository: where event data is stored, how long and how much data is
 * retained, when event files roll over, which fields and FlowFile attributes are indexed, thread pool sizes,
 * and (only for {@link EncryptedWriteAheadProvenanceRepository}) encryption key settings.
 * <p>
 * Instances are usually built from {@link NiFiProperties} via {@link #create(NiFiProperties)}.
 * NOTE(review): this class performs no synchronization; it appears to be populated once and then read —
 * confirm before sharing a mutable instance across threads.
 */
public class RepositoryConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(RepositoryConfiguration.class);

    public static final String CONCURRENT_MERGE_THREADS = "nifi.provenance.repository.concurrent.merge.threads";
    public static final String WARM_CACHE_FREQUENCY = "nifi.provenance.repository.warm.cache.frequency";
    public static final String MAINTENANCE_FREQUENCY = "nifi.provenance.repository.maintenance.frequency";

    /**
     * @deprecated misspelled name kept only for backward compatibility; use {@link #MAINTENANCE_FREQUENCY}.
     */
    @Deprecated
    public static final String MAINTENACE_FREQUENCY = MAINTENANCE_FREQUENCY;

    /** Property holding the maximum number of characters kept for any attribute value. */
    private static final String MAX_ATTRIBUTE_LENGTH = "nifi.provenance.repository.max.attribute.length";

    /** Default maximum number of characters kept for any attribute value. */
    private static final int DEFAULT_MAX_ATTRIBUTE_CHARS = 65536;

    /** Lower bound for the attribute length: the uuid attribute is 36 characters and must be kept intact. */
    private static final int MIN_ATTRIBUTE_CHARS = 36;

    private final Map<String, File> storageDirectories = new LinkedHashMap<>();
    private long recordLifeMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
    private long storageCapacity = 1024L * 1024L * 1024L; // 1 GB
    private long eventFileMillis = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
    private long eventFileBytes = 1024L * 1024L * 5L; // 5 MB
    private int maxFileEvents = Integer.MAX_VALUE;
    private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
    private int journalCount = 16;
    private int compressionBlockBytes = 1024 * 1024;
    private int maxAttributeChars = DEFAULT_MAX_ATTRIBUTE_CHARS;
    private int debugFrequency = 1_000_000;
    private long maintenanceFrequencyMillis = TimeUnit.MINUTES.toMillis(1L);

    // TODO: Delegate to RepositoryEncryptionConfiguration in NIFI-6617
    private Map<String, String> encryptionKeys;
    private String keyId;
    private String keyProviderImplementation;
    private String keyProviderLocation;
    private String keyProviderPassword;

    private List<SearchableField> searchableFields = new ArrayList<>();
    private List<SearchableField> searchableAttributes = new ArrayList<>();
    private boolean compress = true;
    private boolean alwaysSync = false;
    private int queryThreadPoolSize = 2;
    private int indexThreadPoolSize = 1;
    private boolean allowRollover = true;
    private int concurrentMergeThreads = 4;
    private Integer warmCacheFrequencyMinutes = null;

    /**
     * @param allow whether event files are allowed to roll over
     */
    public void setAllowRollover(final boolean allow) {
        this.allowRollover = allow;
    }

    /**
     * @return whether event files are allowed to roll over (default {@code true})
     */
    public boolean isAllowRollover() {
        return allowRollover;
    }

    /**
     * @return the configured compression block size in bytes (default 1 MiB)
     */
    public int getCompressionBlockBytes() {
        return compressionBlockBytes;
    }

    /**
     * @param compressionBlockBytes the compression block size in bytes
     */
    public void setCompressionBlockBytes(int compressionBlockBytes) {
        this.compressionBlockBytes = compressionBlockBytes;
    }

    /**
     * Specifies where the repository will store data
     *
     * @return an unmodifiable view of the partition-name-to-directory mapping, in insertion order
     */
    public Map<String, File> getStorageDirectories() {
        return Collections.unmodifiableMap(storageDirectories);
    }

    /**
     * Specifies where the repository should store data
     *
     * @param partitionName the name of the partition; replaces any directory previously registered under it
     * @param storageDirectory the directory to store provenance files
     */
    public void addStorageDirectory(final String partitionName, final File storageDirectory) {
        this.storageDirectories.put(partitionName, storageDirectory);
    }

    /**
     * Adds all given partitions; entries replace any directory previously registered under the same name.
     *
     * @param storageDirectories mapping of partition name to storage directory
     */
    public void addStorageDirectories(final Map<String, File> storageDirectories) {
        this.storageDirectories.putAll(storageDirectories);
    }

    /**
     * @param timeUnit the desired time unit
     * @return the max amount of time that a given record will stay in the repository
     */
    public long getMaxRecordLife(final TimeUnit timeUnit) {
        return timeUnit.convert(recordLifeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Specifies how long a record should stay in the repository
     *
     * @param maxRecordLife the max amount of time to keep a record in the repo
     * @param timeUnit the period of time used by maxRecordLife
     */
    public void setMaxRecordLife(final long maxRecordLife, final TimeUnit timeUnit) {
        this.recordLifeMillis = TimeUnit.MILLISECONDS.convert(maxRecordLife, timeUnit);
    }

    /**
     * Returns the maximum amount of data to store in the repository (in bytes)
     *
     * @return the maximum amount of disk space to use for the prov repo
     */
    public long getMaxStorageCapacity() {
        return storageCapacity;
    }

    /**
     * Sets the maximum amount of data to store in the repository (in bytes)
     *
     * @param maxStorageCapacity the maximum amount of disk space to use for the prov repo
     */
    public void setMaxStorageCapacity(final long maxStorageCapacity) {
        this.storageCapacity = maxStorageCapacity;
    }

    /**
     * @param timeUnit the desired time unit for the returned value
     * @return the maximum amount of time that the repo will write to a single event file
     */
    public long getMaxEventFileLife(final TimeUnit timeUnit) {
        return timeUnit.convert(eventFileMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * @param maxEventFileTime the max amount of time to write to a single event file
     * @param timeUnit the units for the value supplied by maxEventFileTime
     */
    public void setMaxEventFileLife(final long maxEventFileTime, final TimeUnit timeUnit) {
        this.eventFileMillis = TimeUnit.MILLISECONDS.convert(maxEventFileTime, timeUnit);
    }

    /**
     * @return the maximum number of bytes (pre-compression) that will be
     * written to a single event file before the file is rolled over
     */
    public long getMaxEventFileCapacity() {
        return eventFileBytes;
    }

    /**
     * @param maxEventFileBytes the maximum number of bytes (pre-compression) that will be written
     * to a single event file before the file is rolled over
     */
    public void setMaxEventFileCapacity(final long maxEventFileBytes) {
        this.eventFileBytes = maxEventFileBytes;
    }

    /**
     * @return the maximum number of events that should be written to a single event file before the file is rolled over
     */
    public int getMaxEventFileCount() {
        return maxFileEvents;
    }

    /**
     * @param maxCount the maximum number of events that should be written to a single event file before the file is rolled over
     */
    public void setMaxEventFileCount(final int maxCount) {
        this.maxFileEvents = maxCount;
    }

    /**
     * @return an unmodifiable view of the fields that should be indexed
     */
    public List<SearchableField> getSearchableFields() {
        return Collections.unmodifiableList(searchableFields);
    }

    /**
     * @param searchableFields the fields to index; the list is copied
     */
    public void setSearchableFields(final List<SearchableField> searchableFields) {
        this.searchableFields = new ArrayList<>(searchableFields);
    }

    /**
     * @return an unmodifiable view of the FlowFile attributes that should be indexed
     */
    public List<SearchableField> getSearchableAttributes() {
        return Collections.unmodifiableList(searchableAttributes);
    }

    /**
     * @param searchableAttributes the FlowFile attributes to index; the list is copied
     */
    public void setSearchableAttributes(final List<SearchableField> searchableAttributes) {
        this.searchableAttributes = new ArrayList<>(searchableAttributes);
    }

    /**
     * @return whether or not event files will be compressed when they are
     * rolled over
     */
    public boolean isCompressOnRollover() {
        return compress;
    }

    /**
     * @param compress if true, the data will be compressed when rolled over
     */
    public void setCompressOnRollover(final boolean compress) {
        this.compress = compress;
    }

    /**
     * @return the number of threads to use to query the repo
     */
    public int getQueryThreadPoolSize() {
        return queryThreadPoolSize;
    }

    /**
     * @param queryThreadPoolSize the number of threads to use to query the repo; must be at least 1
     * @throws IllegalArgumentException if {@code queryThreadPoolSize} is less than 1
     */
    public void setQueryThreadPoolSize(final int queryThreadPoolSize) {
        if (queryThreadPoolSize < 1) {
            throw new IllegalArgumentException("Query thread pool size must be at least 1 but was " + queryThreadPoolSize);
        }
        this.queryThreadPoolSize = queryThreadPoolSize;
    }

    /**
     * @return the number of threads to use to index provenance events
     */
    public int getIndexThreadPoolSize() {
        return indexThreadPoolSize;
    }

    /**
     * @param indexThreadPoolSize the number of threads to use to index provenance events; must be at least 1
     * @throws IllegalArgumentException if {@code indexThreadPoolSize} is less than 1
     */
    public void setIndexThreadPoolSize(final int indexThreadPoolSize) {
        if (indexThreadPoolSize < 1) {
            throw new IllegalArgumentException("Index thread pool size must be at least 1 but was " + indexThreadPoolSize);
        }
        this.indexThreadPoolSize = indexThreadPoolSize;
    }

    /**
     * @param mergeThreads the value configured via {@link #CONCURRENT_MERGE_THREADS}; not validated here
     */
    public void setConcurrentMergeThreads(final int mergeThreads) {
        this.concurrentMergeThreads = mergeThreads;
    }

    /**
     * @return the number of concurrent merge threads (field default 4)
     */
    public int getConcurrentMergeThreads() {
        return concurrentMergeThreads;
    }

    /**
     * <p>
     * Specifies the desired size of each Provenance Event index shard, in
     * bytes. We shard the index for a few reasons:
     * </p>
     *
     * <ol>
     * <li>
     * A very large index requires a significant amount of Java heap space to
     * search. As the size of the shard increases, the required Java heap space
     * also increases.</li>
     * <li>
     * By having multiple shards, we have the ability to use multiple concurrent
     * threads to search the individual shards, resulting in far less latency
     * when performing a search across millions or billions of records.</li>
     * <li>
     * We keep track of which time ranges each index shard spans. As a result,
     * we are able to determine which shards need to be searched if a search
     * provides a date range. This can greatly increase the speed of a search
     * and reduce resource utilization.</li>
     * </ol>
     *
     * @param bytes
     * the number of bytes to write to an index before beginning a
     * new shard
     */
    public void setDesiredIndexSize(final long bytes) {
        this.desiredIndexBytes = bytes;
    }

    /**
     * @return the desired size of each index shard. See the
     * {@link #setDesiredIndexSize} method for an explanation of why we choose
     * to shard the index.
     */
    public long getDesiredIndexSize() {
        return desiredIndexBytes;
    }

    /**
     * @param numJournals the number of Journal files to use when persisting records; must be at least 1
     * @throws IllegalArgumentException if {@code numJournals} is less than 1
     */
    public void setJournalCount(final int numJournals) {
        if (numJournals < 1) {
            throw new IllegalArgumentException("Journal count must be at least 1 but was " + numJournals);
        }
        this.journalCount = numJournals;
    }

    /**
     * @return the number of Journal files that will be used when persisting records.
     */
    public int getJournalCount() {
        return journalCount;
    }

    /**
     * @return <code>true</code> if the repository will perform an 'fsync' for all updates to disk
     */
    public boolean isAlwaysSync() {
        return alwaysSync;
    }

    /**
     * Configures whether or not the Repository should sync all updates to disk.
     * Setting this value to true means that updates are guaranteed to be
     * persisted across restarts, even if there is a power failure or a sudden
     * Operating System crash, but it can be very expensive.
     *
     * @param alwaysSync whether or not to perform an 'fsync' for all updates to disk
     */
    public void setAlwaysSync(boolean alwaysSync) {
        this.alwaysSync = alwaysSync;
    }

    /**
     * @return the maximum number of characters to include in any attribute. If an attribute in a Provenance
     * Event has more than this number of characters, it will be truncated when the event is retrieved.
     */
    public int getMaxAttributeChars() {
        return maxAttributeChars;
    }

    /**
     * Sets the maximum number of characters to include in any attribute. If an attribute in a Provenance
     * Event has more than this number of characters, it will be truncated when it is retrieved.
     *
     * @param maxAttributeChars the maximum number of characters to keep per attribute value; not validated
     * here ({@link #create(NiFiProperties)} clamps the configured value to at least 36)
     */
    public void setMaxAttributeChars(int maxAttributeChars) {
        this.maxAttributeChars = maxAttributeChars;
    }

    /**
     * @param frequencyMinutes the warm-cache frequency in minutes, or {@code null} to leave it unconfigured
     */
    public void setWarmCacheFrequencyMinutes(Integer frequencyMinutes) {
        this.warmCacheFrequencyMinutes = frequencyMinutes;
    }

    /**
     * @return the configured warm-cache frequency in minutes, or empty if none has been configured
     */
    public Optional<Integer> getWarmCacheFrequencyMinutes() {
        return Optional.ofNullable(warmCacheFrequencyMinutes);
    }

    /**
     * @return {@code true} if the configured key provider implementation, location, key id and keys are
     * accepted by {@link CryptoUtils#isValidKeyProvider}
     */
    public boolean supportsEncryption() {
        return CryptoUtils.isValidKeyProvider(keyProviderImplementation, keyProviderLocation, keyId, encryptionKeys);
    }

    // TODO: Add verbose error output for encryption support failure if requested

    /**
     * @return the configured encryption keys (the stored map itself, not a copy), or {@code null} if none were set
     */
    public Map<String, String> getEncryptionKeys() {
        return encryptionKeys;
    }

    public void setEncryptionKeys(Map<String, String> encryptionKeys) {
        this.encryptionKeys = encryptionKeys;
    }

    public String getKeyId() {
        return keyId;
    }

    public void setKeyId(String keyId) {
        this.keyId = keyId;
    }

    public String getKeyProviderImplementation() {
        return keyProviderImplementation;
    }

    public void setKeyProviderImplementation(String keyProviderImplementation) {
        this.keyProviderImplementation = keyProviderImplementation;
    }

    public String getKeyProviderLocation() {
        return keyProviderLocation;
    }

    public void setKeyProviderLocation(String keyProviderLocation) {
        this.keyProviderLocation = keyProviderLocation;
    }

    public String getKeyProviderPassword() {
        return keyProviderPassword;
    }

    public void setKeyProviderPassword(final String keyProviderPassword) {
        this.keyProviderPassword = keyProviderPassword;
    }

    /**
     * @return the debug frequency value (default 1,000,000); populated from
     * {@code NiFiProperties.PROVENANCE_REPO_DEBUG_FREQUENCY} by {@link #create(NiFiProperties)}
     */
    public int getDebugFrequency() {
        return debugFrequency;
    }

    public void setDebugFrequency(int debugFrequency) {
        this.debugFrequency = debugFrequency;
    }

    /**
     * @param timeUnit the desired time unit for the returned value
     * @return the maintenance frequency (default 1 minute)
     */
    public long getMaintenanceFrequency(final TimeUnit timeUnit) {
        return timeUnit.convert(maintenanceFrequencyMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * @param period the maintenance period
     * @param timeUnit the unit of {@code period}
     */
    public void setMaintenanceFrequency(final long period, final TimeUnit timeUnit) {
        this.maintenanceFrequencyMillis = timeUnit.toMillis(period);
    }

    /**
     * Builds a configuration from the provenance-related entries of the given NiFi properties, applying
     * defaults for anything not configured. If no repository paths are configured, a single partition
     * named {@code provenance_repository} at the relative path {@code provenance_repository} is used.
     * The Event Time field is always indexed. Encryption settings are read only when the configured
     * implementation class is {@link EncryptedWriteAheadProvenanceRepository}.
     *
     * @param nifiProperties the properties to read
     * @return a new, populated configuration
     * @throws IllegalArgumentException if the configured query/index thread pool size or journal count is less than 1
     */
    public static RepositoryConfiguration create(final NiFiProperties nifiProperties) {
        // Copy into a map we own: the returned map may be unmodifiable or shared, and we may add a default entry.
        final Map<String, Path> storageDirectories = new LinkedHashMap<>(nifiProperties.getProvenanceRepositoryPaths());
        if (storageDirectories.isEmpty()) {
            storageDirectories.put("provenance_repository", Paths.get("provenance_repository"));
        }

        final String storageTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours");
        final String storageSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
        final String rolloverTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
        final String rolloverSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
        final int rolloverEventCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_ROLLOVER_EVENT_COUNT, Integer.MAX_VALUE);
        final String shardSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
        final int queryThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
        final int indexThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 2);
        final int journalCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
        final int concurrentMergeThreads = nifiProperties.getIntegerProperty(CONCURRENT_MERGE_THREADS, 2);
        final String warmCacheFrequency = nifiProperties.getProperty(WARM_CACHE_FREQUENCY);
        final String maintenanceFrequency = nifiProperties.getProperty(MAINTENANCE_FREQUENCY);

        final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
        final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue();
        final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS);
        final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, DataUnit.B).longValue();

        final boolean compressOnRollover = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER));
        final String indexedFieldString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
        final String indexedAttrString = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
        final boolean alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty("nifi.provenance.repository.always.sync", "false"));
        final int maxAttrChars = parseMaxAttributeChars(nifiProperties);

        final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
        final List<SearchableField> searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false);

        // We always want to index the Event Time.
        if (!searchableFields.contains(SearchableFields.EventTime)) {
            searchableFields.add(SearchableFields.EventTime);
        }

        final RepositoryConfiguration config = new RepositoryConfiguration();
        for (final Map.Entry<String, Path> entry : storageDirectories.entrySet()) {
            config.addStorageDirectory(entry.getKey(), entry.getValue().toFile());
        }
        config.setCompressOnRollover(compressOnRollover);
        config.setSearchableFields(searchableFields);
        config.setSearchableAttributes(searchableAttributes);
        config.setMaxEventFileCapacity(rolloverBytes);
        config.setMaxEventFileCount(rolloverEventCount);
        config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
        config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
        config.setMaxStorageCapacity(maxStorageBytes);
        config.setQueryThreadPoolSize(queryThreads);
        config.setIndexThreadPoolSize(indexThreads);
        config.setJournalCount(journalCount);
        config.setMaxAttributeChars(maxAttrChars);
        config.setConcurrentMergeThreads(concurrentMergeThreads);

        if (warmCacheFrequency != null && !warmCacheFrequency.trim().equals("")) {
            config.setWarmCacheFrequencyMinutes((int) FormatUtils.getTimeDuration(warmCacheFrequency, TimeUnit.MINUTES));
        }
        if (shardSize != null) {
            config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
        }
        if (maintenanceFrequency != null && !maintenanceFrequency.trim().equals("")) {
            final long millis = FormatUtils.getTimeDuration(maintenanceFrequency.trim(), TimeUnit.MILLISECONDS);
            config.setMaintenanceFrequency(millis, TimeUnit.MILLISECONDS);
        }

        config.setAlwaysSync(alwaysSync);
        config.setDebugFrequency(nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_REPO_DEBUG_FREQUENCY, config.getDebugFrequency()));

        // TODO: Check for multiple key loading (NIFI-6617)
        // Encryption values may not be present but are only required for EncryptedWriteAheadProvenanceRepository
        final String implementationClassName = nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
        if (EncryptedWriteAheadProvenanceRepository.class.getName().equals(implementationClassName)) {
            config.setEncryptionKeys(nifiProperties.getProvenanceRepoEncryptionKeys());
            config.setKeyId(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID));
            config.setKeyProviderImplementation(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS));
            config.setKeyProviderLocation(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION));
            config.setKeyProviderPassword(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_PASSWORD));
        }

        return config;
    }

    /**
     * Reads the max attribute length property. Falls back to the default (with a warning) when the value is
     * not a valid integer, and raises values below {@link #MIN_ATTRIBUTE_CHARS} to that minimum.
     */
    private static int parseMaxAttributeChars(final NiFiProperties nifiProperties) {
        final String maxAttrLength = nifiProperties.getProperty(MAX_ATTRIBUTE_LENGTH, String.valueOf(DEFAULT_MAX_ATTRIBUTE_CHARS));
        if (maxAttrLength == null || maxAttrLength.trim().isEmpty()) {
            return DEFAULT_MAX_ATTRIBUTE_CHARS;
        }

        final int maxAttrChars;
        try {
            maxAttrChars = Integer.parseInt(maxAttrLength.trim());
        } catch (final NumberFormatException e) {
            logger.warn("Found max attribute length property set to {} which is not a valid integer; using default of {} instead",
                maxAttrLength, DEFAULT_MAX_ATTRIBUTE_CHARS);
            return DEFAULT_MAX_ATTRIBUTE_CHARS;
        }

        // must be at least 36 characters because that's the length of the uuid attribute,
        // which must be kept intact
        if (maxAttrChars < MIN_ATTRIBUTE_CHARS) {
            logger.warn("Found max attribute length property set to {} but minimum length is {}; using {} instead",
                maxAttrLength, MIN_ATTRIBUTE_CHARS, MIN_ATTRIBUTE_CHARS);
            return MIN_ATTRIBUTE_CHARS;
        }
        return maxAttrChars;
    }
}