blob: 28076f4edfb105b8211d7dee395e2b063d96f515 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsBoolean;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsImplementationOfClass;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsInteger;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsListEntryCustom;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsMapEntryCustom;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsMapEntryType;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsNoDuplicateInList;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsNumber;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsPositiveNumber;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsString;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsStringList;
import static org.apache.storm.validation.ConfigValidationAnnotations.IsStringOrStringList;
import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
import static org.apache.storm.validation.ConfigValidationAnnotations.Password;
import java.util.ArrayList;
import java.util.Map;
import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.scheduler.blacklist.reporters.IReporter;
import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.security.auth.IHttpCredentialsPlugin;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.validation.Validated;
/**
* Storm configs are specified as a plain old map. This class provides constants for all the configurations possible on a Storm cluster.
* Each constant is paired with an annotation that defines the validity criterion of the corresponding field. Default values for these
* configs can be found in defaults.yaml.
*
* <p>This class extends {@link org.apache.storm.Config} for supporting Storm Daemons.
*/
public class DaemonConfig implements Validated {
/**
* We check with this interval that whether the Netty channel is writable and try to write pending messages.
*/
@IsInteger
public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
/**
* A list of daemon metrics reporter plugin class names. These plugins must implement {@link
* org.apache.storm.daemon.metrics.reporters.PreparableReporter} interface.
*/
@IsStringList
public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins";
/**
* A specify domain for daemon metrics reporter plugin to limit reporting to specific domain.
*/
@IsString
public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN = "storm.daemon.metrics.reporter.plugin.domain";
/**
* A specify csv reporter directory for CvsPreparableReporter daemon metrics reporter.
*/
@IsString
public static final String STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR = "storm.daemon.metrics.reporter.csv.log.dir";
/**
* A directory that holds configuration files for log4j2. It can be either a relative or an absolute directory. If relative, it is
* relative to the storm's home directory.
*/
@IsString
public static final String STORM_LOG4J2_CONF_DIR = "storm.log4j2.conf.dir";
/**
* A global task scheduler used to assign topologies's tasks to supervisors' workers.
*
* <p>If this is not set, a default system scheduler will be used.
*/
@IsString
public static final String STORM_SCHEDULER = "storm.scheduler";
/**
* Max time to attempt to schedule one topology. The default is 60 seconds
*/
@IsInteger
@IsPositiveNumber
public static final String SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY = "scheduling.timeout.seconds.per.topology";
/**
* The number of seconds that the blacklist scheduler will concern of bad slots or supervisors.
*/
@IsPositiveNumber
public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = "blacklist.scheduler.tolerance.time.secs";
/**
* The number of hit count that will trigger blacklist in tolerance time.
*/
@IsPositiveNumber
public static final String BLACKLIST_SCHEDULER_TOLERANCE_COUNT = "blacklist.scheduler.tolerance.count";
/**
* The number of seconds that the blacklisted slots or supervisor will be resumed.
*/
@IsPositiveNumber
public static final String BLACKLIST_SCHEDULER_RESUME_TIME = "blacklist.scheduler.resume.time.secs";
/**
* The class that the blacklist scheduler will report the blacklist.
*/
@NotNull
@IsImplementationOfClass(implementsClass = IReporter.class)
public static final String BLACKLIST_SCHEDULER_REPORTER = "blacklist.scheduler.reporter";
/**
* The class that specifies the eviction strategy to use in blacklist scheduler.
* If you are using the RAS scheduler please set this to
* "org.apache.storm.scheduler.blacklist.strategies.RasBlacklistStrategy" or you may
* get odd behavior when the cluster is full and there are blacklisted nodes.
*/
@NotNull
@IsImplementationOfClass(implementsClass = IBlacklistStrategy.class)
public static final String BLACKLIST_SCHEDULER_STRATEGY = "blacklist.scheduler.strategy";
/**
* Whether {@link org.apache.storm.scheduler.blacklist.BlacklistScheduler} will assume the supervisor is bad
* based on bad slots or not.
* A bad slot indicates the situation where the nimbus doesn't receive heartbeat from the worker in time,
* it's hard to differentiate if it's because of the supervisor node or the worker itself.
* If this is set to true, the scheduler will consider a supervisor is bad when seeing bad slots in it.
* Otherwise, the scheduler will assume a supervisor is bad only when it does not receive supervisor heartbeat in time.
*/
@IsBoolean
public static final String BLACKLIST_SCHEDULER_ASSUME_SUPERVISOR_BAD_BASED_ON_BAD_SLOT
= "blacklist.scheduler.assume.supervisor.bad.based.on.bad.slot";
/**
* Whether we want to display all the resource capacity and scheduled usage on the UI page. You MUST have this variable set if you are
* using any kind of resource-related scheduler.
* <p/>
* If this is not set, we will not display resource capacity and usage on the UI.
*/
@IsBoolean
public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource";
/**
* The directory where storm's health scripts go.
*/
@IsString
public static final String STORM_HEALTH_CHECK_DIR = "storm.health.check.dir";
/**
* The time to allow any given healthcheck script to run before it is marked failed due to timeout.
*/
@IsNumber
public static final String STORM_HEALTH_CHECK_TIMEOUT_MS = "storm.health.check.timeout.ms";
/**
* This is the user that the Nimbus daemon process is running as. May be used when security is enabled to authorize actions in the
* cluster.
*/
@IsString
public static final String NIMBUS_DAEMON_USER = "nimbus.daemon.user";
/**
* This parameter is used by the storm-deploy project to configure the jvm options for the nimbus daemon.
*/
@IsStringOrStringList
public static final String NIMBUS_CHILDOPTS = "nimbus.childopts";
/**
* How long without heartbeating a task can go before nimbus will consider the task dead and reassign it to another location.
* Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
*/
@IsInteger
@IsPositiveNumber
public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
/**
* How often nimbus should wake up to check heartbeats and do reassignments. Note that if a machine ever goes down Nimbus will
* immediately wake up and take action. This parameter is for checking for failures when there's no explicit event like that occurring.
*/
@IsInteger
@IsPositiveNumber
public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
/**
* How often nimbus should wake the cleanup thread to clean the inbox.
*
* @see #NIMBUS_INBOX_JAR_EXPIRATION_SECS
*/
@IsInteger
@IsPositiveNumber
public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
/**
* The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
*
* <p>Probably keep this value greater than or equal to NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS. Note that the time
* it takes to delete an inbox jar file is going to be somewhat more than NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS
* (depending on how often NIMBUS_CLEANUP_FREQ_SECS is set to).
*
* @see #NIMBUS_CLEANUP_INBOX_FREQ_SECS
*/
@IsInteger
public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
/**
* How long before a supervisor can go without heartbeating before nimbus considers it dead and stops assigning new work to it.
*/
@IsInteger
@IsPositiveNumber
public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs";
/**
* A special timeout used when a task is initially launched. During launch, this is the timeout used until the first heartbeat,
* overriding nimbus.task.timeout.secs.
*
* <p>A separate timeout exists for launch because there can be quite a bit of overhead
* to launching new JVM's and configuring them.</p>
* Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
*/
@IsInteger
@IsPositiveNumber
public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
/**
* During upload/download with the master, how long an upload or download connection is idle before nimbus considers it dead and drops
* the connection.
*/
@IsInteger
public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = "nimbus.file.copy.expiration.secs";
/**
* A custom class that implements ITopologyValidator that is run whenever a topology is submitted. Can be used to provide
* business-specific logic for whether topologies are allowed to run or not.
*/
@IsString
public static final String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";
/**
* Class name for authorization plugin for Nimbus.
*/
@IsString
public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
/**
* Class name for authorization plugin for supervisor.
*/
@IsImplementationOfClass(implementsClass = IAuthorizer.class)
@IsString
public static final String SUPERVISOR_AUTHORIZER = "supervisor.authorizer";
/**
* Impersonation user ACL config entries.
*/
@IsString
public static final String NIMBUS_IMPERSONATION_AUTHORIZER = "nimbus.impersonation.authorizer";
/**
* How often nimbus should wake up to renew credentials if needed.
*/
@IsInteger
@IsPositiveNumber
public static final String NIMBUS_CREDENTIAL_RENEW_FREQ_SECS = "nimbus.credential.renewers.freq.secs";
/**
* FQCN of a class that implements {@code I} @see org.apache.storm.nimbus.ITopologyActionNotifierPlugin for details.
*/
@IsImplementationOfClass(implementsClass = ITopologyActionNotifierPlugin.class)
public static final String NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN = "nimbus.topology.action.notifier.plugin.class";
/**
* This controls the number of working threads for distributing master assignments to supervisors.
*/
@IsInteger
public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREADS = "nimbus.assignments.service.threads";
/**
* This controls the number of working thread queue size of assignment service.
*/
@IsInteger
public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE = "nimbus.assignments.service.thread.queue.size";
/**
* class controls heartbeats recovery strategy.
*/
@IsString
public static final String NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS = "nimbus.worker.heartbeats.recovery.strategy.class";
/**
* This controls the number of milliseconds nimbus will wait before deleting a topology blobstore once detected it is able to delete.
*/
@IsInteger
public static final String NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MS = "nimbus.topology.blobstore.deletion.delay.ms";
/**
* Storm UI binds to this host/interface.
*/
@IsString
public static final String UI_HOST = "ui.host";
/**
* Storm UI binds to this port.
*/
@IsInteger
@IsPositiveNumber
public static final String UI_PORT = "ui.port";
/**
* This controls wheather Storm UI should bind to http port even if ui.port is > 0.
*/
@IsBoolean
public static final String UI_DISABLE_HTTP_BINDING = "ui.disable.http.binding";
/**
* This controls whether Storm UI displays spout lag for the Kafka spout.
*/
@IsBoolean
public static final String UI_DISABLE_SPOUT_LAG_MONITORING = "ui.disable.spout.lag.monitoring";
/**
* This controls wheather Storm Logviewer should bind to http port even if logviewer.port is > 0.
*/
@IsBoolean
public static final String LOGVIEWER_DISABLE_HTTP_BINDING = "logviewer.disable.http.binding";
/**
* This controls wheather Storm DRPC should bind to http port even if drpc.http.port is > 0.
*/
@IsBoolean
public static final String DRPC_DISABLE_HTTP_BINDING = "drpc.disable.http.binding";
/**
* Storm UI Project BUGTRACKER Link for reporting issue.
*/
@IsString
public static final String UI_PROJECT_BUGTRACKER_URL = "ui.project.bugtracker.url";
/**
* Storm UI Central Logging URL.
*/
@IsString
public static final String UI_CENTRAL_LOGGING_URL = "ui.central.logging.url";
/**
* Storm UI drop-down pagination value. Set ui.pagination to be a positive integer or -1 (displays all entries). Valid values: -1, 10,
* 20, 25 etc.
*/
@IsInteger
public static final String UI_PAGINATION = "ui.pagination";
/**
* HTTP UI port for log viewer.
*/
@IsInteger
@IsPositiveNumber
public static final String LOGVIEWER_PORT = "logviewer.port";
/**
* Childopts for log viewer java process.
*/
@IsStringOrStringList
public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts";
/**
* How often to clean up old log files.
*/
@IsInteger
@IsPositiveNumber
public static final String LOGVIEWER_CLEANUP_INTERVAL_SECS = "logviewer.cleanup.interval.secs";
/**
* How many minutes since a log was last modified for the log to be considered for clean-up.
*/
@IsInteger
@IsPositiveNumber
public static final String LOGVIEWER_CLEANUP_AGE_MINS = "logviewer.cleanup.age.mins";
/**
* The maximum number of bytes all worker log files can take up in MB.
*/
@IsPositiveNumber
public static final String LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB = "logviewer.max.sum.worker.logs.size.mb";
/**
* The maximum number of bytes per worker's files can take up in MB.
*/
@IsPositiveNumber
public static final String LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB = "logviewer.max.per.worker.logs.size.mb";
/**
* Storm Logviewer HTTPS port. Logviewer must use HTTPS if Storm UI is using HTTPS.
*/
@IsInteger
@IsPositiveNumber
public static final String LOGVIEWER_HTTPS_PORT = "logviewer.https.port";
/**
* Path to the keystore containing the certs used by Storm Logviewer for HTTPS communications.
*/
@IsString
public static final String LOGVIEWER_HTTPS_KEYSTORE_PATH = "logviewer.https.keystore.path";
/**
* Password for the keystore for HTTPS for Storm Logviewer.
*/
@IsString
@Password
public static final String LOGVIEWER_HTTPS_KEYSTORE_PASSWORD = "logviewer.https.keystore.password";
/**
* Type of the keystore for HTTPS for Storm Logviewer. see http://docs.oracle.com/javase/8/docs/api/java/security/KeyStore.html for more
* details.
*/
@IsString
public static final String LOGVIEWER_HTTPS_KEYSTORE_TYPE = "logviewer.https.keystore.type";
/**
* Password to the private key in the keystore for setting up HTTPS (SSL).
*/
@IsString
@Password
public static final String LOGVIEWER_HTTPS_KEY_PASSWORD = "logviewer.https.key.password";
/**
* Path to the truststore containing the certs used by Storm Logviewer for HTTPS communications.
*/
@IsString
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PATH = "logviewer.https.truststore.path";
/**
* Password for the truststore for HTTPS for Storm Logviewer.
*/
@IsString
@Password
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD = "logviewer.https.truststore.password";
/**
* Type of the truststore for HTTPS for Storm Logviewer. see http://docs.oracle.com/javase/8/docs/api/java/security/Truststore.html for
* more details.
*/
@IsString
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_TYPE = "logviewer.https.truststore.type";
/**
* Password to the truststore used by Storm Logviewer setting up HTTPS (SSL).
*/
@IsBoolean
public static final String LOGVIEWER_HTTPS_WANT_CLIENT_AUTH = "logviewer.https.want.client.auth";
@IsBoolean
public static final String LOGVIEWER_HTTPS_NEED_CLIENT_AUTH = "logviewer.https.need.client.auth";
/**
* A list of users allowed to view logs via the Log Viewer.
*/
@IsStringList
public static final String LOGS_USERS = "logs.users";
/**
* A list of groups allowed to view logs via the Log Viewer.
*/
@IsStringList
public static final String LOGS_GROUPS = "logs.groups";
/**
* Appender name used by log viewer to determine log directory.
*/
@IsString
public static final String LOGVIEWER_APPENDER_NAME = "logviewer.appender.name";
/**
* A class implementing javax.servlet.Filter for authenticating/filtering Logviewer requests.
*/
@IsString
public static final String LOGVIEWER_FILTER = "logviewer.filter";
/**
* Initialization parameters for the javax.servlet.Filter for Logviewer.
*/
@IsMapEntryType(keyType = String.class, valueType = String.class)
public static final String LOGVIEWER_FILTER_PARAMS = "logviewer.filter.params";
/**
* Childopts for Storm UI Java process.
*/
@IsStringOrStringList
public static final String UI_CHILDOPTS = "ui.childopts";
/**
* A class implementing javax.servlet.Filter for authenticating/filtering UI requests.
*/
@IsString
public static final String UI_FILTER = "ui.filter";
/**
* Initialization parameters for the javax.servlet.Filter for UI.
*/
@IsMapEntryType(keyType = String.class, valueType = String.class)
public static final String UI_FILTER_PARAMS = "ui.filter.params";
/**
* The size of the header buffer for the UI in bytes.
*/
@IsInteger
@IsPositiveNumber
public static final String UI_HEADER_BUFFER_BYTES = "ui.header.buffer.bytes";
/**
* This port is used by Storm UI for receiving HTTPS (SSL) requests from clients.
*/
@IsInteger
@IsPositiveNumber
public static final String UI_HTTPS_PORT = "ui.https.port";
/**
* Path to the keystore used by Storm UI for setting up HTTPS (SSL).
*/
@IsString
public static final String UI_HTTPS_KEYSTORE_PATH = "ui.https.keystore.path";
/**
* Password to the keystore used by Storm UI for setting up HTTPS (SSL).
*/
@IsString
@Password
public static final String UI_HTTPS_KEYSTORE_PASSWORD = "ui.https.keystore.password";
/**
* Type of keystore used by Storm UI for setting up HTTPS (SSL). see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore
* .html
* for more details.
*/
@IsString
public static final String UI_HTTPS_KEYSTORE_TYPE = "ui.https.keystore.type";
/**
* Password to the private key in the keystore for setting up HTTPS (SSL).
*/
@IsString
@Password
public static final String UI_HTTPS_KEY_PASSWORD = "ui.https.key.password";
/**
* Path to the truststore used by Storm UI setting up HTTPS (SSL).
*/
@IsString
public static final String UI_HTTPS_TRUSTSTORE_PATH = "ui.https.truststore.path";
/**
* Password to the truststore used by Storm UI setting up HTTPS (SSL).
*/
@IsString
@Password
public static final String UI_HTTPS_TRUSTSTORE_PASSWORD = "ui.https.truststore.password";
/**
* Type of truststore used by Storm UI for setting up HTTPS (SSL). see http://docs.oracle
* .com/javase/7/docs/api/java/security/KeyStore.html
* for more details.
*/
@IsString
public static final String UI_HTTPS_TRUSTSTORE_TYPE = "ui.https.truststore.type";
/**
* Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
*/
@IsBoolean
public static final String UI_HTTPS_WANT_CLIENT_AUTH = "ui.https.want.client.auth";
@IsBoolean
public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth";
/**
* The maximum number of threads that should be used by the Pacemaker. When Pacemaker gets loaded it will spawn new threads, up to this
* many total, to handle the load.
*/
@IsNumber
@IsPositiveNumber
public static final String PACEMAKER_MAX_THREADS = "pacemaker.max.threads";
/**
* This parameter is used by the storm-deploy project to configure the jvm options for the pacemaker daemon.
*/
@IsStringOrStringList
public static final String PACEMAKER_CHILDOPTS = "pacemaker.childopts";
/**
* This port is used by Storm DRPC for receiving HTTP DPRC requests from clients.
*/
@IsInteger
public static final String DRPC_HTTP_PORT = "drpc.http.port";
/**
* This port is used by Storm DRPC for receiving HTTPS (SSL) DPRC requests from clients.
*/
@IsInteger
public static final String DRPC_HTTPS_PORT = "drpc.https.port";
/**
* Path to the keystore used by Storm DRPC for setting up HTTPS (SSL).
*/
@IsString
public static final String DRPC_HTTPS_KEYSTORE_PATH = "drpc.https.keystore.path";
/**
* Password to the keystore used by Storm DRPC for setting up HTTPS (SSL).
*/
@IsString
@Password
public static final String DRPC_HTTPS_KEYSTORE_PASSWORD = "drpc.https.keystore.password";
/**
* Type of keystore used by Storm DRPC for setting up HTTPS (SSL). see http://docs.oracle
* .com/javase/7/docs/api/java/security/KeyStore.html
* for more details.
*/
@IsString
public static final String DRPC_HTTPS_KEYSTORE_TYPE = "drpc.https.keystore.type";
/**
* Password to the private key in the keystore for setting up HTTPS (SSL).
*/
@IsString
@Password
public static final String DRPC_HTTPS_KEY_PASSWORD = "drpc.https.key.password";
/**
* Path to the truststore used by Storm DRPC setting up HTTPS (SSL).
*/
@IsString
public static final String DRPC_HTTPS_TRUSTSTORE_PATH = "drpc.https.truststore.path";
/**
* Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
*/
@IsString
@Password
public static final String DRPC_HTTPS_TRUSTSTORE_PASSWORD = "drpc.https.truststore.password";
/**
* Type of truststore used by Storm DRPC for setting up HTTPS (SSL). see http://docs.oracle
* .com/javase/7/docs/api/java/security/KeyStore.html
* for more details.
*/
@IsString
public static final String DRPC_HTTPS_TRUSTSTORE_TYPE = "drpc.https.truststore.type";
/**
* Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
*/
@IsBoolean
public static final String DRPC_HTTPS_WANT_CLIENT_AUTH = "drpc.https.want.client.auth";
@IsBoolean
public static final String DRPC_HTTPS_NEED_CLIENT_AUTH = "drpc.https.need.client.auth";
/**
* Class name for authorization plugin for DRPC client.
*/
@IsString
public static final String DRPC_AUTHORIZER = "drpc.authorizer";
/**
* The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also timeout based on the socket
* timeout on the DRPC client, and separately based on the topology message timeout for the topology implementing the DRPC function.
*/
@IsInteger
@IsPositiveNumber
@NotNull
public static final String DRPC_REQUEST_TIMEOUT_SECS = "drpc.request.timeout.secs";
/**
* Childopts for Storm DRPC Java process.
*/
@IsStringOrStringList
public static final String DRPC_CHILDOPTS = "drpc.childopts";
/**
* the metadata configured on the supervisor.
*/
@IsMapEntryType(keyType = String.class, valueType = String.class)
public static final String SUPERVISOR_SCHEDULER_META = "supervisor.scheduler.meta";
/**
* A list of ports that can run workers on this supervisor. Each worker uses one port, and the supervisor will only run one worker per
* port. Use this configuration to tune how many workers run on each machine.
*/
@IsNoDuplicateInList
@NotNull
@IsListEntryCustom(entryValidatorClasses = { ConfigValidation.IntegerValidator.class, ConfigValidation.PositiveNumberValidator.class })
public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
/**
* What blobstore implementation the supervisor should use.
*/
@IsString
public static final String SUPERVISOR_BLOBSTORE = "supervisor.blobstore.class";
/**
* The distributed cache target size in MB. This is a soft limit to the size of the distributed cache contents.
*/
@IsPositiveNumber
@IsInteger
public static final String SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB = "supervisor.localizer.cache.target.size.mb";
/**
* The distributed cache cleanup interval. Controls how often it scans to attempt to cleanup anything over the cache target size.
*/
@IsPositiveNumber
@IsInteger
public static final String SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = "supervisor.localizer.cleanup.interval.ms";
/**
* What blobstore download parallelism the supervisor should use.
*/
@IsPositiveNumber
@IsInteger
public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT = "supervisor.blobstore.download.thread.count";
/**
* Maximum number of retries a supervisor is allowed to make for downloading a blob.
*/
@IsPositiveNumber
@IsInteger
public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES = "supervisor.blobstore.download.max_retries";
/**
* What blobstore implementation nimbus should use.
*/
@IsString
public static final String NIMBUS_BLOBSTORE = "nimbus.blobstore.class";
/**
* During operations with the blob store, via master, how long a connection is idle before nimbus considers it dead and drops the
* session and any associated connections.
*/
@IsPositiveNumber
@IsInteger
public static final String NIMBUS_BLOBSTORE_EXPIRATION_SECS = "nimbus.blobstore.expiration.secs";
/**
* A number representing the maximum number of workers any single topology can acquire.
* This will be ignored if the Resource Aware Scheduler is used.
*/
@IsInteger
@IsPositiveNumber(includeZero = true)
public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology";
/**
* A class implementing javax.servlet.Filter for DRPC HTTP requests.
*/
@IsString
public static final String DRPC_HTTP_FILTER = "drpc.http.filter";
/**
* Initialization parameters for the javax.servlet.Filter of the DRPC HTTP service.
*/
@IsMapEntryType(keyType = String.class, valueType = String.class)
public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params";
/**
* A number representing the maximum number of executors any single topology can acquire.
*/
@IsInteger
@IsPositiveNumber(includeZero = true)
public static final String NIMBUS_EXECUTORS_PER_TOPOLOGY = "nimbus.executors.perTopology";
/**
* This parameter is used by the storm-deploy project to configure the jvm options for the supervisor daemon.
*/
@IsStringOrStringList
public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";
/**
* How long a worker can go without heartbeating during the initial launch before the supervisor tries to restart the worker process.
* This value override supervisor.worker.timeout.secs during launch because there is additional overhead to starting and configuring the
* JVM on launch.
* Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
*/
@IsInteger
@IsPositiveNumber
@NotNull
public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs";
/**
* Whether or not the supervisor should launch workers assigned to it. Defaults to true -- and you should probably never change this
* value. This configuration is used in the Storm unit tests.
*/
@IsBoolean
public static final String SUPERVISOR_ENABLE = "supervisor.enable";
/**
* how often the supervisor sends a heartbeat to the master.
*/
@IsInteger
public static final String SUPERVISOR_HEARTBEAT_FREQUENCY_SECS = "supervisor.heartbeat.frequency.secs";
/**
* How often the supervisor checks the worker heartbeats to see if any of them need to be restarted.
*/
@IsInteger
@IsPositiveNumber
public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
/**
* The jvm profiler opts provided to workers launched by this supervisor.
*/
@IsStringOrStringList
public static final String WORKER_PROFILER_CHILDOPTS = "worker.profiler.childopts";
/**
* Enable profiling of worker JVMs using Oracle's Java Flight Recorder. Unlocking commercial features requires a special license from
* Oracle. See http://www.oracle.com/technetwork/java/javase/terms/products/index.html
*/
@IsBoolean
public static final String WORKER_PROFILER_ENABLED = "worker.profiler.enabled";
/**
* The command launched supervisor with worker arguments pid, action and [target_directory] Where action is - start profile, stop
* profile, jstack, heapdump and kill against pid.
*/
@IsString
public static final String WORKER_PROFILER_COMMAND = "worker.profiler.command";
/**
* A list of classes implementing IClusterMetricsConsumer (See storm.yaml.example for exact config format). Each listed class will be
* routed cluster related metrics data. Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and they're executed in Nimbus.
* Only consumers which run in leader Nimbus receives metrics data.
*/
@IsListEntryCustom(entryValidatorClasses = { ConfigValidation.ClusterMetricRegistryValidator.class })
public static final String STORM_CLUSTER_METRICS_CONSUMER_REGISTER = "storm.cluster.metrics.consumer.register";
/**
* How often cluster metrics data is published to metrics consumer.
*/
@NotNull
@IsPositiveNumber
public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS =
"storm.cluster.metrics.consumer.publish.interval.secs";
/**
* Enables user-first classpath. See topology.classpath.beginning.
*/
@IsBoolean
public static final String STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED = "storm.topology.classpath.beginning.enabled";
/**
* This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers) for the java.library.path value. java.library.path tells
* the JVM where to look for native libraries. It is necessary to set this config correctly since Storm uses the ZeroMQ and JZMQ native
* libs.
*/
@IsString
public static final String JAVA_LIBRARY_PATH = "java.library.path";
/**
* The path to use as the zookeeper dir when running a zookeeper server via "storm dev-zookeeper". This zookeeper instance is only
* intended for development; it is not a production grade zookeeper setup.
*/
@IsString
public static final String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
/**
* A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler to
* org.apache.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
*/
@IsMapEntryType(keyType = String.class, valueType = Number.class)
public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
/**
* For ArtifactoryConfigLoader, this can either be a reference to an individual file in Artifactory or to a directory. If it is a
* directory, the file with the largest lexographic name will be returned. Users need to add "artifactory+" to the beginning of the real
* URI to use ArtifactoryConfigLoader. For FileConfigLoader, this is the URI pointing to a file.
*/
@IsString
public static final String SCHEDULER_CONFIG_LOADER_URI = "scheduler.config.loader.uri";
/**
* It is the frequency at which the plugin will call out to artifactory instead of returning the most recently cached result. Currently
* it's only used in ArtifactoryConfigLoader.
*/
@IsInteger
@IsPositiveNumber
public static final String SCHEDULER_CONFIG_LOADER_POLLTIME_SECS = "scheduler.config.loader.polltime.secs";
/**
* It is the amount of time an http connection to the artifactory server will wait before timing out. Currently it's only used in
* ArtifactoryConfigLoader.
*/
@IsInteger
@IsPositiveNumber
public static final String SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS = "scheduler.config.loader.timeout.secs";
/**
* It is the part of the uri, configurable in Artifactory, which represents the top of the directory tree. It's only used in
* ArtifactoryConfigLoader.
*/
@IsString
public static final String SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY = "scheduler.config.loader.artifactory.base.directory";
/**
* A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler to
* org.apache.storm.scheduler.multitenant.MultitenantScheduler
*/
@IsMapEntryType(keyType = String.class, valueType = Number.class)
public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools";
/**
* A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure per user resource
* guarantees.
*/
@IsMapEntryCustom(
keyValidatorClasses = { ConfigValidation.StringValidator.class },
valueValidatorClasses = { ConfigValidation.UserResourcePoolEntryValidator.class })
public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";
/**
* the class that specifies the scheduling priority strategy to use in ResourceAwareScheduler.
*/
@NotNull
@IsImplementationOfClass(implementsClass = ISchedulingPriorityStrategy.class)
public static final String RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY = "resource.aware.scheduler.priority.strategy";
/**
* The maximum number of times that the RAS will attempt to schedule a topology. The default is 5.
*/
@IsInteger
@IsPositiveNumber
public static final String RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS =
"resource.aware.scheduler.max.topology.scheduling.attempts";
/*
* The maximum number of states that will be searched looking for a solution in the constraint solver strategy
*/
@IsInteger
@IsPositiveNumber
public static final String RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH = "resource.aware.scheduler.constraint.max.state.search";
/**
* How often nimbus's background thread to sync code for missing topologies should run.
*/
@IsInteger
public static final String NIMBUS_CODE_SYNC_FREQ_SECS = "nimbus.code.sync.freq.secs";
/**
* The plugin to be used for resource isolation.
*/
@IsImplementationOfClass(implementsClass = ResourceIsolationInterface.class)
public static final String STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin";
/**
* CGroup Setting below.
*/
/**
* resources to to be controlled by cgroups.
*/
@IsStringList
public static final String STORM_CGROUP_RESOURCES = "storm.cgroup.resources";
/**
* name for the cgroup hierarchy.
*/
@IsString
public static final String STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name";
/**
* flag to determine whether to use a resource isolation plugin Also determines whether the unit tests for cgroup runs. If
* storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run
*/
@IsBoolean
public static final String STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE = "storm.resource.isolation.plugin.enable";
/**
* Class implementing MetricStore. Runs on Nimbus.
*/
@NotNull
@IsString
// Validating class implementation could fail on non-Nimbus Daemons. Nimbus will catch the class not found on startup
// and log an error message, so just validating this as a String for now.
public static final String STORM_METRIC_STORE_CLASS = "storm.metricstore.class";
/**
* Class implementing WorkerMetricsProcessor. Runs on Supervisors.
*/
@NotNull
@IsString
public static final String STORM_METRIC_PROCESSOR_CLASS = "storm.metricprocessor.class";
/**
* RocksDB file location. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore implementation for the
* storm.metricstore.class.
*/
@IsString
public static final String STORM_ROCKSDB_LOCATION = "storm.metricstore.rocksdb.location";
/**
* RocksDB create if missing flag. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore implementation for
* the storm.metricstore.class.
*/
@IsBoolean
public static final String STORM_ROCKSDB_CREATE_IF_MISSING = "storm.metricstore.rocksdb.create_if_missing";
/**
* RocksDB metadata cache capacity. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore implementation for
* the storm.metricstore.class.
*/
@IsInteger
public static final String STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY = "storm.metricstore.rocksdb.metadata_string_cache_capacity";
/**
* RocksDB setting for length of metric retention. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore
* implementation for the storm.metricstore.class.
*/
@IsInteger
public static final String STORM_ROCKSDB_METRIC_RETENTION_HOURS = "storm.metricstore.rocksdb.retention_hours";
// Configs for memory enforcement done by the supervisor (not cgroups directly)
/**
* RocksDB setting for period of metric deletion thread. This setting is specific to the org.apache.storm.metricstore.rocksdb
* .RocksDbStore
* implementation for the storm.metricstore.class.
*/
@IsInteger
public static final String STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS = "storm.metricstore.rocksdb.deletion_period_hours";
/**
* In nimbus on startup check if all of the zookeeper ACLs are correct before starting. If not don't start nimbus.
*/
@IsBoolean
public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK = "storm.nimbus.zookeeper.acls.check";
/**
* In nimbus on startup check if all of the zookeeper ACLs are correct before starting. If not do your best to fix them before nimbus
* starts, if it cannot fix them nimbus will not start. This overrides any value set for storm.nimbus.zookeeper.acls.check.
*/
@IsBoolean
public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP = "storm.nimbus.zookeeper.acls.fixup";
/**
* Server side validation that @{see Config#TOPOLOGY_SCHEDULER_STRATEGY} is set ot a subclass of IStrategy.
*/
@IsImplementationOfClass(implementsClass = IStrategy.class)
public static final String VALIDATE_TOPOLOGY_SCHEDULER_STRATEGY = Config.TOPOLOGY_SCHEDULER_STRATEGY;
/**
* Class name of the HTTP credentials plugin for the UI.
*/
@IsImplementationOfClass(implementsClass = IHttpCredentialsPlugin.class)
public static final String UI_HTTP_CREDS_PLUGIN = "ui.http.creds.plugin";
/**
* Class name of the HTTP credentials plugin for DRPC.
*/
@IsImplementationOfClass(implementsClass = IHttpCredentialsPlugin.class)
public static final String DRPC_HTTP_CREDS_PLUGIN = "drpc.http.creds.plugin";
/**
* root directory for cgoups.
*/
@IsString
public static String STORM_SUPERVISOR_CGROUP_ROOTDIR = "storm.supervisor.cgroup.rootdir";
/**
* the manually set memory limit (in MB) for each CGroup on supervisor node.
*/
@IsPositiveNumber
public static String STORM_WORKER_CGROUP_MEMORY_MB_LIMIT = "storm.worker.cgroup.memory.mb.limit";
/**
* the manually set cpu share for each CGroup on supervisor node.
*/
@IsPositiveNumber
public static String STORM_WORKER_CGROUP_CPU_LIMIT = "storm.worker.cgroup.cpu.limit";
/**
* full path to cgexec command.
*/
@IsString
public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";
/**
* Please use STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB instead. The amount of memory a worker can exceed its allocation before
* cgroup will kill it.
*/
@IsPositiveNumber(includeZero = true)
public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB =
"storm.cgroup.memory.limit.tolerance.margin.mb";
/**
* To determine whether or not to cgroups should inherit cpuset.cpus and cpuset.mems config values form parent cgroup
* Note that cpuset.cpus and cpuset.mems configs in a cgroup must be initialized (i.e. contain a valid value) prior to
* being able to launch processes in that cgroup. The common use case for this config is when the linux distribution
* that is used does not support the cgroup.clone_children config.
*/
@IsBoolean
public static String STORM_CGROUP_INHERIT_CPUSET_CONFIGS = "storm.cgroup.inherit.cpuset.configs";
/**
* Java does not always play nicely with cgroups. It is coming but not fully implemented and not for the way storm uses cgroups. In the
* short term you can disable the hard memory enforcement by cgroups and let the supervisor handle shooting workers going over their
* limit in a kinder way.
*/
@IsBoolean
public static String STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE = "storm.cgroup.memory.enforcement.enable";
/**
* Memory given to each worker for free (because java and storm have some overhead). This is memory on the box that the workers can use.
* This should not be included in SUPERVISOR_MEMORY_CAPACITY_MB, as nimbus does not use this memory for scheduling.
*/
@IsPositiveNumber
public static String STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB =
"storm.supervisor.memory.limit.tolerance.margin.mb";
/**
* A multiplier for the memory limit of a worker that will have the supervisor shoot it immediately. 1.0 means shoot the worker as soon
* as it goes over. 2.0 means shoot the worker if its usage is double what was requested. This value is combined with
* STORM_SUPERVISOR_HARD_MEMORY_LIMIT_OVERAGE and which ever is greater is used for enforcement. This allows small workers to not be
* shot.
*/
@IsPositiveNumber
public static String STORM_SUPERVISOR_HARD_MEMORY_LIMIT_MULTIPLIER =
"storm.supervisor.hard.memory.limit.multiplier";
/**
* If the memory usage of a worker goes over its limit by this value is it shot immediately. This value is combined with
* STORM_SUPERVISOR_HARD_LIMIT_MEMORY_MULTIPLIER and which ever is greater is used for enforcement. This allows small workers to not be
* shot.
*/
@IsPositiveNumber(includeZero = true)
public static String STORM_SUPERVISOR_HARD_LIMIT_MEMORY_OVERAGE_MB = "storm.supervisor.hard.memory.limit.overage.mb";
/**
* If the amount of memory that is free in the system (either on the box or in the supervisor's cgroup) is below this number (in MB)
* consider the system to be in low memory mode and start shooting workers if they are over their limit.
*/
@IsPositiveNumber
public static String STORM_SUPERVISOR_LOW_MEMORY_THRESHOLD_MB = "storm.supervisor.low.memory.threshold.mb";
/**
* If the amount of memory that is free in the system (either on the box or in the supervisor's cgroup) is below this number (in MB)
* consider the system to be a little low on memory and start shooting workers if they are over their limit for a given grace period
* STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS.
*/
@IsPositiveNumber
public static String STORM_SUPERVISOR_MEDIUM_MEMORY_THRESHOLD_MB = "storm.supervisor.medium.memory.threshold.mb";
/**
* The number of milliseconds that a worker is allowed to be over their limit when there is a medium amount of memory free in the
* system.
*/
@IsPositiveNumber
public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS =
"storm.supervisor.medium.memory.grace.period.ms";
/**
* The config indicates the minimum percentage of cpu for a core that a worker will use. Assuming the a core value to be
* 100, a value of 10 indicates 10% of the core. The P in PCORE represents the term "physical". A default value will be set for this
* config if user does not override.
* <P></P>
* Workers in containers or cgroups may require a minimum amount of CPU in order to launch within the supervisor timeout.
* This setting allows configuring this to occur.
*/
@IsPositiveNumber(includeZero = true)
public static String STORM_WORKER_MIN_CPU_PCORE_PERCENT = "storm.worker.min.cpu.pcore.percent";
// VALIDATION ONLY CONFIGS
// Some configs inside Config.java may reference classes we don't want to expose in storm-client, but we still want to validate
// That they reference a valid class. To allow this to happen we do part of the validation on the client side with annotations on
// static final members of the Config class, and other validations here. We avoid naming them the same thing because clojure code
// walks these two classes and creates clojure constants for these values.
/**
* The number of hours a worker token is valid for. This also sets how frequently worker tokens will be renewed.
*/
@IsPositiveNumber
public static String STORM_WORKER_TOKEN_LIFE_TIME_HOURS = "storm.worker.token.life.time.hours";
public static String getCgroupRootDir(Map<String, Object> conf) {
return (String) conf.get(STORM_SUPERVISOR_CGROUP_ROOTDIR);
}
public static String getCgroupStormHierarchyDir(Map<String, Object> conf) {
return (String) conf.get(Config.STORM_CGROUP_HIERARCHY_DIR);
}
/**
* Get the cgroup resources from the conf.
*
* @param conf the config to read
* @return the resources.
*/
public static ArrayList<String> getCgroupStormResources(Map<String, Object> conf) {
ArrayList<String> ret = new ArrayList<>();
for (String entry : ((Iterable<String>) conf.get(DaemonConfig.STORM_CGROUP_RESOURCES))) {
ret.add(entry);
}
return ret;
}
public static String getCgroupStormHierarchyName(Map<String, Object> conf) {
return (String) conf.get(DaemonConfig.STORM_CGROUP_HIERARCHY_NAME);
}
}