| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.util; |
| |
| import org.apache.nifi.properties.ApplicationProperties; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.BufferedInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.InputStream; |
| import java.net.InetSocketAddress; |
| import java.nio.file.InvalidPathException; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| /** |
| * The NiFiProperties class holds all properties which are needed for various |
| * values to be available at runtime. It is strongly tied to the startup |
| * properties needed and is often referred to as the 'nifi.properties' file. The |
| * properties contain keys and values. Great care should be taken in leveraging |
| * this class or passing it along. Its use should be refactored and minimized |
| * over time. |
| */ |
| public class NiFiProperties extends ApplicationProperties { |
| private static final Logger logger = LoggerFactory.getLogger(NiFiProperties.class); |
| |
| // core property keys: properties-file path, flow configuration and its archive, authorizer/login-provider config files, directories, sensitive-property settings, site-to-site remote input, yield durations and queue back-pressure |
| public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path"; |
| public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file"; |
| public static final String FLOW_CONFIGURATION_ARCHIVE_ENABLED = "nifi.flow.configuration.archive.enabled"; |
| public static final String FLOW_CONFIGURATION_ARCHIVE_DIR = "nifi.flow.configuration.archive.dir"; |
| public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_TIME = "nifi.flow.configuration.archive.max.time"; |
| public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE = "nifi.flow.configuration.archive.max.storage"; |
| public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_COUNT = "nifi.flow.configuration.archive.max.count"; |
| public static final String AUTHORIZER_CONFIGURATION_FILE = "nifi.authorizer.configuration.file"; |
| public static final String LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE = "nifi.login.identity.provider.configuration.file"; |
| public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory"; |
| public static final String RESTORE_DIRECTORY = "nifi.restore.directory"; |
| public static final String WRITE_DELAY_INTERVAL = "nifi.flowservice.writedelay.interval"; |
| public static final String AUTO_RESUME_STATE = "nifi.flowcontroller.autoResumeState"; |
| public static final String FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.period"; |
| public static final String NAR_LIBRARY_DIRECTORY = "nifi.nar.library.directory"; |
| public static final String NAR_LIBRARY_DIRECTORY_PREFIX = "nifi.nar.library.directory."; |
| public static final String NAR_LIBRARY_AUTOLOAD_DIRECTORY = "nifi.nar.library.autoload.directory"; |
| public static final String NAR_WORKING_DIRECTORY = "nifi.nar.working.directory"; |
| public static final String COMPONENT_DOCS_DIRECTORY = "nifi.documentation.working.directory"; |
| public static final String SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key"; |
| public static final String SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm"; |
| public static final String H2_URL_APPEND = "nifi.h2.url.append"; |
| public static final String REMOTE_INPUT_HOST = "nifi.remote.input.host"; |
| public static final String REMOTE_INPUT_PORT = "nifi.remote.input.socket.port"; |
| public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure"; |
| public static final String SITE_TO_SITE_HTTP_ENABLED = "nifi.remote.input.http.enabled"; |
| public static final String SITE_TO_SITE_HTTP_TRANSACTION_TTL = "nifi.remote.input.http.transaction.ttl"; |
| public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration"; |
| public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory"; |
| public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; |
| public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration"; |
| public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout"; |
| public static final String BACKPRESSURE_COUNT = "nifi.queue.backpressure.count"; |
| public static final String BACKPRESSURE_SIZE = "nifi.queue.backpressure.size"; |
| |
| // Encryption property keys shared by all repositories (nifi.repository.encryption.*): protocol version, key id, key provider and its keystore location/password |
| public static final String REPOSITORY_ENCRYPTION_PROTOCOL_VERSION = "nifi.repository.encryption.protocol.version"; |
| public static final String REPOSITORY_ENCRYPTION_KEY_ID = "nifi.repository.encryption.key.id"; |
| public static final String REPOSITORY_ENCRYPTION_KEY_PROVIDER = "nifi.repository.encryption.key.provider"; |
| public static final String REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_LOCATION = "nifi.repository.encryption.key.provider.keystore.location"; |
| public static final String REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_PASSWORD = "nifi.repository.encryption.key.provider.keystore.password"; |
| |
| // content repository property keys: directory prefix, implementation, claim limits, archive settings, content viewer URL and repository-specific encryption settings |
| public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory."; |
| public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation"; |
| public static final String MAX_APPENDABLE_CLAIM_SIZE = "nifi.content.claim.max.appendable.size"; |
| public static final String MAX_FLOWFILES_PER_CLAIM = "nifi.content.claim.max.flow.files"; |
| public static final String CONTENT_ARCHIVE_MAX_RETENTION_PERIOD = "nifi.content.repository.archive.max.retention.period"; |
| public static final String CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE = "nifi.content.repository.archive.max.usage.percentage"; |
| public static final String CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE = "nifi.content.repository.archive.backpressure.percentage"; |
| public static final String CONTENT_ARCHIVE_ENABLED = "nifi.content.repository.archive.enabled"; |
| public static final String CONTENT_ARCHIVE_CLEANUP_FREQUENCY = "nifi.content.repository.archive.cleanup.frequency"; |
| public static final String CONTENT_VIEWER_URL = "nifi.content.viewer.url"; |
| public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY = "nifi.content.repository.encryption.key"; |
| public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY_ID = "nifi.content.repository.encryption.key.id"; |
| public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS = "nifi.content.repository.encryption.key.provider.implementation"; |
| public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.content.repository.encryption.key.provider.location"; |
| public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD = "nifi.content.repository.encryption.key.provider.password"; |
| |
| // flowfile repository property keys: implementation, write-ahead-log implementation, sync/checkpoint settings, directory, encryption settings, swap manager and queue swap threshold |
| public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation"; |
| public static final String FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION = "nifi.flowfile.repository.wal.implementation"; |
| public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync"; |
| public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory"; |
| public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval"; |
| public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY = "nifi.flowfile.repository.encryption.key"; |
| public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID = "nifi.flowfile.repository.encryption.key.id"; |
| public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS = "nifi.flowfile.repository.encryption.key.provider.implementation"; |
| public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.flowfile.repository.encryption.key.provider.location"; |
| public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD = "nifi.flowfile.repository.encryption.key.provider.password"; |
| public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation"; |
| public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold"; |
| |
| // provenance repository property keys: implementation, directory prefix, storage and rollover limits, query/index thread pools, indexing options, encryption settings and debug frequency |
| public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation"; |
| public static final String PROVENANCE_REPO_DIRECTORY_PREFIX = "nifi.provenance.repository.directory."; |
| public static final String PROVENANCE_MAX_STORAGE_TIME = "nifi.provenance.repository.max.storage.time"; |
| public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size"; |
| public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time"; |
| public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size"; |
| public static final String PROVENANCE_ROLLOVER_EVENT_COUNT = "nifi.provenance.repository.rollover.events"; |
| public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads"; |
| public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads"; |
| public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover"; |
| public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields"; |
| public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes"; |
| public static final String PROVENANCE_INDEX_SHARD_SIZE = "nifi.provenance.repository.index.shard.size"; |
| public static final String PROVENANCE_JOURNAL_COUNT = "nifi.provenance.repository.journal.count"; |
| public static final String PROVENANCE_REPO_ENCRYPTION_KEY = "nifi.provenance.repository.encryption.key"; |
| public static final String PROVENANCE_REPO_ENCRYPTION_KEY_ID = "nifi.provenance.repository.encryption.key.id"; |
| public static final String PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS = "nifi.provenance.repository.encryption.key.provider.implementation"; |
| public static final String PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.provenance.repository.encryption.key.provider.location"; |
| public static final String PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_PASSWORD = "nifi.provenance.repository.encryption.key.provider.password"; |
| public static final String PROVENANCE_REPO_DEBUG_FREQUENCY = "nifi.provenance.repository.debug.frequency"; |
| |
| // component status repository property keys: implementation and snapshot frequency |
| public static final String COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION = "nifi.components.status.repository.implementation"; |
| public static final String COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "nifi.components.status.snapshot.frequency"; |
| |
| // QuestDB status storage property keys: node/component persistence days and storage location |
| public static final String STATUS_REPOSITORY_QUESTDB_PERSIST_NODE_DAYS = "nifi.status.repository.questdb.persist.node.days"; |
| public static final String STATUS_REPOSITORY_QUESTDB_PERSIST_COMPONENT_DAYS = "nifi.status.repository.questdb.persist.component.days"; |
| public static final String STATUS_REPOSITORY_QUESTDB_PERSIST_LOCATION = "nifi.status.repository.questdb.persist.location"; |
| |
| // security property keys: keystore/truststore settings, auto-reload, authorizer and login identity provider, OCSP responder, identity/group mapping prefixes (keys ending in '.' are prefixes) and JWS key rotation period |
| public static final String SECURITY_KEYSTORE = "nifi.security.keystore"; |
| public static final String SECURITY_KEYSTORE_TYPE = "nifi.security.keystoreType"; |
| public static final String SECURITY_KEYSTORE_PASSWD = "nifi.security.keystorePasswd"; |
| public static final String SECURITY_KEY_PASSWD = "nifi.security.keyPasswd"; |
| public static final String SECURITY_TRUSTSTORE = "nifi.security.truststore"; |
| public static final String SECURITY_TRUSTSTORE_TYPE = "nifi.security.truststoreType"; |
| public static final String SECURITY_TRUSTSTORE_PASSWD = "nifi.security.truststorePasswd"; |
| public static final String SECURITY_AUTO_RELOAD_ENABLED = "nifi.security.autoreload.enabled"; |
| public static final String SECURITY_AUTO_RELOAD_INTERVAL = "nifi.security.autoreload.interval"; |
| public static final String SECURITY_USER_AUTHORIZER = "nifi.security.user.authorizer"; |
| public static final String SECURITY_ANONYMOUS_AUTHENTICATION = "nifi.security.allow.anonymous.authentication"; |
| public static final String SECURITY_USER_LOGIN_IDENTITY_PROVIDER = "nifi.security.user.login.identity.provider"; |
| public static final String SECURITY_OCSP_RESPONDER_URL = "nifi.security.ocsp.responder.url"; |
| public static final String SECURITY_OCSP_RESPONDER_CERTIFICATE = "nifi.security.ocsp.responder.certificate"; |
| public static final String SECURITY_IDENTITY_MAPPING_PATTERN_PREFIX = "nifi.security.identity.mapping.pattern."; |
| public static final String SECURITY_IDENTITY_MAPPING_VALUE_PREFIX = "nifi.security.identity.mapping.value."; |
| public static final String SECURITY_IDENTITY_MAPPING_TRANSFORM_PREFIX = "nifi.security.identity.mapping.transform."; |
| public static final String SECURITY_GROUP_MAPPING_PATTERN_PREFIX = "nifi.security.group.mapping.pattern."; |
| public static final String SECURITY_GROUP_MAPPING_VALUE_PREFIX = "nifi.security.group.mapping.value."; |
| public static final String SECURITY_GROUP_MAPPING_TRANSFORM_PREFIX = "nifi.security.group.mapping.transform."; |
| public static final String SECURITY_USER_JWS_KEY_ROTATION_PERIOD = "nifi.security.user.jws.key.rotation.period"; |
| |
| // OpenID Connect (oidc) property keys: discovery URL, timeouts, client credentials, preferred JWS algorithm, additional scopes and user-identifying claims |
| public static final String SECURITY_USER_OIDC_DISCOVERY_URL = "nifi.security.user.oidc.discovery.url"; |
| public static final String SECURITY_USER_OIDC_CONNECT_TIMEOUT = "nifi.security.user.oidc.connect.timeout"; |
| public static final String SECURITY_USER_OIDC_READ_TIMEOUT = "nifi.security.user.oidc.read.timeout"; |
| public static final String SECURITY_USER_OIDC_CLIENT_ID = "nifi.security.user.oidc.client.id"; |
| public static final String SECURITY_USER_OIDC_CLIENT_SECRET = "nifi.security.user.oidc.client.secret"; |
| public static final String SECURITY_USER_OIDC_PREFERRED_JWSALGORITHM = "nifi.security.user.oidc.preferred.jwsalgorithm"; |
| public static final String SECURITY_USER_OIDC_ADDITIONAL_SCOPES = "nifi.security.user.oidc.additional.scopes"; |
| public static final String SECURITY_USER_OIDC_CLAIM_IDENTIFYING_USER = "nifi.security.user.oidc.claim.identifying.user"; |
| public static final String SECURITY_USER_OIDC_FALLBACK_CLAIMS_IDENTIFYING_USER = "nifi.security.user.oidc.fallback.claims.identifying.user"; |
| |
| // Apache Knox property keys: URL, public key, cookie name and audiences |
| public static final String SECURITY_USER_KNOX_URL = "nifi.security.user.knox.url"; |
| public static final String SECURITY_USER_KNOX_PUBLIC_KEY = "nifi.security.user.knox.publicKey"; |
| public static final String SECURITY_USER_KNOX_COOKIE_NAME = "nifi.security.user.knox.cookieName"; |
| public static final String SECURITY_USER_KNOX_AUDIENCES = "nifi.security.user.knox.audiences"; |
| |
| // SAML property keys: IdP metadata URL, SP entity id, identity/group attribute names, signing options, message logging, authentication expiration, single logout and HTTP client settings |
| public static final String SECURITY_USER_SAML_IDP_METADATA_URL = "nifi.security.user.saml.idp.metadata.url"; |
| public static final String SECURITY_USER_SAML_SP_ENTITY_ID = "nifi.security.user.saml.sp.entity.id"; |
| public static final String SECURITY_USER_SAML_IDENTITY_ATTRIBUTE_NAME = "nifi.security.user.saml.identity.attribute.name"; |
| public static final String SECURITY_USER_SAML_GROUP_ATTRIBUTE_NAME = "nifi.security.user.saml.group.attribute.name"; |
| public static final String SECURITY_USER_SAML_METADATA_SIGNING_ENABLED = "nifi.security.user.saml.metadata.signing.enabled"; |
| public static final String SECURITY_USER_SAML_REQUEST_SIGNING_ENABLED = "nifi.security.user.saml.request.signing.enabled"; |
| public static final String SECURITY_USER_SAML_WANT_ASSERTIONS_SIGNED = "nifi.security.user.saml.want.assertions.signed"; |
| public static final String SECURITY_USER_SAML_SIGNATURE_ALGORITHM = "nifi.security.user.saml.signature.algorithm"; |
| public static final String SECURITY_USER_SAML_SIGNATURE_DIGEST_ALGORITHM = "nifi.security.user.saml.signature.digest.algorithm"; |
| public static final String SECURITY_USER_SAML_MESSAGE_LOGGING_ENABLED = "nifi.security.user.saml.message.logging.enabled"; |
| public static final String SECURITY_USER_SAML_AUTHENTICATION_EXPIRATION = "nifi.security.user.saml.authentication.expiration"; |
| public static final String SECURITY_USER_SAML_SINGLE_LOGOUT_ENABLED = "nifi.security.user.saml.single.logout.enabled"; |
| public static final String SECURITY_USER_SAML_HTTP_CLIENT_TRUSTSTORE_STRATEGY = "nifi.security.user.saml.http.client.truststore.strategy"; |
| public static final String SECURITY_USER_SAML_HTTP_CLIENT_CONNECT_TIMEOUT = "nifi.security.user.saml.http.client.connect.timeout"; |
| public static final String SECURITY_USER_SAML_HTTP_CLIENT_READ_TIMEOUT = "nifi.security.user.saml.http.client.read.timeout"; |
| |
| // web property keys: HTTP/HTTPS host, port, port forwarding and network-interface prefixes, cipher suites, Jetty working directory and threads, proxy settings, size/rate limits, request timeout, IP whitelist and server-version header |
| public static final String WEB_HTTP_PORT = "nifi.web.http.port"; |
| public static final String WEB_HTTP_PORT_FORWARDING = "nifi.web.http.port.forwarding"; |
| public static final String WEB_HTTP_HOST = "nifi.web.http.host"; |
| public static final String WEB_HTTP_NETWORK_INTERFACE_PREFIX = "nifi.web.http.network.interface."; |
| public static final String WEB_HTTPS_PORT = "nifi.web.https.port"; |
| public static final String WEB_HTTPS_PORT_FORWARDING = "nifi.web.https.port.forwarding"; |
| public static final String WEB_HTTPS_HOST = "nifi.web.https.host"; |
| public static final String WEB_HTTPS_CIPHERSUITES_INCLUDE = "nifi.web.https.ciphersuites.include"; |
| public static final String WEB_HTTPS_CIPHERSUITES_EXCLUDE = "nifi.web.https.ciphersuites.exclude"; |
| public static final String WEB_HTTPS_NETWORK_INTERFACE_PREFIX = "nifi.web.https.network.interface."; |
| public static final String WEB_WORKING_DIR = "nifi.web.jetty.working.directory"; |
| public static final String WEB_THREADS = "nifi.web.jetty.threads"; |
| public static final String WEB_MAX_HEADER_SIZE = "nifi.web.max.header.size"; |
| public static final String WEB_PROXY_CONTEXT_PATH = "nifi.web.proxy.context.path"; |
| public static final String WEB_PROXY_HOST = "nifi.web.proxy.host"; |
| public static final String WEB_MAX_CONTENT_SIZE = "nifi.web.max.content.size"; |
| public static final String WEB_MAX_REQUESTS_PER_SECOND = "nifi.web.max.requests.per.second"; |
| public static final String WEB_MAX_ACCESS_TOKEN_REQUESTS_PER_SECOND = "nifi.web.max.access.token.requests.per.second"; |
| public static final String WEB_REQUEST_TIMEOUT = "nifi.web.request.timeout"; |
| public static final String WEB_REQUEST_IP_WHITELIST = "nifi.web.request.ip.whitelist"; |
| public static final String WEB_SHOULD_SEND_SERVER_VERSION = "nifi.web.should.send.server.version"; |
| |
| // UI property keys: banner text and auto-refresh interval |
| public static final String UI_BANNER_TEXT = "nifi.ui.banner.text"; |
| public static final String UI_AUTO_REFRESH_INTERVAL = "nifi.ui.autorefresh.interval"; |
| |
| // cluster common property keys: protocol heartbeat interval, missable heartbeat maximum and secure-protocol flag |
| public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval"; |
| public static final String CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX = "nifi.cluster.protocol.heartbeat.missable.max"; |
| public static final String CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure"; |
| |
| // cluster node property keys: node flag, address, protocol port/threads, timeouts, concurrent request limit, firewall file and flow election settings |
| public static final String CLUSTER_IS_NODE = "nifi.cluster.is.node"; |
| public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address"; |
| public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port"; |
| public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads"; |
| public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; |
| public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; |
| public static final String CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = "nifi.cluster.node.max.concurrent.requests"; |
| public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file"; |
| public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time"; |
| public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates"; |
| |
| // cluster load balance property keys: host, port, connections per node, max thread count and communications timeout |
| public static final String LOAD_BALANCE_HOST = "nifi.cluster.load.balance.host"; |
| public static final String LOAD_BALANCE_PORT = "nifi.cluster.load.balance.port"; |
| public static final String LOAD_BALANCE_CONNECTIONS_PER_NODE = "nifi.cluster.load.balance.connections.per.node"; |
| public static final String LOAD_BALANCE_MAX_THREAD_COUNT = "nifi.cluster.load.balance.max.thread.count"; |
| public static final String LOAD_BALANCE_COMMS_TIMEOUT = "nifi.cluster.load.balance.comms.timeout"; |
| |
| // ZooKeeper property keys: connect string, timeouts, root node, client TLS keystore/truststore settings, auth type, Kerberos principal handling and jute max buffer |
| public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string"; |
| public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout"; |
| public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout"; |
| public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node"; |
| public static final String ZOOKEEPER_CLIENT_SECURE = "nifi.zookeeper.client.secure"; |
| public static final String ZOOKEEPER_SECURITY_KEYSTORE = "nifi.zookeeper.security.keystore"; |
| public static final String ZOOKEEPER_SECURITY_KEYSTORE_TYPE = "nifi.zookeeper.security.keystoreType"; |
| public static final String ZOOKEEPER_SECURITY_KEYSTORE_PASSWD = "nifi.zookeeper.security.keystorePasswd"; |
| public static final String ZOOKEEPER_SECURITY_TRUSTSTORE = "nifi.zookeeper.security.truststore"; |
| public static final String ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE = "nifi.zookeeper.security.truststoreType"; |
| public static final String ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD = "nifi.zookeeper.security.truststorePasswd"; |
| public static final String ZOOKEEPER_AUTH_TYPE = "nifi.zookeeper.auth.type"; |
| public static final String ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL = "nifi.zookeeper.kerberos.removeHostFromPrincipal"; |
| public static final String ZOOKEEPER_KERBEROS_REMOVE_REALM_FROM_PRINCIPAL = "nifi.zookeeper.kerberos.removeRealmFromPrincipal"; |
| public static final String ZOOKEEPER_JUTE_MAXBUFFER = "nifi.zookeeper.jute.maxbuffer"; |
| |
| // Kerberos property keys: krb5 file, service and SPNEGO principals with their keytab locations, and SPNEGO authentication expiration |
| public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file"; |
| public static final String KERBEROS_SERVICE_PRINCIPAL = "nifi.kerberos.service.principal"; |
| public static final String KERBEROS_SERVICE_KEYTAB_LOCATION = "nifi.kerberos.service.keytab.location"; |
| public static final String KERBEROS_SPNEGO_PRINCIPAL = "nifi.kerberos.spnego.principal"; |
| public static final String KERBEROS_SPNEGO_KEYTAB_LOCATION = "nifi.kerberos.spnego.keytab.location"; |
| public static final String KERBEROS_AUTHENTICATION_EXPIRATION = "nifi.kerberos.spnego.authentication.expiration"; |
| |
| // state management property keys: configuration file, local/cluster provider ids and embedded ZooKeeper settings |
| public static final String STATE_MANAGEMENT_CONFIG_FILE = "nifi.state.management.configuration.file"; |
| public static final String STATE_MANAGEMENT_LOCAL_PROVIDER_ID = "nifi.state.management.provider.local"; |
| public static final String STATE_MANAGEMENT_CLUSTER_PROVIDER_ID = "nifi.state.management.provider.cluster"; |
| public static final String STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER = "nifi.state.management.embedded.zookeeper.start"; |
| public static final String STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES = "nifi.state.management.embedded.zookeeper.properties"; |
| |
| // expression language property keys: variable registry properties |
| public static final String VARIABLE_REGISTRY_PROPERTIES = "nifi.variable.registry.properties"; |
| |
| // analytics property keys: prediction enable flag and interval, query interval, and connection model implementation/score name/score threshold |
| public static final String ANALYTICS_PREDICTION_ENABLED = "nifi.analytics.predict.enabled"; |
| public static final String ANALYTICS_PREDICTION_INTERVAL = "nifi.analytics.predict.interval"; |
| public static final String ANALYTICS_QUERY_INTERVAL = "nifi.analytics.query.interval"; |
| public static final String ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "nifi.analytics.connection.model.implementation"; |
| public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = "nifi.analytics.connection.model.score.name"; |
| public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold"; |
| |
| // runtime monitoring property keys: long-running task schedule and threshold |
| public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE = "nifi.monitor.long.running.task.schedule"; |
| public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD = "nifi.monitor.long.running.task.threshold"; |
| |
| // automatic diagnostic property keys (diagnostics on shutdown): enable flag, verbosity, output directory, max file count and max directory size |
| public static final String DIAGNOSTICS_ON_SHUTDOWN_ENABLED = "nifi.diagnostics.on.shutdown.enabled"; |
| public static final String DIAGNOSTICS_ON_SHUTDOWN_VERBOSE = "nifi.diagnostics.on.shutdown.verbose"; |
| public static final String DIAGNOSTICS_ON_SHUTDOWN_DIRECTORY = "nifi.diagnostics.on.shutdown.directory"; |
| public static final String DIAGNOSTICS_ON_SHUTDOWN_MAX_FILE_COUNT = "nifi.diagnostics.on.shutdown.max.filecount"; |
| public static final String DIAGNOSTICS_ON_SHUTDOWN_MAX_DIRECTORY_SIZE = "nifi.diagnostics.on.shutdown.max.directory.size"; |
| |
| // automatic diagnostic defaults: directory, max file count and max directory size (presumably used when the matching property is unset; usage not visible in this section) |
| public static final String DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_DIRECTORY = "./diagnostics"; |
| public static final int DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_FILE_COUNT = 10; |
| public static final String DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_DIRECTORY_SIZE = "10 MB"; |
| |
| // default values for core, web, repository, ZooKeeper, security, OIDC and SAML settings (presumably applied when the matching property is unset; usage not visible in this section) |
| public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; |
| public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml"; |
| public static final String DEFAULT_LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE = "conf/login-identity-providers.xml"; |
| public static final Integer DEFAULT_REMOTE_INPUT_PORT = null; |
| public static final Path DEFAULT_TEMPLATE_DIRECTORY = Paths.get("conf", "templates"); |
| public static final int DEFAULT_WEB_THREADS = 200; |
| public static final String DEFAULT_WEB_MAX_HEADER_SIZE = "16 KB"; |
| public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty"; |
| public static final String DEFAULT_WEB_MAX_CONTENT_SIZE = "20 MB"; |
| public static final int DEFAULT_WEB_MAX_REQUESTS_PER_SECOND = 30000; |
| public static final int DEFAULT_WEB_MAX_ACCESS_TOKEN_REQUESTS_PER_SECOND = 25; |
| public static final String DEFAULT_WEB_REQUEST_TIMEOUT = "60 secs"; |
| public static final String DEFAULT_NAR_WORKING_DIR = "./work/nar"; |
| public static final String DEFAULT_COMPONENT_DOCS_DIRECTORY = "./work/docs/components"; |
| public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib"; |
| public static final String DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR = "./extensions"; |
| public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "20 secs"; |
| public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100; |
| public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB"; |
| public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000; |
| public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L; |
| public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB"; |
| public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec"; |
| public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins"; |
| public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis"; |
| public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs"; |
| public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs"; |
| public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi"; |
| public static final boolean DEFAULT_ZOOKEEPER_CLIENT_SECURE = false; |
| public static final String DEFAULT_ZOOKEEPER_AUTH_TYPE = "default"; |
| public static final String DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL = "true"; |
| public static final String DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_REALM_FROM_PRINCIPAL = "true"; |
| // Based on org.apache.jute.BinaryInputArchive.maxBuffer, hexadecimal 0xfffff (1048575 decimal), from ZooKeeper NIOServerCnxn |
| public static final int DEFAULT_ZOOKEEPER_JUTE_MAXBUFFER = 1048575; |
| public static final String DEFAULT_SECURITY_AUTO_RELOAD_INTERVAL = "10 secs"; |
| public static final String DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL = "30 secs"; |
| public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_ENABLED = "true"; |
| public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME = "30 days"; |
| public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE = "500 MB"; |
| public static final String DEFAULT_SECURITY_USER_OIDC_CONNECT_TIMEOUT = "5 secs"; |
| public static final String DEFAULT_SECURITY_USER_OIDC_READ_TIMEOUT = "5 secs"; |
| public static final String DEFAULT_SECURITY_USER_SAML_METADATA_SIGNING_ENABLED = "false"; |
| public static final String DEFAULT_SECURITY_USER_SAML_REQUEST_SIGNING_ENABLED = "false"; |
| public static final String DEFAULT_SECURITY_USER_SAML_WANT_ASSERTIONS_SIGNED = "true"; |
| public static final String DEFAULT_SECURITY_USER_SAML_SIGNATURE_ALGORITHM = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"; |
| public static final String DEFAULT_SECURITY_USER_SAML_DIGEST_ALGORITHM = "http://www.w3.org/2001/04/xmlenc#sha256"; |
| public static final String DEFAULT_SECURITY_USER_SAML_MESSAGE_LOGGING_ENABLED = "false"; |
| public static final String DEFAULT_SECURITY_USER_SAML_AUTHENTICATION_EXPIRATION = "12 hours"; |
| public static final String DEFAULT_SECURITY_USER_SAML_SINGLE_LOGOUT_ENABLED = "false"; |
| public static final String DEFAULT_SECURITY_USER_SAML_HTTP_CLIENT_TRUSTSTORE_STRATEGY = "JDK"; |
| public static final String DEFAULT_SECURITY_USER_SAML_HTTP_CLIENT_CONNECT_TIMEOUT = "30 secs"; |
| public static final String DEFAULT_SECURITY_USER_SAML_HTTP_CLIENT_READ_TIMEOUT = "30 secs"; |
| private static final String DEFAULT_SECURITY_USER_JWS_KEY_ROTATION_PERIOD = "PT1H"; |
| public static final String DEFAULT_WEB_SHOULD_SEND_SERVER_VERSION = "true"; |
| |
| // cluster common defaults: heartbeat interval, missable heartbeat maximum, node read/connection timeouts and max concurrent requests |
| public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec"; |
| public static final int DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX = 8; |
| public static final String DEFAULT_CLUSTER_NODE_READ_TIMEOUT = "5 sec"; |
| public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec"; |
| public static final int DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = 100; |
| |
| // cluster node defaults: protocol thread counts and flow election max wait time |
| public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10; |
| public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS = 50; |
| public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins"; |
| |
| // cluster load balance defaults: port, connections per node, max thread count and communications timeout |
| public static final int DEFAULT_LOAD_BALANCE_PORT = 6342; |
| public static final int DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE = 4; |
| public static final int DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT = 8; |
| public static final String DEFAULT_LOAD_BALANCE_COMMS_TIMEOUT = "30 sec"; |
| |
| |
| // state management defaults |
| public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = "conf/state-management.xml"; |
| |
| // Kerberos defaults |
| public static final String DEFAULT_KERBEROS_AUTHENTICATION_EXPIRATION = "12 hours"; |
| |
| // analytics defaults |
| public static final String DEFAULT_ANALYTICS_PREDICTION_ENABLED = "false"; |
| public static final String DEFAULT_ANALYTICS_PREDICTION_INTERVAL = "3 mins"; |
| public static final String DEFAULT_ANALYTICS_QUERY_INTERVAL = "3 mins"; |
| public final static String DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares"; |
| public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared"; |
| public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90; |
| |
| // Status repository defaults |
| public static final int DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_NODE_DAYS = 14; |
| public static final int DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_COMPONENT_DAYS = 3; |
| public static final String DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_LOCATION = "./status_repository"; |
| |
/**
 * Creates an instance that holds no properties by delegating to the
 * map-based constructor with an empty map.
 */
public NiFiProperties() {
    // Typed factory method instead of the raw Collections.EMPTY_MAP constant,
    // which forces an unchecked conversion to Map<String, String>.
    this(Collections.<String, String>emptyMap());
}
| |
/**
 * Creates an instance from the supplied key/value pairs, which are handed
 * unchanged to the superclass constructor.
 *
 * @param props the property names and values to expose
 */
public NiFiProperties(final Map<String, String> props) {
    super(props);
}
| |
/**
 * Creates an instance from the supplied {@link Properties} object, which is
 * handed unchanged to the superclass constructor.
 *
 * @param props the properties to expose
 */
public NiFiProperties(final Properties props) {
    super(props);
}
| |
| // getters for core properties // |
| public File getFlowConfigurationFile() { |
| try { |
| return new File(getProperty(FLOW_CONFIGURATION_FILE)); |
| } catch (Exception ex) { |
| return null; |
| } |
| } |
| |
| public File getFlowConfigurationFileDir() { |
| try { |
| return getFlowConfigurationFile().getParentFile(); |
| } catch (Exception ex) { |
| return null; |
| } |
| } |
| |
| private Integer getPropertyAsPort(final String propertyName, final Integer defaultValue) { |
| final String port = getProperty(propertyName); |
| if (StringUtils.isEmpty(port)) { |
| return defaultValue; |
| } |
| try { |
| final int val = Integer.parseInt(port); |
| if (val <= 0 || val > 65535) { |
| throw new RuntimeException("Valid port range is 0 - 65535 but got " + val); |
| } |
| return val; |
| } catch (final NumberFormatException e) { |
| return defaultValue; |
| } |
| } |
| |
| public int getQueueSwapThreshold() { |
| final String thresholdValue = getProperty(QUEUE_SWAP_THRESHOLD); |
| if (thresholdValue == null) { |
| return DEFAULT_QUEUE_SWAP_THRESHOLD; |
| } |
| |
| try { |
| return Integer.parseInt(thresholdValue); |
| } catch (final NumberFormatException e) { |
| return DEFAULT_QUEUE_SWAP_THRESHOLD; |
| } |
| } |
| |
| public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) { |
| final String value = getProperty(propertyName); |
| if (value == null || value.trim().isEmpty()) { |
| return defaultValue; |
| } |
| |
| try { |
| return Integer.parseInt(value.trim()); |
| } catch (final Exception e) { |
| return defaultValue; |
| } |
| } |
| |
| |
| public String getAdministrativeYieldDuration() { |
| return getProperty(ADMINISTRATIVE_YIELD_DURATION, DEFAULT_ADMINISTRATIVE_YIELD_DURATION); |
| } |
| |
| /** |
| * The host name that will be given out to clients to connect to the Remote |
| * Input Port. |
| * |
| * @return the remote input host name or null if not configured |
| */ |
| public String getRemoteInputHost() { |
| final String value = getProperty(REMOTE_INPUT_HOST); |
| return StringUtils.isBlank(value) ? null : value; |
| } |
| |
| /** |
| * The socket port to listen on for a Remote Input Port. |
| * |
| * @return the remote input port for RAW socket communication |
| */ |
| public Integer getRemoteInputPort() { |
| return getPropertyAsPort(REMOTE_INPUT_PORT, DEFAULT_REMOTE_INPUT_PORT); |
| } |
| |
| /** |
| * @return False if property value is 'false'; True otherwise. |
| */ |
| public Boolean isSiteToSiteSecure() { |
| final String secureVal = getProperty(SITE_TO_SITE_SECURE, "true"); |
| |
| return !"false".equalsIgnoreCase(secureVal); |
| |
| } |
| |
| /** |
| * @return True if property value is 'true'; False otherwise. |
| */ |
| public Boolean isSiteToSiteHttpEnabled() { |
| final String remoteInputHttpEnabled = getProperty(SITE_TO_SITE_HTTP_ENABLED, "false"); |
| |
| return "true".equalsIgnoreCase(remoteInputHttpEnabled); |
| |
| } |
| |
| /** |
| * The HTTP or HTTPS Web API port for a Remote Input Port. |
| * |
| * @return the remote input port for HTTP(S) communication, or null if |
| * HTTP(S) Site-to-Site is not enabled |
| */ |
| public Integer getRemoteInputHttpPort() { |
| if (!isSiteToSiteHttpEnabled()) { |
| return null; |
| } |
| |
| final String propertyKey; |
| if (isSiteToSiteSecure()) { |
| if (StringUtils.isBlank(getProperty(NiFiProperties.WEB_HTTPS_PORT_FORWARDING))) { |
| propertyKey = WEB_HTTPS_PORT; |
| } else { |
| propertyKey = WEB_HTTPS_PORT_FORWARDING; |
| } |
| } else { |
| if (StringUtils.isBlank(getProperty(NiFiProperties.WEB_HTTP_PORT_FORWARDING))) { |
| propertyKey = WEB_HTTP_PORT; |
| } else { |
| propertyKey = WEB_HTTP_PORT_FORWARDING; |
| } |
| } |
| |
| final Integer port = getIntegerProperty(propertyKey, null); |
| if (port == null) { |
| throw new RuntimeException("Remote input HTTP" + (isSiteToSiteSecure() ? "S" : "") |
| + " is enabled but " + propertyKey + " is not specified."); |
| } |
| return port; |
| } |
| |
| /** |
| * Returns the directory to which Templates are to be persisted |
| * |
| * @return the template directory |
| */ |
| public Path getTemplateDirectory() { |
| final String strVal = getProperty(TEMPLATE_DIRECTORY); |
| return (strVal == null) ? DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal); |
| } |
| |
| /** |
| * Get the flow service write delay. |
| * |
| * @return The write delay |
| */ |
| public String getFlowServiceWriteDelay() { |
| return getProperty(WRITE_DELAY_INTERVAL); |
| } |
| |
| /** |
| * Returns whether the processors should be started automatically when the |
| * application loads. |
| * |
| * @return Whether to auto start the processors or not |
| */ |
| public boolean getAutoResumeState() { |
| final String rawAutoResumeState = getProperty(AUTO_RESUME_STATE, |
| DEFAULT_AUTO_RESUME_STATE.toString()); |
| return Boolean.parseBoolean(rawAutoResumeState); |
| } |
| |
| |
| /** |
| * Returns the number of milliseconds between FlowFileRepository |
| * checkpointing |
| * |
| * @return the number of milliseconds between checkpoint events |
| */ |
| public String getFlowFileRepositoryCheckpointInterval() { |
| return getProperty(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL); |
| } |
| |
| /** |
| * @return the restore directory or null if not configured |
| */ |
| public File getRestoreDirectory() { |
| final String value = getProperty(RESTORE_DIRECTORY); |
| if (StringUtils.isBlank(value)) { |
| return null; |
| } else { |
| return new File(value); |
| } |
| } |
| |
| /** |
| * @return the user authorizers file |
| */ |
| public File getAuthorizerConfigurationFile() { |
| final String value = getProperty(AUTHORIZER_CONFIGURATION_FILE); |
| if (StringUtils.isBlank(value)) { |
| return new File(DEFAULT_AUTHORIZER_CONFIGURATION_FILE); |
| } else { |
| return new File(value); |
| } |
| } |
| |
| /** |
| * @return the user login identity provider file |
| */ |
| public File getLoginIdentityProviderConfigurationFile() { |
| final String value = getProperty(LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE); |
| if (StringUtils.isBlank(value)) { |
| return new File(DEFAULT_LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE); |
| } else { |
| return new File(value); |
| } |
| } |
| |
| // getters for web properties // |
| public Integer getPort() { |
| Integer port = null; |
| try { |
| port = Integer.parseInt(getProperty(WEB_HTTP_PORT)); |
| } catch (NumberFormatException nfe) { |
| } |
| return port; |
| } |
| |
| public Integer getSslPort() { |
| Integer sslPort = null; |
| try { |
| sslPort = Integer.parseInt(getProperty(WEB_HTTPS_PORT)); |
| } catch (NumberFormatException nfe) { |
| } |
| return sslPort; |
| } |
| |
| public boolean isHTTPSConfigured() { |
| return getSslPort() != null; |
| } |
| |
| /** |
| * Determines the HTTP/HTTPS port NiFi is configured to bind to. Prefers the HTTPS port. Throws an exception if neither is configured. |
| * |
| * @return the configured port number |
| */ |
| public Integer getConfiguredHttpOrHttpsPort() throws RuntimeException { |
| if (getSslPort() != null) { |
| return getSslPort(); |
| } else if (getPort() != null) { |
| return getPort(); |
| } else { |
| throw new RuntimeException("The HTTP or HTTPS port must be configured"); |
| } |
| } |
| |
| public String getWebMaxHeaderSize() { |
| return getProperty(WEB_MAX_HEADER_SIZE, DEFAULT_WEB_MAX_HEADER_SIZE); |
| } |
| |
| /** |
| * Returns the {@code nifi.web.max.content.size} value from {@code nifi.properties}. |
| * Does not provide a default value because the presence of any value here enables the |
| * {@code ContentLengthFilter}. |
| * |
| * @return the specified max content-length and units for an incoming HTTP request |
| */ |
| public String getWebMaxContentSize() { |
| return getProperty(WEB_MAX_CONTENT_SIZE); |
| } |
| |
| public Integer getMaxWebRequestsPerSecond() { |
| return getIntegerProperty(WEB_MAX_REQUESTS_PER_SECOND, DEFAULT_WEB_MAX_REQUESTS_PER_SECOND); |
| } |
| |
| public Integer getMaxWebAccessTokenRequestsPerSecond() { |
| return getIntegerProperty(WEB_MAX_ACCESS_TOKEN_REQUESTS_PER_SECOND, DEFAULT_WEB_MAX_ACCESS_TOKEN_REQUESTS_PER_SECOND); |
| } |
| |
| public String getWebRequestTimeout() { |
| return getProperty(WEB_REQUEST_TIMEOUT, DEFAULT_WEB_REQUEST_TIMEOUT); |
| } |
| |
| public String getWebRequestIpWhitelist() { |
| return getProperty(WEB_REQUEST_IP_WHITELIST); |
| } |
| |
| public int getWebThreads() { |
| return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS); |
| } |
| |
| public int getClusterNodeMaxConcurrentRequests() { |
| return getIntegerProperty(CLUSTER_NODE_MAX_CONCURRENT_REQUESTS, DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS); |
| } |
| |
| public File getWebWorkingDirectory() { |
| return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR)); |
| } |
| |
| public File getComponentDocumentationWorkingDirectory() { |
| return new File(getProperty(COMPONENT_DOCS_DIRECTORY, DEFAULT_COMPONENT_DOCS_DIRECTORY)); |
| } |
| |
| public File getNarWorkingDirectory() { |
| return new File(getProperty(NAR_WORKING_DIRECTORY, DEFAULT_NAR_WORKING_DIR)); |
| } |
| |
| public File getFrameworkWorkingDirectory() { |
| return new File(getNarWorkingDirectory(), "framework"); |
| } |
| |
| public File getExtensionsWorkingDirectory() { |
| return new File(getNarWorkingDirectory(), "extensions"); |
| } |
| |
| public List<Path> getNarLibraryDirectories() { |
| |
| List<Path> narLibraryPaths = new ArrayList<>(); |
| |
| // go through each property |
| for (String propertyName : getPropertyKeys()) { |
| // determine if the property is a nar library path |
| if (StringUtils.startsWith(propertyName, NAR_LIBRARY_DIRECTORY_PREFIX) |
| || NAR_LIBRARY_DIRECTORY.equals(propertyName) |
| || NAR_LIBRARY_AUTOLOAD_DIRECTORY.equals(propertyName)) { |
| // attempt to resolve the path specified |
| String narLib = getProperty(propertyName); |
| if (!StringUtils.isBlank(narLib)) { |
| narLibraryPaths.add(Paths.get(narLib)); |
| } |
| } |
| } |
| |
| if (narLibraryPaths.isEmpty()) { |
| narLibraryPaths.add(Paths.get(DEFAULT_NAR_LIBRARY_DIR)); |
| } |
| |
| return narLibraryPaths; |
| } |
| |
| public File getNarAutoLoadDirectory() { |
| return new File(getProperty(NAR_LIBRARY_AUTOLOAD_DIRECTORY, DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR)); |
| } |
| |
| // getters for ui properties // |
| |
| /** |
| * Get the banner text. |
| * |
| * @return The banner text |
| */ |
| public String getBannerText() { |
| return this.getProperty(UI_BANNER_TEXT, StringUtils.EMPTY); |
| } |
| |
| /** |
| * Returns the auto refresh interval in seconds. |
| * |
| * @return the interval over which the properties should auto refresh |
| */ |
| public String getAutoRefreshInterval() { |
| return getProperty(UI_AUTO_REFRESH_INTERVAL); |
| } |
| |
| /** |
| * Returns true if auto reload of the keystore and truststore is enabled. |
| * |
| * @return true if auto reload of the keystore and truststore is enabled. |
| */ |
| public boolean isSecurityAutoReloadEnabled() { |
| return this.getProperty(SECURITY_AUTO_RELOAD_ENABLED, Boolean.FALSE.toString()).equals(Boolean.TRUE.toString()); |
| } |
| |
| /** |
| * Returns the auto reload interval of the keystore and truststore. |
| * |
| * @return The interval over which the keystore and truststore should auto-reload. |
| */ |
| public String getSecurityAutoReloadInterval() { |
| return getProperty(SECURITY_AUTO_RELOAD_INTERVAL, DEFAULT_SECURITY_AUTO_RELOAD_INTERVAL); |
| } |
| |
| public Duration getSecurityUserJwsKeyRotationPeriod() { |
| return Duration.parse(getProperty(SECURITY_USER_JWS_KEY_ROTATION_PERIOD, DEFAULT_SECURITY_USER_JWS_KEY_ROTATION_PERIOD)); |
| } |
| |
| // getters for cluster protocol properties // |
| public String getClusterProtocolHeartbeatInterval() { |
| return getProperty(CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, |
| DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); |
| } |
| |
| public String getNodeHeartbeatInterval() { |
| return getClusterProtocolHeartbeatInterval(); |
| } |
| |
| public String getClusterNodeReadTimeout() { |
| return getProperty(CLUSTER_NODE_READ_TIMEOUT, DEFAULT_CLUSTER_NODE_READ_TIMEOUT); |
| } |
| |
| public String getClusterNodeConnectionTimeout() { |
| return getProperty(CLUSTER_NODE_CONNECTION_TIMEOUT, |
| DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT); |
| } |
| |
| // getters for cluster node properties // |
| public boolean isNode() { |
| return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); |
| } |
| |
| public InetSocketAddress getClusterNodeProtocolAddress() { |
| try { |
| String socketAddress = getProperty(CLUSTER_NODE_ADDRESS); |
| if (StringUtils.isBlank(socketAddress)) { |
| socketAddress = "localhost"; |
| } |
| int socketPort = getClusterNodeProtocolPort(); |
| if (socketPort == 0) { |
| throw new RuntimeException("Cluster Node Protocol port cannot be 0. Port must be inclusively in the range [1, 65535]."); |
| } |
| return InetSocketAddress.createUnresolved(socketAddress, socketPort); |
| } catch (Exception ex) { |
| throw new RuntimeException("Invalid node protocol address/port due to: " + ex, ex); |
| } |
| } |
| |
| public InetSocketAddress getClusterLoadBalanceAddress() { |
| try { |
| String host = getProperty(LOAD_BALANCE_HOST); |
| if (StringUtils.isBlank(host)) { |
| host = getProperty(CLUSTER_NODE_ADDRESS); |
| } |
| if (StringUtils.isBlank(host)) { |
| host = "localhost"; |
| } |
| |
| final int port = getIntegerProperty(LOAD_BALANCE_PORT, DEFAULT_LOAD_BALANCE_PORT); |
| return InetSocketAddress.createUnresolved(host, port); |
| } catch (final Exception e) { |
| throw new RuntimeException("Invalid load balance address/port due to: " + e, e); |
| } |
| } |
| |
| public Integer getClusterNodeProtocolPort() { |
| try { |
| return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT)); |
| } catch (NumberFormatException nfe) { |
| return null; |
| } |
| } |
| |
| /** |
| * @deprecated Use getClusterNodeProtocolCorePoolSize() and getClusterNodeProtocolMaxPoolSize() instead |
| */ |
| @Deprecated() |
| public int getClusterNodeProtocolThreads() { |
| return getClusterNodeProtocolMaxPoolSize(); |
| } |
| |
| public int getClusterNodeProtocolMaxPoolSize() { |
| try { |
| return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_MAX_THREADS)); |
| } catch (NumberFormatException nfe) { |
| return DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS; |
| } |
| } |
| |
| public boolean isClustered() { |
| return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); |
| } |
| |
| public File getClusterNodeFirewallFile() { |
| final String firewallFile = getProperty(CLUSTER_FIREWALL_FILE); |
| if (StringUtils.isBlank(firewallFile)) { |
| return null; |
| } else { |
| return new File(firewallFile); |
| } |
| } |
| |
| public String getClusterProtocolManagerToNodeApiScheme() { |
| final String isSecureProperty = getProperty(CLUSTER_PROTOCOL_IS_SECURE); |
| if (Boolean.valueOf(isSecureProperty)) { |
| return "https"; |
| } else { |
| return "http"; |
| } |
| } |
| |
| public File getKerberosConfigurationFile() { |
| final String krb5File = getProperty(KERBEROS_KRB5_FILE); |
| if (krb5File != null && krb5File.trim().length() > 0) { |
| return new File(krb5File.trim()); |
| } else { |
| return null; |
| } |
| } |
| |
| public String getKerberosServicePrincipal() { |
| final String servicePrincipal = getProperty(KERBEROS_SERVICE_PRINCIPAL); |
| if (!StringUtils.isBlank(servicePrincipal)) { |
| return servicePrincipal.trim(); |
| } else { |
| return null; |
| } |
| } |
| |
| public String getKerberosServiceKeytabLocation() { |
| final String keytabLocation = getProperty(KERBEROS_SERVICE_KEYTAB_LOCATION); |
| if (!StringUtils.isBlank(keytabLocation)) { |
| return keytabLocation.trim(); |
| } else { |
| return null; |
| } |
| } |
| |
| public String getKerberosSpnegoPrincipal() { |
| final String spengoPrincipal = getProperty(KERBEROS_SPNEGO_PRINCIPAL); |
| if (!StringUtils.isBlank(spengoPrincipal)) { |
| return spengoPrincipal.trim(); |
| } else { |
| return null; |
| } |
| } |
| |
| public String getKerberosSpnegoKeytabLocation() { |
| final String keytabLocation = getProperty(KERBEROS_SPNEGO_KEYTAB_LOCATION); |
| if (!StringUtils.isBlank(keytabLocation)) { |
| return keytabLocation.trim(); |
| } else { |
| return null; |
| } |
| } |
| |
| public String getKerberosAuthenticationExpiration() { |
| final String authenticationExpirationString = getProperty(KERBEROS_AUTHENTICATION_EXPIRATION, DEFAULT_KERBEROS_AUTHENTICATION_EXPIRATION); |
| if (!StringUtils.isBlank(authenticationExpirationString)) { |
| return authenticationExpirationString.trim(); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Returns true if the Kerberos service principal and keytab location |
| * properties are populated. |
| * |
| * @return true if Kerberos service support is enabled |
| */ |
| public boolean isKerberosSpnegoSupportEnabled() { |
| return !StringUtils.isBlank(getKerberosSpnegoPrincipal()) && !StringUtils.isBlank(getKerberosSpnegoKeytabLocation()); |
| } |
| |
| /** |
| * Returns true if the login identity provider has been configured. |
| * |
| * @return true if the login identity provider has been configured |
| */ |
| public boolean isLoginIdentityProviderEnabled() { |
| return !StringUtils.isBlank(getProperty(NiFiProperties.SECURITY_USER_LOGIN_IDENTITY_PROVIDER)); |
| } |
| |
| /** |
| * @return True if property value is 'true'; False otherwise. |
| */ |
| public Boolean isAnonymousAuthenticationAllowed() { |
| final String anonymousAuthenticationAllowed = getProperty(SECURITY_ANONYMOUS_AUTHENTICATION, "false"); |
| |
| return "true".equalsIgnoreCase(anonymousAuthenticationAllowed); |
| } |
| |
| /** |
| * Returns whether an OpenId Connect (OIDC) URL is set. |
| * |
| * @return whether an OpenId Connect URL is set |
| */ |
| public boolean isOidcEnabled() { |
| return !StringUtils.isBlank(getOidcDiscoveryUrl()); |
| } |
| |
| /** |
| * Returns the OpenId Connect (OIDC) URL. Null otherwise. |
| * |
| * @return OIDC discovery url |
| */ |
| public String getOidcDiscoveryUrl() { |
| return getProperty(SECURITY_USER_OIDC_DISCOVERY_URL); |
| } |
| |
| /** |
| * Returns the OpenId Connect connect timeout. Non null. |
| * |
| * @return OIDC connect timeout |
| */ |
| public String getOidcConnectTimeout() { |
| return getProperty(SECURITY_USER_OIDC_CONNECT_TIMEOUT, DEFAULT_SECURITY_USER_OIDC_CONNECT_TIMEOUT); |
| } |
| |
| /** |
| * Returns the OpenId Connect read timeout. Non null. |
| * |
| * @return OIDC read timeout |
| */ |
| public String getOidcReadTimeout() { |
| return getProperty(SECURITY_USER_OIDC_READ_TIMEOUT, DEFAULT_SECURITY_USER_OIDC_READ_TIMEOUT); |
| } |
| |
| /** |
| * Returns the OpenId Connect client id. |
| * |
| * @return OIDC client id |
| */ |
| public String getOidcClientId() { |
| return getProperty(SECURITY_USER_OIDC_CLIENT_ID); |
| } |
| |
| /** |
| * Returns the OpenId Connect client secret. |
| * |
| * @return OIDC client secret |
| */ |
| public String getOidcClientSecret() { |
| return getProperty(SECURITY_USER_OIDC_CLIENT_SECRET); |
| } |
| |
| /** |
| * Returns the preferred json web signature algorithm. May be null/blank. |
| * |
| * @return OIDC preferred json web signature algorithm |
| */ |
| public String getOidcPreferredJwsAlgorithm() { |
| return getProperty(SECURITY_USER_OIDC_PREFERRED_JWSALGORITHM); |
| } |
| |
| /** |
| * Returns additional scopes to be sent when requesting the access token from the IDP. |
| * |
| * @return List of additional scopes to be sent |
| */ |
| public List<String> getOidcAdditionalScopes() { |
| String rawProperty = getProperty(SECURITY_USER_OIDC_ADDITIONAL_SCOPES, ""); |
| if (rawProperty.isEmpty()) { |
| return new ArrayList<>(); |
| } |
| List<String> additionalScopes = Arrays.asList(rawProperty.split(",")); |
| return additionalScopes.stream().map(String::trim).collect(Collectors.toList()); |
| } |
| |
| /** |
| * Returns the claim to be used to identify a user. |
| * Claim must be requested by adding the scope for it. |
| * Default is 'email'. |
| * |
| * @return The claim to be used to identify the user. |
| */ |
| public String getOidcClaimIdentifyingUser() { |
| return getProperty(SECURITY_USER_OIDC_CLAIM_IDENTIFYING_USER, "email").trim(); |
| } |
| |
| /** |
| * Returns the list of fallback claims to be used to identify a user when the configured claim is empty for a user |
| * |
| * @return The list of fallback claims to be used to identify the user |
| */ |
| public List<String> getOidcFallbackClaimsIdentifyingUser() { |
| String rawProperty = getProperty(SECURITY_USER_OIDC_FALLBACK_CLAIMS_IDENTIFYING_USER, "").trim(); |
| if (StringUtils.isBlank(rawProperty)) { |
| return Collections.emptyList(); |
| } else { |
| List<String> fallbackClaims = Arrays.asList(rawProperty.split(",")); |
| return fallbackClaims.stream().map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); |
| } |
| } |
| |
| public boolean shouldSendServerVersion() { |
| return Boolean.parseBoolean(getProperty(WEB_SHOULD_SEND_SERVER_VERSION, DEFAULT_WEB_SHOULD_SEND_SERVER_VERSION)); |
| } |
| |
| // Automatic diagnostic getters |
| |
| public boolean isDiagnosticsOnShutdownEnabled() { |
| return Boolean.parseBoolean(getProperty(DIAGNOSTICS_ON_SHUTDOWN_ENABLED)); |
| } |
| |
| public boolean isDiagnosticsOnShutdownVerbose() { |
| return Boolean.parseBoolean(getProperty(DIAGNOSTICS_ON_SHUTDOWN_VERBOSE)); |
| } |
| |
| public String getDiagnosticsOnShutdownDirectory() { |
| return getProperty(DIAGNOSTICS_ON_SHUTDOWN_DIRECTORY, DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_DIRECTORY); |
| } |
| |
| public int getDiagnosticsOnShutdownMaxFileCount() { |
| try { |
| return Integer.parseInt(getProperty(DIAGNOSTICS_ON_SHUTDOWN_MAX_FILE_COUNT)); |
| } catch (NumberFormatException e) { |
| return DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_FILE_COUNT; |
| } |
| } |
| |
| public String getDiagnosticsOnShutdownDirectoryMaxSize() { |
| return getProperty(DIAGNOSTICS_ON_SHUTDOWN_MAX_DIRECTORY_SIZE, DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_DIRECTORY_SIZE); |
| } |
| |
| /** |
| * Returns whether Knox SSO is enabled. |
| * |
| * @return whether Knox SSO is enabled |
| */ |
| public boolean isKnoxSsoEnabled() { |
| return !StringUtils.isBlank(getKnoxUrl()); |
| } |
| |
| /** |
| * Returns the Knox URL. |
| * |
| * @return Knox URL |
| */ |
| public String getKnoxUrl() { |
| return getProperty(SECURITY_USER_KNOX_URL); |
| } |
| |
| /** |
| * Gets the configured Knox Audiences. |
| * |
| * @return Knox audiences |
| */ |
| public Set<String> getKnoxAudiences() { |
| final String rawAudiences = getProperty(SECURITY_USER_KNOX_AUDIENCES); |
| if (StringUtils.isBlank(rawAudiences)) { |
| return null; |
| } else { |
| final String[] audienceTokens = rawAudiences.split(","); |
| return Stream.of(audienceTokens).map(String::trim).filter(aud -> !StringUtils.isEmpty(aud)).collect(Collectors.toSet()); |
| } |
| } |
| |
| /** |
| * Returns the path to the Knox public key. |
| * |
| * @return path to the Knox public key |
| */ |
| public Path getKnoxPublicKeyPath() { |
| return Paths.get(getProperty(SECURITY_USER_KNOX_PUBLIC_KEY)); |
| } |
| |
| /** |
| * Returns the name of the Knox cookie. |
| * |
| * @return name of the Knox cookie |
| */ |
| public String getKnoxCookieName() { |
| return getProperty(SECURITY_USER_KNOX_COOKIE_NAME); |
| } |
| |
| /** |
| * Returns whether SAML is enabled. |
| * |
| * @return whether saml is enabled |
| */ |
| public boolean isSamlEnabled() { |
| return !StringUtils.isBlank(getSamlIdentityProviderMetadataUrl()); |
| } |
| |
| /** |
| * The URL to obtain the identity provider metadata. |
| * Must be a value starting with 'file://' or 'http://'. |
| * |
| * @return the url to obtain the identity provider metadata |
| */ |
| public String getSamlIdentityProviderMetadataUrl() { |
| return getProperty(SECURITY_USER_SAML_IDP_METADATA_URL); |
| } |
| |
| /** |
| * The entity id for the service provider. |
| * |
| * @return the service provider entity id |
| */ |
| public String getSamlServiceProviderEntityId() { |
| return getProperty(SECURITY_USER_SAML_SP_ENTITY_ID); |
| } |
| |
| /** |
| * The name of an attribute in the SAML assertions that contains the user identity. |
| * <p> |
| * If not specified, or missing, the NameID of the Subject will be used. |
| * |
| * @return the attribute name containing the user identity |
| */ |
| public String getSamlIdentityAttributeName() { |
| return getProperty(SECURITY_USER_SAML_IDENTITY_ATTRIBUTE_NAME); |
| } |
| |
| /** |
| * The name of the attribute in the SAML assertions that contains the groups the user belongs to. |
| * |
| * @return the attribute name containing user groups |
| */ |
| public String getSamlGroupAttributeName() { |
| return getProperty(SECURITY_USER_SAML_GROUP_ATTRIBUTE_NAME); |
| } |
| |
| /** |
| * The signing algorithm to use for signing SAML requests. |
| * |
| * @return the signing algorithm to use |
| */ |
| public String getSamlSignatureAlgorithm() { |
| return getProperty(SECURITY_USER_SAML_SIGNATURE_ALGORITHM, DEFAULT_SECURITY_USER_SAML_SIGNATURE_ALGORITHM); |
| } |
| |
| /** |
| * The digest algorithm to use for signing SAML requests. |
| * |
| * @return the digest algorithm |
| */ |
| public String getSamlSignatureDigestAlgorithm() { |
| return getProperty(SECURITY_USER_SAML_SIGNATURE_DIGEST_ALGORITHM, DEFAULT_SECURITY_USER_SAML_DIGEST_ALGORITHM); |
| } |
| |
| /** |
| * Whether or not to sign the service provider metadata. |
| * |
| * @return whether or not to sign the service provider metadata |
| */ |
| public boolean isSamlMetadataSigningEnabled() { |
| return Boolean.parseBoolean(getProperty(SECURITY_USER_SAML_METADATA_SIGNING_ENABLED, DEFAULT_SECURITY_USER_SAML_METADATA_SIGNING_ENABLED)); |
| } |
| |
| /** |
| * Whether or not to sign requests sent to the identity provider. |
| * |
| * @return whether or not to sign requests sent to the identity provider |
| */ |
| public boolean isSamlRequestSigningEnabled() { |
| return Boolean.parseBoolean(getProperty(SECURITY_USER_SAML_REQUEST_SIGNING_ENABLED, DEFAULT_SECURITY_USER_SAML_REQUEST_SIGNING_ENABLED)); |
| } |
| |
| /** |
| * Whether or not the identity provider should sign assertions when sending response back. |
| * |
| * @return whether or not the identity provider should sign assertions when sending response back |
| */ |
| public boolean isSamlWantAssertionsSigned() { |
| return Boolean.parseBoolean(getProperty(SECURITY_USER_SAML_WANT_ASSERTIONS_SIGNED, DEFAULT_SECURITY_USER_SAML_WANT_ASSERTIONS_SIGNED)); |
| } |
| |
| /** |
| * Whether or not to log messages for debug purposes. |
| * |
| * @return whether or not to log messages |
| */ |
| public boolean isSamlMessageLoggingEnabled() { |
| return Boolean.parseBoolean(getProperty(SECURITY_USER_SAML_MESSAGE_LOGGING_ENABLED, DEFAULT_SECURITY_USER_SAML_MESSAGE_LOGGING_ENABLED)); |
| } |
| |
| /** |
| * The expiration value for a JWT created from a SAML authentication. |
| * |
| * @return the expiration value for a SAML authentication |
| */ |
| public String getSamlAuthenticationExpiration() { |
| return getProperty(SECURITY_USER_SAML_AUTHENTICATION_EXPIRATION, DEFAULT_SECURITY_USER_SAML_AUTHENTICATION_EXPIRATION); |
| } |
| |
| /** |
| * Whether or not logging out of NiFi should logout of the SAML IDP using the SAML SingleLogoutService. |
| * |
| * @return whether or not SAML single logout is enabled |
| */ |
| public boolean isSamlSingleLogoutEnabled() { |
| return Boolean.parseBoolean(getProperty(SECURITY_USER_SAML_SINGLE_LOGOUT_ENABLED, DEFAULT_SECURITY_USER_SAML_SINGLE_LOGOUT_ENABLED)); |
| } |
| |
| /** |
| * The truststore to use when interacting with a SAML IDP over https. Valid values are "JDK" and "NIFI". |
| * |
| * @return the type of truststore to use |
| */ |
| public String getSamlHttpClientTruststoreStrategy() { |
| return getProperty(SECURITY_USER_SAML_HTTP_CLIENT_TRUSTSTORE_STRATEGY, DEFAULT_SECURITY_USER_SAML_HTTP_CLIENT_TRUSTSTORE_STRATEGY); |
| } |
| |
| /** |
| * The connect timeout for the http client created for SAML operations. |
| * |
| * @return the connect timeout |
| */ |
| public String getSamlHttpClientConnectTimeout() { |
| return getProperty(SECURITY_USER_SAML_HTTP_CLIENT_CONNECT_TIMEOUT, DEFAULT_SECURITY_USER_SAML_HTTP_CLIENT_CONNECT_TIMEOUT); |
| } |
| |
| /** |
| * The read timeout for the http client created for SAML operations. |
| * |
| * @return the read timeout |
| */ |
| public String getSamlHttpClientReadTimeout() { |
| return getProperty(SECURITY_USER_SAML_HTTP_CLIENT_READ_TIMEOUT, DEFAULT_SECURITY_USER_SAML_HTTP_CLIENT_READ_TIMEOUT); |
| } |
| |
| /** |
| * Returns true if client certificates are required for REST API. Determined |
| * if the following conditions are all true: |
| * <p> |
| * - login identity provider is not populated |
| * - Kerberos service support is not enabled |
| * - openid connect is not enabled |
| * - knox sso is not enabled |
| * - anonymous authentication is not enabled |
| * </p> |
| * |
| * @return true if client certificates are required for access to the REST API |
| */ |
| public boolean isClientAuthRequiredForRestApi() { |
| return !isLoginIdentityProviderEnabled() |
| && !isKerberosSpnegoSupportEnabled() |
| && !isOidcEnabled() |
| && !isKnoxSsoEnabled() |
| && !isSamlEnabled() |
| && !isAnonymousAuthenticationAllowed(); |
| } |
| |
| public InetSocketAddress getNodeApiAddress() { |
| |
| final String rawScheme = getClusterProtocolManagerToNodeApiScheme(); |
| final String scheme = (rawScheme == null) ? "http" : rawScheme; |
| |
| final String host; |
| final Integer port; |
| if ("http".equalsIgnoreCase(scheme)) { |
| // get host |
| if (StringUtils.isBlank(getProperty(WEB_HTTP_HOST))) { |
| host = "localhost"; |
| } else { |
| host = getProperty(WEB_HTTP_HOST); |
| } |
| // get port |
| port = getPort(); |
| |
| if (port == null) { |
| throw new RuntimeException(String.format("The %s must be specified if running in a cluster with %s set to false.", WEB_HTTP_PORT, CLUSTER_PROTOCOL_IS_SECURE)); |
| } |
| } else { |
| // get host |
| if (StringUtils.isBlank(getProperty(WEB_HTTPS_HOST))) { |
| host = "localhost"; |
| } else { |
| host = getProperty(WEB_HTTPS_HOST); |
| } |
| // get port |
| port = getSslPort(); |
| |
| if (port == null) { |
| throw new RuntimeException(String.format("The %s must be specified if running in a cluster with %s set to true.", WEB_HTTPS_PORT, CLUSTER_PROTOCOL_IS_SECURE)); |
| } |
| } |
| |
| return InetSocketAddress.createUnresolved(host, port); |
| |
| } |
| |
| /** |
| * Returns the database repository path. It simply returns the value |
| * configured. No directories will be created as a result of this operation. |
| * |
| * @return database repository path |
| * @throws InvalidPathException If the configured path is invalid |
| */ |
| public Path getDatabaseRepositoryPath() { |
| return Paths.get(getProperty(REPOSITORY_DATABASE_DIRECTORY)); |
| } |
| |
| /** |
| * Returns the flow file repository path. It simply returns the value |
| * configured. No directories will be created as a result of this operation. |
| * |
| * @return database repository path |
| * @throws InvalidPathException If the configured path is invalid |
| */ |
| public Path getFlowFileRepositoryPath() { |
| return Paths.get(getProperty(FLOWFILE_REPOSITORY_DIRECTORY)); |
| } |
| |
| /** |
| * Returns the content repository paths. This method returns a mapping of |
| * file repository name to file repository paths. It simply returns the |
| * values configured. No directories will be created as a result of this |
| * operation. |
| * |
| * @return file repositories paths |
| * @throws InvalidPathException If any of the configured paths are invalid |
| */ |
| public Map<String, Path> getContentRepositoryPaths() { |
| final Map<String, Path> contentRepositoryPaths = new HashMap<>(); |
| |
| // go through each property |
| for (String propertyName : getPropertyKeys()) { |
| // determine if the property is a file repository path |
| if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) { |
| // get the repository key |
| final String key = StringUtils.substringAfter(propertyName, |
| REPOSITORY_CONTENT_PREFIX); |
| |
| // attempt to resolve the path specified |
| contentRepositoryPaths.put(key, Paths.get(getProperty(propertyName))); |
| } |
| } |
| return contentRepositoryPaths; |
| } |
| |
| /** |
| * Returns the provenance repository paths. This method returns a mapping of |
| * file repository name to file repository paths. It simply returns the |
| * values configured. No directories will be created as a result of this |
| * operation. |
| * |
| * @return the name and paths of all provenance repository locations |
| */ |
| public Map<String, Path> getProvenanceRepositoryPaths() { |
| final Map<String, Path> provenanceRepositoryPaths = new HashMap<>(); |
| |
| // go through each property |
| for (String propertyName : getPropertyKeys()) { |
| // determine if the property is a file repository path |
| if (StringUtils.startsWith(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX)) { |
| // get the repository key |
| final String key = StringUtils.substringAfter(propertyName, |
| PROVENANCE_REPO_DIRECTORY_PREFIX); |
| |
| // attempt to resolve the path specified |
| provenanceRepositoryPaths.put(key, Paths.get(getProperty(propertyName))); |
| } |
| } |
| return provenanceRepositoryPaths; |
| } |
| |
| /** |
| * Returns the number of claims to keep open for writing. Ideally, this will be at |
| * least as large as the number of threads that will be updating the repository simultaneously but we don't want |
| * to get too large because it will hold open up to this many FileOutputStreams. |
| * <p> |
| * Default is {@link #DEFAULT_MAX_FLOWFILES_PER_CLAIM} |
| * |
| * @return the maximum number of flow files per claim |
| */ |
| public int getMaxFlowFilesPerClaim() { |
| try { |
| return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM)); |
| } catch (NumberFormatException nfe) { |
| return DEFAULT_MAX_FLOWFILES_PER_CLAIM; |
| } |
| } |
| |
| /** |
| * Returns the maximum size, in bytes, that claims should grow before writing a new file. This means that we won't continually write to one |
| * file that keeps growing but gives us a chance to bunch together many small files. |
| * <p> |
| * Default is {@link #DEFAULT_MAX_APPENDABLE_CLAIM_SIZE} |
| * |
| * @return the maximum appendable claim size |
| */ |
| public String getMaxAppendableClaimSize() { |
| return getProperty(MAX_APPENDABLE_CLAIM_SIZE, DEFAULT_MAX_APPENDABLE_CLAIM_SIZE); |
| } |
| |
| public String getProperty(final String key, final String defaultValue) { |
| final String value = getProperty(key); |
| return (value == null || value.trim().isEmpty()) ? defaultValue : value; |
| } |
| |
| public String getBoredYieldDuration() { |
| return getProperty(BORED_YIELD_DURATION, DEFAULT_BORED_YIELD_DURATION); |
| } |
| |
| public File getStateManagementConfigFile() { |
| return new File(getProperty(STATE_MANAGEMENT_CONFIG_FILE, DEFAULT_STATE_MANAGEMENT_CONFIG_FILE)); |
| } |
| |
| public String getLocalStateProviderId() { |
| return getProperty(STATE_MANAGEMENT_LOCAL_PROVIDER_ID); |
| } |
| |
| public String getClusterStateProviderId() { |
| return getProperty(STATE_MANAGEMENT_CLUSTER_PROVIDER_ID); |
| } |
| |
| public File getEmbeddedZooKeeperPropertiesFile() { |
| final String filename = getProperty(STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES); |
| return filename == null ? null : new File(filename); |
| } |
| |
| public boolean isStartEmbeddedZooKeeper() { |
| return Boolean.parseBoolean(getProperty(STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER)); |
| } |
| |
| public boolean isFlowConfigurationArchiveEnabled() { |
| return Boolean.parseBoolean(getProperty(FLOW_CONFIGURATION_ARCHIVE_ENABLED, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_ENABLED)); |
| } |
| |
| public String getFlowConfigurationArchiveDir() { |
| return getProperty(FLOW_CONFIGURATION_ARCHIVE_DIR); |
| } |
| |
| public String getFlowElectionMaxWaitTime() { |
| return getProperty(FLOW_ELECTION_MAX_WAIT_TIME, DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME); |
| } |
| |
| public Integer getFlowElectionMaxCandidates() { |
| return getIntegerProperty(FLOW_ELECTION_MAX_CANDIDATES, null); |
| } |
| |
| public String getFlowConfigurationArchiveMaxTime() { |
| return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_TIME, null); |
| } |
| |
| public String getFlowConfigurationArchiveMaxStorage() { |
| return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE, null); |
| } |
| |
| public Integer getFlowConfigurationArchiveMaxCount() { |
| return getIntegerProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_COUNT, null); |
| } |
| |
| public String getVariableRegistryProperties() { |
| return getProperty(VARIABLE_REGISTRY_PROPERTIES); |
| } |
| |
| public Path[] getVariableRegistryPropertiesPaths() { |
| final List<Path> vrPropertiesPaths = new ArrayList<>(); |
| |
| final String vrPropertiesFiles = getVariableRegistryProperties(); |
| if (!StringUtils.isEmpty(vrPropertiesFiles)) { |
| |
| final String[] vrPropertiesFileList = vrPropertiesFiles.split(","); |
| |
| for (String propertiesFile : vrPropertiesFileList) { |
| vrPropertiesPaths.add(Paths.get(propertiesFile)); |
| } |
| |
| return vrPropertiesPaths.toArray(new Path[vrPropertiesPaths.size()]); |
| } else { |
| return new Path[]{}; |
| } |
| } |
| |
| /** |
| * Returns the network interface list to use for HTTP. This method returns a mapping of |
| * network interface property names to network interface names. |
| * |
| * @return the property name and network interface name of all HTTP network interfaces |
| */ |
| public Map<String, String> getHttpNetworkInterfaces() { |
| final Map<String, String> networkInterfaces = new HashMap<>(); |
| |
| // go through each property |
| for (String propertyName : getPropertyKeys()) { |
| // determine if the property is a network interface name |
| if (StringUtils.startsWith(propertyName, WEB_HTTP_NETWORK_INTERFACE_PREFIX)) { |
| // get the network interface property key |
| final String key = StringUtils.substringAfter(propertyName, |
| WEB_HTTP_NETWORK_INTERFACE_PREFIX); |
| networkInterfaces.put(key, getProperty(propertyName)); |
| } |
| } |
| return networkInterfaces; |
| } |
| |
| /** |
| * Returns the network interface list to use for HTTPS. This method returns a mapping of |
| * network interface property names to network interface names. |
| * |
| * @return the property name and network interface name of all HTTPS network interfaces |
| */ |
| public Map<String, String> getHttpsNetworkInterfaces() { |
| final Map<String, String> networkInterfaces = new HashMap<>(); |
| |
| // go through each property |
| for (String propertyName : getPropertyKeys()) { |
| // determine if the property is a network interface name |
| if (StringUtils.startsWith(propertyName, WEB_HTTPS_NETWORK_INTERFACE_PREFIX)) { |
| // get the network interface property key |
| final String key = StringUtils.substringAfter(propertyName, |
| WEB_HTTPS_NETWORK_INTERFACE_PREFIX); |
| networkInterfaces.put(key, getProperty(propertyName)); |
| } |
| } |
| return networkInterfaces; |
| } |
| |
| /** |
| * @return True if property value is 'true'; False if property value is 'false'. Throws an exception otherwise. |
| */ |
| public boolean isZooKeeperClientSecure() { |
| final String defaultValue = String.valueOf(DEFAULT_ZOOKEEPER_CLIENT_SECURE); |
| final String clientSecure = getProperty(ZOOKEEPER_CLIENT_SECURE, defaultValue).trim(); |
| |
| if (!"true".equalsIgnoreCase(clientSecure) && !"false".equalsIgnoreCase(clientSecure)) { |
| throw new RuntimeException(String.format("%s was '%s', expected true or false", NiFiProperties.ZOOKEEPER_CLIENT_SECURE, clientSecure)); |
| } |
| |
| return Boolean.parseBoolean(clientSecure); |
| } |
| |
| public boolean isZooKeeperTlsConfigurationPresent() { |
| return StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE)) |
| && StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE)) |
| && getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD) != null |
| && StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE)) |
| && getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD) != null; |
| } |
| |
| public boolean isTlsConfigurationPresent() { |
| return StringUtils.isNotBlank(getProperty(SECURITY_KEYSTORE)) |
| && getProperty(SECURITY_KEYSTORE_PASSWD) != null |
| && StringUtils.isNotBlank(getProperty(SECURITY_TRUSTSTORE)) |
| && getProperty(SECURITY_TRUSTSTORE_PASSWD) != null; |
| } |
| |
| public String getFlowFileRepoEncryptionKeyId() { |
| return getProperty(FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, getProperty(REPOSITORY_ENCRYPTION_KEY_ID)); |
| } |
| |
| /** |
| * Returns the active flowfile repository encryption key if a {@code StaticKeyProvider} is in use. |
| * If no key ID is specified in the properties file, the default |
| * {@code nifi.flowfile.repository.encryption.key} value is returned. If a key ID is specified in |
| * {@code nifi.flowfile.repository.encryption.key.id}, it will attempt to read from |
| * {@code nifi.flowfile.repository.encryption.key.id.XYZ} where {@code XYZ} is the provided key |
| * ID. If that value is empty, it will use the default property |
| * {@code nifi.flowfile.repository.encryption.key}. |
| * |
| * @return the flowfile repository encryption key in hex form |
| */ |
| public String getFlowFileRepoEncryptionKey() { |
| String keyId = getFlowFileRepoEncryptionKeyId(); |
| String keyKey = StringUtils.isBlank(keyId) ? FLOWFILE_REPOSITORY_ENCRYPTION_KEY : FLOWFILE_REPOSITORY_ENCRYPTION_KEY + ".id." + keyId; |
| return getProperty(keyKey, getProperty(FLOWFILE_REPOSITORY_ENCRYPTION_KEY)); |
| } |
| |
| /** |
| * Returns a map of keyId -> key in hex loaded from the {@code nifi.properties} file if a |
| * {@code StaticKeyProvider} is defined. If {@code FileBasedKeyProvider} is defined this method will return an empty map. |
| * |
| * @return a Map of the keys identified by key ID |
| */ |
| public Map<String, String> getFlowFileRepoEncryptionKeys() { |
| return getRepositoryEncryptionKeys("flowfile"); |
| } |
| |
| /** |
| * Returns the map of key IDs to keys retrieved from the properties for the given repository type. |
| * |
| * @param repositoryType "provenance", "content", or "flowfile" |
| * @return the key map |
| */ |
| public Map<String, String> getRepositoryEncryptionKeys(final String repositoryType) { |
| Objects.requireNonNull(repositoryType, "Repository Type required"); |
| final Map<String, String> keys = new HashMap<>(); |
| final List<String> keyProperties = getRepositoryEncryptionKeyProperties(repositoryType); |
| if (keyProperties.size() == 0) { |
| logger.warn("Repository [{}] Encryption Key properties not found", repositoryType); |
| return keys; |
| } |
| final String repositoryEncryptionKey = getRepositoryEncryptionKey(repositoryType); |
| final String repositoryEncryptionKeyId = getRepositoryEncryptionKeyId(repositoryType); |
| |
| // Retrieve the actual key values and store non-empty values in the map |
| for (final String keyProperty : keyProperties) { |
| final String keyValue = getProperty(keyProperty); |
| if (StringUtils.isNotBlank(keyValue)) { |
| // If this property is .key (the actual hex key), put it in the map under the value of .key.id (i.e. key1) |
| if (keyProperty.equalsIgnoreCase(repositoryEncryptionKey)) { |
| keys.put(getProperty(repositoryEncryptionKeyId), keyValue); |
| } else { |
| // Extract nifi.*.repository.encryption.key.id.key1 -> key1 |
| final String extractedKeyId = keyProperty.substring(keyProperty.lastIndexOf(".") + 1); |
| if (keys.containsKey(extractedKeyId)) { |
| logger.warn("Repository [{}] Duplicate Encryption Key ID [{}]: Ignoring Property [{}]", repositoryType, extractedKeyId, keyProperty); |
| } else { |
| keys.put(extractedKeyId, keyValue); |
| } |
| } |
| } |
| } |
| return keys; |
| } |
| |
| /** |
| * Returns the list of encryption key properties for the specified repository type. If an unknown repository type |
| * is provided, returns an empty list. |
| * |
| * @param repositoryType "provenance", "content", or "flowfile" |
| * @return the list of encryption key properties |
| */ |
| private List<String> getRepositoryEncryptionKeyProperties(String repositoryType) { |
| switch (repositoryType.toLowerCase()) { |
| case "flowfile": |
| return getFlowFileRepositoryEncryptionKeyProperties(); |
| case "content": |
| return getContentRepositoryEncryptionKeyProperties(); |
| case "provenance": |
| return getProvenanceRepositoryEncryptionKeyProperties(); |
| default: |
| return Collections.emptyList(); |
| } |
| } |
| |
| /** |
| * Returns the encryption key property key for the specified repository type. If an unknown repository type |
| * is provided, returns an empty string. |
| * |
| * @param repositoryType "provenance", "content", or "flowfile" |
| * @return the encryption key property (i.e. {@code FLOWFILE_REPOSITORY_ENCRYPTION_KEY}) |
| */ |
| private String getRepositoryEncryptionKey(String repositoryType) { |
| switch (repositoryType.toLowerCase()) { |
| case "flowfile": |
| return FLOWFILE_REPOSITORY_ENCRYPTION_KEY; |
| case "content": |
| return CONTENT_REPOSITORY_ENCRYPTION_KEY; |
| case "provenance": |
| return PROVENANCE_REPO_ENCRYPTION_KEY; |
| default: |
| return ""; |
| } |
| } |
| |
| /** |
| * Returns the encryption key ID property key for the specified repository type. If an unknown repository type |
| * is provided, returns an empty string. |
| * |
| * @param repositoryType "provenance", "content", or "flowfile" |
| * @return the encryption key ID property (i.e. {@code FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID}) |
| */ |
| private String getRepositoryEncryptionKeyId(String repositoryType) { |
| switch (repositoryType.toLowerCase()) { |
| case "flowfile": |
| return FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID; |
| case "content": |
| return CONTENT_REPOSITORY_ENCRYPTION_KEY_ID; |
| case "provenance": |
| return PROVENANCE_REPO_ENCRYPTION_KEY_ID; |
| default: |
| return ""; |
| } |
| } |
| |
| public String getProvenanceRepoEncryptionKeyId() { |
| return getProperty(PROVENANCE_REPO_ENCRYPTION_KEY_ID, getProperty(REPOSITORY_ENCRYPTION_KEY_ID)); |
| } |
| |
| /** |
| * Returns the active provenance repository encryption key if a {@code StaticKeyProvider} is in use. |
| * If no key ID is specified in the properties file, the default |
| * {@code nifi.provenance.repository.encryption.key} value is returned. If a key ID is specified in |
| * {@code nifi.provenance.repository.encryption.key.id}, it will attempt to read from |
| * {@code nifi.provenance.repository.encryption.key.id.XYZ} where {@code XYZ} is the provided key |
| * ID. If that value is empty, it will use the default property |
| * {@code nifi.provenance.repository.encryption.key}. |
| * |
| * @return the provenance repository encryption key in hex form |
| */ |
| public String getProvenanceRepoEncryptionKey() { |
| String keyId = getProvenanceRepoEncryptionKeyId(); |
| String keyKey = StringUtils.isBlank(keyId) ? PROVENANCE_REPO_ENCRYPTION_KEY : PROVENANCE_REPO_ENCRYPTION_KEY + ".id." + keyId; |
| return getProperty(keyKey, getProperty(PROVENANCE_REPO_ENCRYPTION_KEY)); |
| } |
| |
| /** |
| * Returns a map of keyId -> key in hex loaded from the {@code nifi.properties} file if a |
| * {@code StaticKeyProvider} is defined. If {@code FileBasedKeyProvider} is defined this method will return an empty map. |
| * |
| * @return a Map of the keys identified by key ID |
| */ |
| public Map<String, String> getProvenanceRepoEncryptionKeys() { |
| return getRepositoryEncryptionKeys("provenance"); |
| } |
| |
| public String getContentRepositoryEncryptionKeyId() { |
| return getProperty(CONTENT_REPOSITORY_ENCRYPTION_KEY_ID, getProperty(REPOSITORY_ENCRYPTION_KEY_ID)); |
| } |
| |
| /** |
| * Returns the active content repository encryption key if a {@code StaticKeyProvider} is in use. |
| * If no key ID is specified in the properties file, the default |
| * {@code nifi.content.repository.encryption.key} value is returned. If a key ID is specified in |
| * {@code nifi.content.repository.encryption.key.id}, it will attempt to read from |
| * {@code nifi.content.repository.encryption.key.id.XYZ} where {@code XYZ} is the provided key |
| * ID. If that value is empty, it will use the default property |
| * {@code nifi.content.repository.encryption.key}. |
| * |
| * @return the content repository encryption key in hex form |
| */ |
| public String getContentRepositoryEncryptionKey() { |
| String keyId = getContentRepositoryEncryptionKeyId(); |
| String keyKey = StringUtils.isBlank(keyId) ? CONTENT_REPOSITORY_ENCRYPTION_KEY : CONTENT_REPOSITORY_ENCRYPTION_KEY + ".id." + keyId; |
| return getProperty(keyKey, getProperty(CONTENT_REPOSITORY_ENCRYPTION_KEY)); |
| } |
| |
| /** |
| * Returns a map of keyId -> key in hex loaded from the {@code nifi.properties} file if a |
| * {@code StaticKeyProvider} is defined. If {@code FileBasedKeyProvider} is defined this method will return an empty map. |
| * |
| * @return a Map of the keys identified by key ID |
| */ |
| public Map<String, String> getContentRepositoryEncryptionKeys() { |
| return getRepositoryEncryptionKeys("content"); |
| } |
| |
| /** |
| * Returns the allowed proxy hostnames (and IP addresses) as a comma-delimited string. |
| * The hosts have been normalized to the form {@code somehost.com}, {@code somehost.com:port}, or {@code 127.0.0.1}. |
| * <p> |
| * Note: Calling {@code NiFiProperties.getProperty(NiFiProperties.WEB_PROXY_HOST)} will not normalize the hosts. |
| * |
| * @return the hostname(s) |
| */ |
| public String getAllowedHosts() { |
| return StringUtils.join(getAllowedHostsAsList(), ","); |
| } |
| |
| /** |
| * Returns the allowed proxy hostnames (and IP addresses) as a List. The hosts have been normalized to the form {@code somehost.com}, {@code somehost.com:port}, or {@code 127.0.0.1}. |
| * |
| * @return the hostname(s) |
| */ |
| public List<String> getAllowedHostsAsList() { |
| String rawProperty = getProperty(WEB_PROXY_HOST, ""); |
| List<String> hosts = Arrays.asList(rawProperty.split(",")); |
| return hosts.stream() |
| .map(this::normalizeHost).filter(host -> !StringUtils.isBlank(host)).collect(Collectors.toList()); |
| } |
| |
| String normalizeHost(String host) { |
| if (host == null || host.equalsIgnoreCase("")) { |
| return ""; |
| } else { |
| return host.trim(); |
| } |
| } |
| |
| /** |
| * Returns the allowed proxy context paths as a comma-delimited string. The paths have been normalized to the form {@code /some/context/path}. |
| * <p> |
| * Note: Calling {@code NiFiProperties.getProperty(NiFiProperties.WEB_PROXY_CONTEXT_PATH)} will not normalize the paths. |
| * |
| * @return the path(s) |
| */ |
| public String getAllowedContextPaths() { |
| return StringUtils.join(getAllowedContextPathsAsList(), ","); |
| } |
| |
| /** |
| * Returns the allowed proxy context paths as a list of paths. The paths have been normalized to the form {@code /some/context/path}. |
| * |
| * @return the path(s) |
| */ |
| public List<String> getAllowedContextPathsAsList() { |
| String rawProperty = getProperty(WEB_PROXY_CONTEXT_PATH, ""); |
| List<String> contextPaths = Arrays.asList(rawProperty.split(",")); |
| return contextPaths.stream() |
| .map(this::normalizeContextPath).collect(Collectors.toList()); |
| } |
| |
| private String normalizeContextPath(String cp) { |
| if (cp == null || cp.equalsIgnoreCase("")) { |
| return ""; |
| } else { |
| String trimmedCP = cp.trim(); |
| // Ensure it starts with a leading slash and does not end in a trailing slash |
| // There's a potential for the path to be something like bad/path/// but this is semi-trusted data from an admin-accessible file and there are way worse possibilities here |
| trimmedCP = trimmedCP.startsWith("/") ? trimmedCP : "/" + trimmedCP; |
| trimmedCP = trimmedCP.endsWith("/") ? trimmedCP.substring(0, trimmedCP.length() - 1) : trimmedCP; |
| return trimmedCP; |
| } |
| } |
| |
| private List<String> getFlowFileRepositoryEncryptionKeyProperties() { |
| // Filter all the property keys that define a key |
| return getPropertyKeys().stream().filter(k -> |
| k.startsWith(FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID + ".") || k.equalsIgnoreCase(FLOWFILE_REPOSITORY_ENCRYPTION_KEY) |
| ).collect(Collectors.toList()); |
| } |
| |
| private List<String> getProvenanceRepositoryEncryptionKeyProperties() { |
| // Filter all the property keys that define a key |
| return getPropertyKeys().stream().filter(k -> |
| k.startsWith(PROVENANCE_REPO_ENCRYPTION_KEY_ID + ".") || k.equalsIgnoreCase(PROVENANCE_REPO_ENCRYPTION_KEY) |
| ).collect(Collectors.toList()); |
| } |
| |
| private List<String> getContentRepositoryEncryptionKeyProperties() { |
| // Filter all the property keys that define a key |
| return getPropertyKeys().stream().filter(k -> |
| k.startsWith(CONTENT_REPOSITORY_ENCRYPTION_KEY_ID + ".") || k.equalsIgnoreCase(CONTENT_REPOSITORY_ENCRYPTION_KEY) |
| ).collect(Collectors.toList()); |
| } |
| |
| public Long getDefaultBackPressureObjectThreshold() { |
| long backPressureCount; |
| try { |
| String backPressureCountStr = getProperty(BACKPRESSURE_COUNT); |
| if (backPressureCountStr == null || backPressureCountStr.trim().isEmpty()) { |
| backPressureCount = DEFAULT_BACKPRESSURE_COUNT; |
| } else { |
| backPressureCount = Long.parseLong(backPressureCountStr); |
| } |
| } catch (NumberFormatException nfe) { |
| backPressureCount = DEFAULT_BACKPRESSURE_COUNT; |
| } |
| return backPressureCount; |
| } |
| |
| public String getDefaultBackPressureDataSizeThreshold() { |
| return getProperty(BACKPRESSURE_SIZE, DEFAULT_BACKPRESSURE_SIZE); |
| } |
| |
| /** |
| * Returns the directory where the QuestDB based status repository is expected to work within. |
| * |
| * @return Path object pointing to the database's folder. |
| */ |
| public Path getQuestDbStatusRepositoryPath() { |
| return Paths.get(getProperty(STATUS_REPOSITORY_QUESTDB_PERSIST_LOCATION, DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_LOCATION)); |
| } |
| |
| /** |
| * Returns all properties where the property key starts with the prefix. |
| * |
| * @param prefix The exact string the returned properties should start with. Dots are considered, thus prefix "item" will return both |
| * properties starting with "item." and "items". Properties with empty value will be included as well. |
| * @return A map of properties starting with the prefix. |
| */ |
| public Map<String, String> getPropertiesWithPrefix(final String prefix) { |
| return getPropertyKeys().stream().filter(key -> key.startsWith(prefix)).collect(Collectors.toMap(key -> key, key -> getProperty(key))); |
| } |
| |
| /** |
| * Returns with all the possible next "tokens" after the given prefix. An alphanumeric string between dots is considered as a "token". |
| * <p> |
| * For example if there are "parent.sub1" and a "parent.sub2" properties are set, and the prefix is "parent", the method will return |
| * with a set, consisting of "sub1" and "sub2. Only directly subsequent tokens are considered, so in case of "parent.sub1.subsub1", the |
| * result will contain "sub1" as well. |
| * |
| * @param prefix The prefix of the request. |
| * @return A set of direct subsequent tokens. |
| */ |
| public Set<String> getDirectSubsequentTokens(final String prefix) { |
| final String fixedPrefix = prefix.endsWith(".") ? prefix : prefix + "."; |
| |
| return getPropertyKeys().stream() |
| .filter(key -> key.startsWith(fixedPrefix)) |
| .map(key -> key.substring(fixedPrefix.length())) |
| .map(key -> key.indexOf('.') == -1 ? key : key.substring(0, key.indexOf('.'))) |
| .collect(Collectors.toSet()); |
| } |
| |
| /** |
| * Creates an instance of NiFiProperties. This should likely not be called |
| * by any classes outside of the NiFi framework but can be useful by the |
| * framework for default property loading behavior or helpful in tests |
| * needing to create specific instances of NiFiProperties. If properties |
| * file specified cannot be found/read a runtime exception will be thrown. |
| * If one is not specified an empty object will be returned. |
| * |
| * @param propertiesFilePath if provided properties will be loaded from |
| * given file; else will be loaded from System property. |
| * Can be null. Passing {@code ""} skips any attempt to load from the file system. |
| * @return NiFiProperties |
| */ |
| public static NiFiProperties createBasicNiFiProperties(final String propertiesFilePath) { |
| return createBasicNiFiProperties(propertiesFilePath, new Properties()); |
| } |
| |
| /** |
| * Creates an instance of NiFiProperties. This should likely not be called |
| * by any classes outside of the NiFi framework but can be useful by the |
| * framework for default property loading behavior or helpful in tests |
| * needing to create specific instances of NiFiProperties. If properties |
| * file specified cannot be found/read a runtime exception will be thrown. |
| * If one is not specified, only the provided properties will be returned. |
| * |
| * @param propertiesFilePath if provided properties will be loaded from |
| * given file; else will be loaded from System property. |
| * Can be null. Passing {@code ""} skips any attempt to load from the file system. |
| * @param additionalProperties allows overriding of properties with the |
| * supplied values. these will be applied after loading from any properties |
| * file. Can be null or empty. |
| * @return NiFiProperties |
| */ |
| public static NiFiProperties createBasicNiFiProperties(final String propertiesFilePath, final Map<String, String> additionalProperties) { |
| final Map<String, String> addProps = (additionalProperties == null) ? Collections.EMPTY_MAP : additionalProperties; |
| final Properties properties = new Properties(); |
| addProps.forEach(properties::put); |
| |
| return createBasicNiFiProperties(propertiesFilePath, properties); |
| } |
| |
| /** |
| * Creates an instance of NiFiProperties. This should likely not be called |
| * by any classes outside of the NiFi framework but can be useful by the |
| * framework for default property loading behavior or helpful in tests |
| * needing to create specific instances of NiFiProperties. If properties |
| * file specified cannot be found/read a runtime exception will be thrown. |
| * If one is not specified, only the provided properties will be returned. |
| * |
| * @param propertiesFilePath if provided properties will be loaded from |
| * given file; else will be loaded from System property. |
| * Can be null. Passing {@code ""} skips any attempt to load from the file system. |
| * @param additionalProperties allows overriding of properties with the |
| * supplied values. these will be applied after loading from any properties |
| * file. Can be null or empty. |
| * @return NiFiProperties |
| */ |
| public static NiFiProperties createBasicNiFiProperties(final String propertiesFilePath, final Properties additionalProperties) { |
| final Properties properties = new Properties(); |
| |
| // If the provided file path is null or provided, load from file. If it is "", skip this |
| if (propertiesFilePath == null || StringUtils.isNotBlank(propertiesFilePath)) { |
| readFromPropertiesFile(propertiesFilePath, properties); |
| } |
| |
| // The Properties(Properties) constructor does NOT inherit the provided values, just uses them as default values |
| if (additionalProperties != null) { |
| additionalProperties.forEach(properties::put); |
| } |
| return new NiFiProperties() { |
| @Override |
| public String getProperty(String key) { |
| return properties.getProperty(key); |
| } |
| |
| @Override |
| public Set<String> getPropertyKeys() { |
| return properties.stringPropertyNames(); |
| } |
| |
| @Override |
| public int size() { |
| return getPropertyKeys().size(); |
| } |
| }; |
| } |
| |
| private static void readFromPropertiesFile(String propertiesFilePath, Properties properties) { |
| final String nfPropertiesFilePath = (propertiesFilePath == null) |
| ? System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH) |
| : propertiesFilePath; |
| if (nfPropertiesFilePath != null) { |
| final File propertiesFile = new File(nfPropertiesFilePath.trim()); |
| if (!propertiesFile.exists()) { |
| throw new RuntimeException("Properties file doesn't exist '" |
| + propertiesFile.getAbsolutePath() + "'"); |
| } |
| if (!propertiesFile.canRead()) { |
| throw new RuntimeException("Properties file exists but cannot be read '" |
| + propertiesFile.getAbsolutePath() + "'"); |
| } |
| InputStream inStream = null; |
| try { |
| inStream = new BufferedInputStream(new FileInputStream(propertiesFile)); |
| properties.load(inStream); |
| } catch (final Exception ex) { |
| throw new RuntimeException("Cannot load properties file due to " |
| + ex.getLocalizedMessage(), ex); |
| } finally { |
| if (null != inStream) { |
| try { |
| inStream.close(); |
| } catch (final Exception ex) { |
| /** |
| * do nothing * |
| */ |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * This method is used to validate the NiFi properties when the file is loaded |
| * for the first time. The objective is to stop NiFi startup in case a property |
| * is not correctly configured and could cause issues afterwards. |
| */ |
| public void validate() { |
| // REMOTE_INPUT_HOST should be a valid hostname |
| String remoteInputHost = getProperty(REMOTE_INPUT_HOST); |
| if (!StringUtils.isBlank(remoteInputHost) && remoteInputHost.split(":").length > 1) { // no scheme/port needed here (http://) |
| throw new IllegalArgumentException(remoteInputHost + " is not a correct value for " + REMOTE_INPUT_HOST + ". It should be a valid hostname without protocol or port."); |
| } |
| // Other properties to validate... |
| } |
| |
| @Override |
| public String toString() { |
| return "NiFiProperties instance with " + size() + " properties"; |
| } |
| } |