blob: 6da4b8d80e293527032512952953df8e8dd5fd98 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.twitter.distributedlog.bk.QuorumConfig;
import com.twitter.distributedlog.feature.DefaultFeatureProvider;
import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
import com.twitter.distributedlog.net.DNSResolverForRacks;
import com.twitter.distributedlog.net.DNSResolverForRows;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
/**
* DistributedLog Configuration.
* <p>
* DistributedLog configuration is basically a properties based configuration, which extends from
* Apache commons {@link CompositeConfiguration}. All the DL settings are in camel case and prefixed
* with a meaningful component name. For example, `zkSessionTimeoutSeconds` means <i>SessionTimeoutSeconds</i>
* for component `zk`.
*
* <h3>BookKeeper Configuration</h3>
*
* BookKeeper client configuration settings could be loaded via DistributedLog configuration. All those
* settings are prefixed with <i>`bkc.`</i>. For example, <i>bkc.zkTimeout</i> in distributedlog configuration
* will be applied as <i>`zkTimeout`</i> in bookkeeper client configuration.
*
* <h3>How to load configuration</h3>
*
* The default distributedlog configuration is constructed by instantiating a new instance. This
* distributedlog configuration will automatically load the settings that are specified via
* {@link SystemConfiguration}.
*
* <pre>
* DistributedLogConfiguration conf = new DistributedLogConfiguration();
* </pre>
*
* The recommended way is to load configuration from URL that points to a configuration file
* ({@link #loadConf(URL)}).
*
* <pre>
* String configFile = "/path/to/distributedlog/conf/file";
* DistributedLogConfiguration conf = new DistributedLogConfiguration();
* conf.loadConf(new File(configFile).toURI().toURL());
* </pre>
*
* @see org.apache.bookkeeper.conf.ClientConfiguration
*/
public class DistributedLogConfiguration extends CompositeConfiguration {
static final Logger LOG = LoggerFactory.getLogger(DistributedLogConfiguration.class);
private static ClassLoader defaultLoader;
static {
    // Prefer the current thread's context class loader; when the thread has
    // none, fall back to the loader that loaded this class.
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    defaultLoader = (null != contextLoader)
            ? contextLoader
            : DistributedLogConfiguration.class.getClassLoader();
}
//
// ZooKeeper Related Settings
//
public static final String BKDL_ZK_ACL_ID = "zkAclId";
public static final String BKDL_ZK_ACL_ID_DEFAULT = null;
public static final String BKDL_ZK_SESSION_TIMEOUT_SECONDS = "zkSessionTimeoutSeconds";
public static final int BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT = 30;
public static final String BKDL_ZK_REQUEST_RATE_LIMIT = "zkRequestRateLimit";
public static final double BKDL_ZK_REQUEST_RATE_LIMIT_DEFAULT = 0;
public static final String BKDL_ZK_NUM_RETRIES = "zkNumRetries";
public static final int BKDL_ZK_NUM_RETRIES_DEFAULT = 3;
public static final String BKDL_ZK_RETRY_BACKOFF_START_MILLIS = "zkRetryStartBackoffMillis";
public static final int BKDL_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT = 5000;
public static final String BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS = "zkRetryMaxBackoffMillis";
public static final int BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT = 30000;
public static final String BKDL_ZKCLIENT_NUM_RETRY_THREADS = "zkcNumRetryThreads";
public static final int BKDL_ZKCLIENT_NUM_RETRY_THREADS_DEFAULT = 1;
//
// BookKeeper Related Settings
//
// BookKeeper zookeeper settings
public static final String BKDL_BKCLIENT_ZK_SESSION_TIMEOUT = "bkcZKSessionTimeoutSeconds";
public static final int BKDL_BKCLIENT_ZK_SESSION_TIMEOUT_DEFAULT = 30;
public static final String BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT = "bkcZKRequestRateLimit";
public static final double BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT_DEFAULT = 0;
public static final String BKDL_BKCLIENT_ZK_NUM_RETRIES = "bkcZKNumRetries";
public static final int BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT = 3;
public static final String BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS = "bkcZKRetryStartBackoffMillis";
public static final int BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT = 5000;
public static final String BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS = "bkcZKRetryMaxBackoffMillis";
public static final int BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT = 30000;
// Bookkeeper ensemble placement settings
// Bookkeeper ensemble size
public static final String BKDL_BOOKKEEPER_ENSEMBLE_SIZE = "bkcEnsembleSize";
// @Deprecated
public static final String BKDL_BOOKKEEPER_ENSEMBLE_SIZE_OLD = "ensemble-size";
public static final int BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
// Bookkeeper write quorum size
public static final String BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE = "bkcWriteQuorumSize";
// @Deprecated
public static final String BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_OLD = "write-quorum-size";
public static final int BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_DEFAULT = 3;
// Bookkeeper ack quorum size
public static final String BKDL_BOOKKEEPER_ACK_QUORUM_SIZE = "bkcAckQuorumSize";
// @Deprecated
public static final String BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_OLD = "ack-quorum-size";
public static final int BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_DEFAULT = 2;
public static final String BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT = "bkRowAwareEnsemblePlacement";
public static final String BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_OLD = "row-aware-ensemble-placement";
public static final boolean BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_DEFAULT = false;
public static final String BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS = "bkEnsemblePlacementDnsResolverClass";
public static final String BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS_DEFAULT =
DNSResolverForRacks.class.getName();
public static final String BKDL_BK_DNS_RESOLVER_OVERRIDES = "dnsResolverOverrides";
public static final String BKDL_BK_DNS_RESOLVER_OVERRIDES_DEFAULT = "";
// General Settings
// @Deprecated
public static final String BKDL_BOOKKEEPER_DIGEST_PW = "digestPw";
public static final String BKDL_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
public static final String BKDL_BKCLIENT_NUM_IO_THREADS = "bkcNumIOThreads";
public static final String BKDL_TIMEOUT_TIMER_TICK_DURATION_MS = "timerTickDuration";
public static final long BKDL_TIMEOUT_TIMER_TICK_DURATION_MS_DEFAULT = 100;
public static final String BKDL_TIMEOUT_TIMER_NUM_TICKS = "timerNumTicks";
public static final int BKDL_TIMEOUT_TIMER_NUM_TICKS_DEFAULT = 1024;
//
// Deprecated BookKeeper Settings (in favor of "bkc." style bookkeeper settings)
//
public static final String BKDL_BKCLIENT_READ_TIMEOUT = "bkcReadTimeoutSeconds";
public static final int BKDL_BKCLIENT_READ_TIMEOUT_DEFAULT = 10;
public static final String BKDL_BKCLIENT_WRITE_TIMEOUT = "bkcWriteTimeoutSeconds";
public static final int BKDL_BKCLIENT_WRITE_TIMEOUT_DEFAULT = 10;
// Number of worker threads for the bookkeeper client (key and default value).
// NOTE(review): the default constant is spelled "BKCLEINT" rather than "BKCLIENT".
// It is a public name, so renaming it would break existing callers; presumably the
// misspelling is kept for compatibility — confirm before changing.
public static final String BKDL_BKCLIENT_NUM_WORKER_THREADS = "bkcNumWorkerThreads";
public static final int BKDL_BKCLEINT_NUM_WORKER_THREADS_DEFAULT = 1;
//
// DL General Settings
//
// Executor Parameters
public static final String BKDL_NUM_WORKER_THREADS = "numWorkerThreads";
public static final String BKDL_NUM_READAHEAD_WORKER_THREADS = "numReadAheadWorkerThreads";
public static final String BKDL_NUM_LOCKSTATE_THREADS = "numLockStateThreads";
public static final String BKDL_NUM_RESOURCE_RELEASE_THREADS = "numResourceReleaseThreads";
public static final String BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS = "schedulerShutdownTimeoutMs";
public static final int BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS_DEFAULT = 5000;
public static final String BKDL_USE_DAEMON_THREAD = "useDaemonThread";
public static final boolean BKDL_USE_DAEMON_THREAD_DEFAULT = false;
// Metadata Parameters
public static final String BKDL_LEDGER_METADATA_LAYOUT_VERSION = "ledgerMetadataLayoutVersion";
public static final String BKDL_LEDGER_METADATA_LAYOUT_VERSION_OLD = "ledger-metadata-layout";
public static final int BKDL_LEDGER_METADATA_LAYOUT_VERSION_DEFAULT =
LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value;
public static final String BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK = "ledgerMetadataSkipMinVersionCheck";
public static final boolean BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK_DEFAULT = false;
public static final String BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER = "firstLogsegmentSequenceNumber";
public static final String BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_OLD = "first-logsegment-sequence-number";
public static final long BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_DEFAULT =
DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO;
public static final String BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED = "logSegmentSequenceNumberValidationEnabled";
public static final boolean BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED_DEFAULT = true;
public static final String BKDL_ENABLE_RECORD_COUNTS = "enableRecordCounts";
public static final boolean BKDL_ENABLE_RECORD_COUNTS_DEFAULT = true;
public static final String BKDL_MAXID_SANITYCHECK = "maxIdSanityCheck";
public static final boolean BKDL_MAXID_SANITYCHECK_DEFAULT = true;
public static final String BKDL_ENCODE_REGION_ID_IN_VERSION = "encodeRegionIDInVersion";
public static final boolean BKDL_ENCODE_REGION_ID_IN_VERSION_DEFAULT = false;
// (@Deprecated)
public static final String BKDL_LOGSEGMENT_NAME_VERSION = "logSegmentNameVersion";
public static final int BKDL_LOGSEGMENT_NAME_VERSION_DEFAULT = DistributedLogConstants.LOGSEGMENT_NAME_VERSION;
// (@Deprecated) Name for the default (non-partitioned) stream
public static final String BKDL_UNPARTITIONED_STREAM_NAME = "unpartitionedStreamName";
public static final String BKDL_UNPARTITIONED_STREAM_NAME_DEFAULT = "<default>";
// Log Segment Cache Parameters
public static final String BKDL_LOGSEGMENT_CACHE_TTL_MS = "logSegmentCacheTTLMs";
public static final long BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT = 600000; // 10 mins
public static final String BKDL_LOGSEGMENT_CACHE_MAX_SIZE = "logSegmentCacheMaxSize";
public static final long BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT = 10000;
public static final String BKDL_LOGSEGMENT_CACHE_ENABLED = "logSegmentCacheEnabled";
public static final boolean BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT = true;
//
// DL Writer Settings
//
// General Settings
public static final String BKDL_CREATE_STREAM_IF_NOT_EXISTS = "createStreamIfNotExists";
public static final boolean BKDL_CREATE_STREAM_IF_NOT_EXISTS_DEFAULT = true;
public static final String BKDL_LOG_FLUSH_TIMEOUT = "logFlushTimeoutSeconds";
public static final int BKDL_LOG_FLUSH_TIMEOUT_DEFAULT = 30;
/**
* <pre>
* CompressionCodec.Type     String to use (See CompressionUtils)
* ---------------------     ------------------------------------
* NONE                      none
* LZ4                       lz4
* UNKNOWN                   any other instance of String.class
* </pre>
*/
public static final String BKDL_COMPRESSION_TYPE = "compressionType";
public static final String BKDL_COMPRESSION_TYPE_DEFAULT = "none";
public static final String BKDL_FAILFAST_ON_STREAM_NOT_READY = "failFastOnStreamNotReady";
public static final boolean BKDL_FAILFAST_ON_STREAM_NOT_READY_DEFAULT = false;
public static final String BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR = "disableRollingOnLogSegmentError";
public static final boolean BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR_DEFAULT = false;
// Durability Settings
public static final String BKDL_IS_DURABLE_WRITE_ENABLED = "isDurableWriteEnabled";
public static final boolean BKDL_IS_DURABLE_WRITE_ENABLED_DEFAULT = true;
// Transmit Settings
public static final String BKDL_OUTPUT_BUFFER_SIZE = "writerOutputBufferSize";
public static final String BKDL_OUTPUT_BUFFER_SIZE_OLD = "output-buffer-size";
public static final int BKDL_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
public static final String BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS = "periodicFlushFrequencyMilliSeconds";
public static final int BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS_DEFAULT = 0;
public static final String BKDL_ENABLE_IMMEDIATE_FLUSH = "enableImmediateFlush";
public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false;
public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = "minimumDelayBetweenImmediateFlushMilliSeconds";
public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0;
public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = "periodicKeepAliveMilliSeconds";
public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0;
// Retention/Truncation Settings
public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours";
public static final String BKDL_RETENTION_PERIOD_IN_HOURS_OLD = "retention-size";
public static final int BKDL_RETENTION_PERIOD_IN_HOURS_DEFAULT = 72;
public static final String BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION = "explicitTruncationByApp";
public static final boolean BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION_DEFAULT = false;
// Log Segment Rolling Settings
public static final String BKDL_ROLLING_INTERVAL_IN_MINUTES = "logSegmentRollingMinutes";
public static final String BKDL_ROLLING_INTERVAL_IN_MINUTES_OLD = "rolling-interval";
public static final int BKDL_ROLLING_INTERVAL_IN_MINUTES_DEFAULT = 120;
public static final String BKDL_MAX_LOGSEGMENT_BYTES = "maxLogSegmentBytes";
public static final int BKDL_MAX_LOGSEGMENT_BYTES_DEFAULT = 256 * 1024 * 1024; // default 256MB
public static final String BKDL_LOGSEGMENT_ROLLING_CONCURRENCY = "logSegmentRollingConcurrency";
public static final int BKDL_LOGSEGMENT_ROLLING_CONCURRENCY_DEFAULT = 1;
// Lock Settings
public static final String BKDL_WRITE_LOCK_ENABLED = "writeLockEnabled";
public static final boolean BKDL_WRITE_LOCK_ENABLED_DEFAULT = true;
public static final String BKDL_LOCK_TIMEOUT = "lockTimeoutSeconds";
public static final long BKDL_LOCK_TIMEOUT_DEFAULT = 30;
public static final String BKDL_LOCK_REACQUIRE_TIMEOUT = "lockReacquireTimeoutSeconds";
public static final long BKDL_LOCK_REACQUIRE_TIMEOUT_DEFAULT = DistributedLogConstants.LOCK_REACQUIRE_TIMEOUT_DEFAULT;
public static final String BKDL_LOCK_OP_TIMEOUT = "lockOpTimeoutSeconds";
public static final long BKDL_LOCK_OP_TIMEOUT_DEFAULT = DistributedLogConstants.LOCK_OP_TIMEOUT_DEFAULT;
// Ledger Allocator Settings
public static final String BKDL_ENABLE_LEDGER_ALLOCATOR_POOL = "enableLedgerAllocatorPool";
public static final boolean BKDL_ENABLE_LEDGER_ALLOCATOR_POOL_DEFAULT = false;
public static final String BKDL_LEDGER_ALLOCATOR_POOL_PATH = "ledgerAllocatorPoolPath";
public static final String BKDL_LEDGER_ALLOCATOR_POOL_PATH_DEFAULT = DistributedLogConstants.ALLOCATION_POOL_NODE;
public static final String BKDL_LEDGER_ALLOCATOR_POOL_NAME = "ledgerAllocatorPoolName";
public static final String BKDL_LEDGER_ALLOCATOR_POOL_NAME_DEFAULT = null;
public static final String BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE = "ledgerAllocatorPoolCoreSize";
public static final int BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE_DEFAULT = 20;
// Write Limit Settings
public static final String BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT = "perWriterOutstandingWriteLimit";
public static final int BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT_DEFAULT = -1;
public static final String BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT = "globalOutstandingWriteLimit";
public static final int BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT_DEFAULT = -1;
public static final String BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE = "outstandingWriteLimitDarkmode";
public static final boolean BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE_DEFAULT = true;
//
// DL Reader Settings
//
// General Settings
public static final String BKDL_READLAC_OPTION = "readLACLongPoll";
public static final int BKDL_READLAC_OPTION_DEFAULT = 3; //BKLogPartitionReadHandler.ReadLACOption.READENTRYPIGGYBACK_SEQUENTIAL.value
public static final String BKDL_READLACLONGPOLL_TIMEOUT = "readLACLongPollTimeout";
public static final int BKDL_READLACLONGPOLL_TIMEOUT_DEFAULT = 1000;
public static final String BKDL_DESERIALIZE_RECORDSET_ON_READS = "deserializeRecordSetOnReads";
public static final boolean BKDL_DESERIALIZE_RECORDSET_ON_READS_DEFAULT = true;
// Idle reader settings
public static final String BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS = "readerIdleWarnThresholdMillis";
public static final int BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS_DEFAULT = 120000;
public static final String BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS = "readerIdleErrorThresholdMillis";
public static final int BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS_DEFAULT = Integer.MAX_VALUE;
// Reader constraint settings
public static final String BKDL_READER_IGNORE_TRUNCATION_STATUS = "ignoreTruncationStatus";
public static final boolean BKDL_READER_IGNORE_TRUNCATION_STATUS_DEFAULT = false;
public static final String BKDL_READER_ALERT_POSITION_ON_TRUNCATED = "alertPositionOnTruncated";
public static final boolean BKDL_READER_ALERT_POSITION_ON_TRUNCATED_DEFAULT = true;
public static final String BKDL_READER_POSITION_GAP_DETECTION_ENABLED = "positionGapDetectionEnabled";
public static final boolean BKDL_READER_POSITION_GAP_DETECTION_ENABLED_DEFAULT = false;
// Read ahead related parameters
public static final String BKDL_ENABLE_READAHEAD = "enableReadAhead";
public static final boolean BKDL_ENABLE_READAHEAD_DEFAULT = true;
public static final String BKDL_ENABLE_FORCEREAD = "enableForceRead";
public static final boolean BKDL_ENABLE_FORCEREAD_DEFAULT = true;
public static final String BKDL_READAHEAD_MAX_RECORDS = "readAheadMaxRecords";
public static final String BKDL_READAHEAD_MAX_RECORDS_OLD = "ReadAheadMaxEntries";
public static final int BKDL_READAHEAD_MAX_RECORDS_DEFAULT = 10;
public static final String BKDL_READAHEAD_BATCHSIZE = "readAheadBatchSize";
public static final String BKDL_READAHEAD_BATCHSIZE_OLD = "ReadAheadBatchSize";
public static final int BKDL_READAHEAD_BATCHSIZE_DEFAULT = 2;
public static final String BKDL_READAHEAD_WAITTIME = "readAheadWaitTime";
public static final String BKDL_READAHEAD_WAITTIME_OLD = "ReadAheadWaitTime";
public static final int BKDL_READAHEAD_WAITTIME_DEFAULT = 200;
public static final String BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM = "readAheadWaitTimeOnEndOfStream";
public static final String BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM_OLD = "ReadAheadWaitTimeOnEndOfStream";
public static final int BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM_DEFAULT = 10000;
public static final String BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS =
"readAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis";
public static final int BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS_DEFAULT = 10000;
public static final String BKDL_READAHEAD_SKIP_BROKEN_ENTRIES = "readAheadSkipBrokenEntries";
public static final boolean BKDL_READAHEAD_SKIP_BROKEN_ENTRIES_DEFAULT = false;
public static final String BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT = "numPrefetchEntriesPerLogSegment";
public static final int BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 4;
public static final String BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT = "maxPrefetchEntriesPerLogSegment";
public static final int BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 32;
// Scan Settings
// NOTE(review): the key string reads "...EachPerLastRecordScan", while the constant name and
// the sibling key "maxNumEntriesPerReadLastRecordScan" use "PerRead". This looks like a typo,
// but the string is read at runtime — confirm no deployed configuration relies on it before
// changing it.
public static final String BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN = "firstNumEntriesEachPerLastRecordScan";
public static final int BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN_DEFAULT = 2;
public static final String BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN = "maxNumEntriesPerReadLastRecordScan";
public static final int BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN_DEFAULT = 16;
// Log Existence Settings
public static final String BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS = "checkLogExistenceBackoffStartMillis";
public static final int BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS_DEFAULT = 200;
public static final String BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS = "checkLogExistenceBackoffMaxMillis";
public static final int BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS_DEFAULT = 1000;
//
// Tracing/Stats Settings
//
public static final String BKDL_TRACE_READAHEAD_DELIVERY_LATENCY = "traceReadAheadDeliveryLatency";
public static final boolean BKDL_TRACE_READAHEAD_DELIVERY_LATENCY_DEFAULT = false;
public static final String BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS = "metadataLatencyWarnThresholdMs";
public static final long BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS_DEFAULT = DistributedLogConstants.LATENCY_WARN_THRESHOLD_IN_MILLIS;
public static final String BKDL_DATA_LATENCY_WARN_THRESHOLD_MS = "dataLatencyWarnThresholdMs";
public static final long BKDL_DATA_LATENCY_WARN_THRESHOLD_MS_DEFAULT = 2 * DistributedLogConstants.LATENCY_WARN_THRESHOLD_IN_MILLIS;
// Whether to trace readahead metadata changes (key and default value).
// NOTE(review): the default constant is spelled "MEATDATA" rather than "METADATA".
// It is a public name, so renaming it would break existing callers; presumably the
// misspelling is kept for compatibility — confirm before changing.
public static final String BKDL_TRACE_READAHEAD_METADATA_CHANGES = "traceReadAheadMetadataChanges";
public static final boolean BKDL_TRACE_READAHEAD_MEATDATA_CHANGES_DEFAULT = false;
public final static String BKDL_ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
public final static boolean BKDL_ENABLE_TASK_EXECUTION_STATS_DEFAULT = false;
public final static String BKDL_TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros";
public final static long BKDL_TASK_EXECUTION_WARN_TIME_MICROS_DEFAULT = 100000;
public static final String BKDL_ENABLE_PERSTREAM_STAT = "enablePerStreamStat";
public static final boolean BKDL_ENABLE_PERSTREAM_STAT_DEFAULT = false;
//
// Settings for Feature Providers
//
public static final String BKDL_FEATURE_PROVIDER_CLASS = "featureProviderClass";
//
// Settings for Configuration Based Feature Provider
//
public static final String BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH = "fileFeatureProviderBaseConfigPath";
public static final String BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH_DEFAULT = "decider.conf";
public static final String BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH = "fileFeatureProviderOverlayConfigPath";
public static final String BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH_DEFAULT = null;
//
// Settings for Namespaces
//
public static final String BKDL_FEDERATED_NAMESPACE_ENABLED = "federatedNamespaceEnabled";
public static final boolean BKDL_FEDERATED_NAMESPACE_ENABLED_DEFAULT = false;
public static final String BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE = "federatedMaxLogsPerSubnamespace";
public static final int BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE_DEFAULT = 15000;
public static final String BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS = "federatedCheckExistenceWhenCacheMiss";
public static final boolean BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS_DEFAULT = true;
// Settings for Configurations
public static final String BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC = "dynamicConfigReloadIntervalSec";
public static final int BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC_DEFAULT = 60;
public static final String BKDL_STREAM_CONFIG_ROUTER_CLASS = "streamConfigRouterClass";
public static final String BKDL_STREAM_CONFIG_ROUTER_CLASS_DEFAULT = "com.twitter.distributedlog.service.config.IdentityConfigRouter";
// Settings for RateLimit (used by distributedlog-service)
public static final String BKDL_BPS_SOFT_WRITE_LIMIT = "bpsSoftWriteLimit";
public static final int BKDL_BPS_SOFT_WRITE_LIMIT_DEFAULT = -1;
public static final String BKDL_BPS_HARD_WRITE_LIMIT = "bpsHardWriteLimit";
public static final int BKDL_BPS_HARD_WRITE_LIMIT_DEFAULT = -1;
public static final String BKDL_RPS_SOFT_WRITE_LIMIT = "rpsSoftWriteLimit";
public static final int BKDL_RPS_SOFT_WRITE_LIMIT_DEFAULT = -1;
public static final String BKDL_RPS_HARD_WRITE_LIMIT = "rpsHardWriteLimit";
public static final int BKDL_RPS_HARD_WRITE_LIMIT_DEFAULT = -1;
// Rate and resource limits: per shard
public static final String BKDL_RPS_SOFT_SERVICE_LIMIT = "rpsSoftServiceLimit";
public static final int BKDL_RPS_SOFT_SERVICE_LIMIT_DEFAULT = -1;
public static final String BKDL_RPS_HARD_SERVICE_LIMIT = "rpsHardServiceLimit";
public static final int BKDL_RPS_HARD_SERVICE_LIMIT_DEFAULT = -1;
public static final String BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT = "rpsStreamAcquireServiceLimit";
public static final int BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT_DEFAULT = -1;
public static final String BKDL_BPS_SOFT_SERVICE_LIMIT = "bpsSoftServiceLimit";
public static final int BKDL_BPS_SOFT_SERVICE_LIMIT_DEFAULT = -1;
public static final String BKDL_BPS_HARD_SERVICE_LIMIT = "bpsHardServiceLimit";
public static final int BKDL_BPS_HARD_SERVICE_LIMIT_DEFAULT = -1;
public static final String BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT = "bpsStreamAcquireServiceLimit";
public static final int BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT_DEFAULT = -1;
// Settings for Partitioning
public static final String BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY = "maxAcquiredPartitionsPerProxy";
public static final int BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY_DEFAULT = -1;
public static final String BKDL_MAX_CACHED_PARTITIONS_PER_PROXY = "maxCachedPartitionsPerProxy";
public static final int BKDL_MAX_CACHED_PARTITIONS_PER_PROXY_DEFAULT = -1;
//
// Settings for Error Injection
//
public static final String BKDL_EI_INJECT_WRITE_DELAY = "eiInjectWriteDelay";
public static final boolean BKDL_EI_INJECT_WRITE_DELAY_DEFAULT = false;
public static final String BKDL_EI_INJECTED_WRITE_DELAY_PERCENT = "eiInjectedWriteDelayPercent";
public static final double BKDL_EI_INJECTED_WRITE_DELAY_PERCENT_DEFAULT = 0.0;
public static final String BKDL_EI_INJECTED_WRITE_DELAY_MS = "eiInjectedWriteDelayMs";
public static final int BKDL_EI_INJECTED_WRITE_DELAY_MS_DEFAULT = 0;
public static final String BKDL_EI_INJECT_READAHEAD_STALL = "eiInjectReadAheadStall";
public static final boolean BKDL_EI_INJECT_READAHEAD_STALL_DEFAULT = false;
public static final String BKDL_EI_INJECT_READAHEAD_DELAY = "eiInjectReadAheadDelay";
public static final boolean BKDL_EI_INJECT_READAHEAD_DELAY_DEFAULT = false;
public static final String BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS = "eiInjectMaxReadAheadDelayMs";
public static final int BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS_DEFAULT = 0;
public static final String BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT = "eiInjectReadAheadDelayPercent";
public static final int BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT_DEFAULT = 10;
public static final String BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES = "eiInjectReadAheadBrokenEntries";
public static final boolean BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES_DEFAULT = false;
// Whitelisted stream-level configuration settings.
// loadStreamConf copies only these keys from a stream configuration override;
// any other key it encounters is collected and logged as an invalid override
// instead of being applied.
private static final Set<String> streamSettings = Sets.newHashSet(
BKDL_READER_POSITION_GAP_DETECTION_ENABLED,
BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS,
BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS,
BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS,
BKDL_ENABLE_IMMEDIATE_FLUSH
);
/**
* Construct distributedlog configuration with default settings.
* It also loads the settings from system properties.
*/
public DistributedLogConfiguration() {
super();
// add configuration for system properties
addConfiguration(new SystemConfiguration());
}
/**
 * Load settings from the properties file at the given URL and add them to
 * this configuration.
 * <p>
 * Configurations can be loaded in precedence order: one loaded earlier takes
 * precedence over any loaded later.
 *
 * @param confURL Configuration URL
 * @throws ConfigurationException if building the {@link PropertiesConfiguration}
 *         from the URL fails
 */
public void loadConf(URL confURL) throws ConfigurationException {
    addConfiguration(new PropertiesConfiguration(confURL));
}
/**
 * Add another distributedlog configuration to this one as an additional
 * source of settings.
 *
 * @param baseConf the distributedlog configuration to add
 */
public void loadConf(DistributedLogConfiguration baseConf) {
    this.addConfiguration(baseConf);
}
/**
 * Add an arbitrary {@link Configuration} object to this one as an additional
 * source of settings.
 *
 * @param otherConf the configuration object to add
 */
public void loadConf(Configuration otherConf) {
    this.addConfiguration(otherConf);
}
/**
 * Apply whitelisted stream-level overrides from another configuration object.
 * <p>
 * Each key of the override configuration that is a whitelisted stream setting
 * is copied into this configuration with {@code setProperty}. Every other key
 * is skipped and, if warn logging is enabled, reported in a single warning.
 * Nothing happens when the override is absent.
 *
 * @param streamConfiguration stream configuration overrides
 */
public void loadStreamConf(Optional<DistributedLogConfiguration> streamConfiguration) {
    if (!streamConfiguration.isPresent()) {
        return;
    }
    DistributedLogConfiguration overrides = streamConfiguration.get();
    ArrayList<Object> rejected = new ArrayList<Object>();
    for (Iterator<?> keys = overrides.getKeys(); keys.hasNext(); ) {
        Object key = keys.next();
        boolean whitelisted = key instanceof String && streamSettings.contains(key);
        if (!whitelisted) {
            // remember the key so it can be reported once the scan is done
            rejected.add(key);
            continue;
        }
        String name = (String) key;
        setProperty(name, overrides.getProperty(name));
    }
    if (LOG.isWarnEnabled() && !rejected.isEmpty()) {
        LOG.warn("invalid stream configuration override(s): {}",
                StringUtils.join(rejected, ";"));
    }
}
//
// ZooKeeper Related Settings
//
/**
 * Render all properties as a string.
 * <p>
 * Each key that is a {@code String} produces one {@code key=value} entry;
 * entries are separated by a newline, with no trailing newline.
 *
 * @return the properties, one {@code key=value} pair per line
 */
public String getPropsAsString() {
    StringBuilder props = new StringBuilder();
    // Empty before the first entry, a newline before every later one.
    String separator = "";
    for (Iterator<?> keys = getKeys(); keys.hasNext(); ) {
        Object key = keys.next();
        if (!(key instanceof String)) {
            continue;
        }
        props.append(separator)
                .append(key)
                .append("=")
                .append(getProperty((String) key));
        separator = "\n";
    }
    return props.toString();
}
/**
 * Get the digest id used for ZK acl.
 *
 * @return zk acl id; the default is {@code null}.
 */
public String getZkAclId() {
    final String aclId = getString(BKDL_ZK_ACL_ID, BKDL_ZK_ACL_ID_DEFAULT);
    return aclId;
}
/**
 * Set the digest id to use for ZK acl.
 *
 * @param zkAclId acl id.
 * @return this distributedlog configuration, to allow call chaining
 * @see #getZkAclId()
 */
public DistributedLogConfiguration setZkAclId(String zkAclId) {
    this.setProperty(BKDL_ZK_ACL_ID, zkAclId);
    return this;
}
/**
 * Get ZK session timeout in seconds.
 * <p>
 * This is the session timeout applied for the zookeeper client used by distributedlog.
 * Use {@link #getBKClientZKSessionTimeoutMilliSeconds()} for the zookeeper client used
 * by the bookkeeper client.
 *
 * @return zookeeper session timeout in seconds; the default is 30.
 * @deprecated use {@link #getZKSessionTimeoutMilliseconds()}
 */
@Deprecated
public int getZKSessionTimeoutSeconds() {
    return this.getInt(BKDL_ZK_SESSION_TIMEOUT_SECONDS, BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT);
}
/**
 * Get ZK session timeout in milliseconds.
 * <p>
 * This is the session timeout applied for the zookeeper client used by distributedlog.
 * Use {@link #getBKClientZKSessionTimeoutMilliSeconds()} for the zookeeper client used
 * by the bookkeeper client.
 * <p>
 * The value is derived from the setting in seconds. If the converted value does not
 * fit in an {@code int}, it is clamped to {@link Integer#MAX_VALUE} (or
 * {@link Integer#MIN_VALUE} for negative settings) instead of wrapping around.
 *
 * @return zk session timeout in milliseconds.
 */
public int getZKSessionTimeoutMilliseconds() {
    int timeoutSeconds =
            this.getInt(BKDL_ZK_SESSION_TIMEOUT_SECONDS, BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT);
    // Multiply in long arithmetic: an int product silently wraps once the
    // configured seconds exceed Integer.MAX_VALUE / 1000.
    long timeoutMillis = timeoutSeconds * 1000L;
    return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMillis));
}
/**
 * Set the ZK session timeout, expressed in seconds.
 *
 * @param zkSessionTimeoutSeconds session timeout in seconds.
 * @return this distributedlog configuration, to allow call chaining
 * @see #getZKSessionTimeoutMilliseconds()
 */
public DistributedLogConfiguration setZKSessionTimeoutSeconds(int zkSessionTimeoutSeconds) {
    // Explicit boxing; the property is stored as an Integer.
    this.setProperty(BKDL_ZK_SESSION_TIMEOUT_SECONDS, Integer.valueOf(zkSessionTimeoutSeconds));
    return this;
}
/**
 * Get the zookeeper access rate limit.
 * <p>The rate limiter is basically a guava {@link com.google.common.util.concurrent.RateLimiter}.
 * It limits the rate of requests sent by the zookeeper client. If the value is non-positive,
 * rate limiting is disabled. By default it is disabled (value = 0).
 *
 * @return zookeeper access rate, by default it is 0.
 * @see #setZKRequestRateLimit(double)
 */
public double getZKRequestRateLimit() {
    final double rateLimit = getDouble(BKDL_ZK_REQUEST_RATE_LIMIT, BKDL_ZK_REQUEST_RATE_LIMIT_DEFAULT);
    return rateLimit;
}
/**
 * Set the zookeeper access rate limit (requests per second).
 *
 * @param requestRateLimit
 *          zookeeper access rate limit
 * @return this configuration, to allow call chaining
 * @see #getZKRequestRateLimit()
 */
public DistributedLogConfiguration setZKRequestRateLimit(double requestRateLimit) {
    this.setProperty(BKDL_ZK_REQUEST_RATE_LIMIT, requestRateLimit);
    return this;
}
/**
 * Get the number of retries per request for the zookeeper client.
 * <p>Retries only happen on retryable failures like session expired or
 * session moved. For permanent failures, the request fails immediately.
 * The default value is 3.
 *
 * @return number of retries per request of the zookeeper client.
 * @see #setZKNumRetries(int)
 */
public int getZKNumRetries() {
    final int retries = getInt(BKDL_ZK_NUM_RETRIES, BKDL_ZK_NUM_RETRIES_DEFAULT);
    return retries;
}
/**
 * Set the number of retries per request for the zookeeper client.
 *
 * @param zkNumRetries number of retries per request of the zookeeper client.
 * @return this configuration, to allow call chaining
 * @see #getZKNumRetries()
 */
public DistributedLogConfiguration setZKNumRetries(int zkNumRetries) {
    this.setProperty(BKDL_ZK_NUM_RETRIES, zkNumRetries);
    return this;
}
/**
 * Get the start backoff time of zookeeper operation retries, in milliseconds.
 * <p>The retry time increases in a bounded exponential way and becomes flat
 * after hitting the max backoff time ({@link #getZKRetryBackoffMaxMillis()}).
 * The default start backoff time is 5000 milliseconds.
 *
 * @return start backoff time of zookeeper operation retries, in milliseconds.
 * @see #getZKRetryBackoffMaxMillis()
 */
public int getZKRetryBackoffStartMillis() {
    final int startMillis = getInt(
            BKDL_ZK_RETRY_BACKOFF_START_MILLIS,
            BKDL_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT);
    return startMillis;
}
/**
 * Set the start backoff time of zookeeper operation retries, in milliseconds.
 *
 * @param zkRetryBackoffStartMillis start backoff time of zookeeper operation retries,
 *                                  in milliseconds.
 * @return this configuration, to allow call chaining
 * @see #getZKRetryBackoffStartMillis()
 */
public DistributedLogConfiguration setZKRetryBackoffStartMillis(int zkRetryBackoffStartMillis) {
    this.setProperty(BKDL_ZK_RETRY_BACKOFF_START_MILLIS, zkRetryBackoffStartMillis);
    return this;
}
/**
 * Get the max backoff time of zookeeper operation retries, in milliseconds.
 * <p>The retry time increases in a bounded exponential way starting from
 * {@link #getZKRetryBackoffStartMillis()} and becomes flat after hitting this
 * max backoff time.
 * The default max backoff time is 30000 milliseconds.
 *
 * @return max backoff time of zookeeper operation retries, in milliseconds.
 * @see #getZKRetryBackoffStartMillis()
 */
public int getZKRetryBackoffMaxMillis() {
    final int maxMillis = getInt(
            BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS,
            BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT);
    return maxMillis;
}
/**
 * Set the max backoff time of zookeeper operation retries, in milliseconds.
 *
 * @param zkRetryBackoffMaxMillis max backoff time of zookeeper operation retries,
 *                                in milliseconds.
 * @return this configuration, to allow call chaining
 * @see #getZKRetryBackoffMaxMillis()
 */
public DistributedLogConfiguration setZKRetryBackoffMaxMillis(int zkRetryBackoffMaxMillis) {
    this.setProperty(BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS, zkRetryBackoffMaxMillis);
    return this;
}
/**
 * Get the number of retry executor threads of the ZK client.
 * By default it is 1.
 *
 * @return number of retry executor threads.
 * @see #setZKClientNumberRetryThreads(int)
 */
public int getZKClientNumberRetryThreads() {
    final int retryThreads = getInt(BKDL_ZKCLIENT_NUM_RETRY_THREADS, BKDL_ZKCLIENT_NUM_RETRY_THREADS_DEFAULT);
    return retryThreads;
}
/**
 * Set the number of retry executor threads of the ZK client.
 *
 * @param numThreads
 *          number of retry executor threads.
 * @return this configuration, to allow call chaining
 * @see #getZKClientNumberRetryThreads()
 */
public DistributedLogConfiguration setZKClientNumberRetryThreads(int numThreads) {
    this.setProperty(BKDL_ZKCLIENT_NUM_RETRY_THREADS, numThreads);
    return this;
}
//
// BookKeeper ZooKeeper Client Settings
//
/**
 * Get BK's zookeeper session timeout in milliseconds.
 * <p>
 * The value is stored in seconds and converted to milliseconds here. It is the
 * session timeout applied for the zookeeper client used by the bookkeeper client.
 * Use {@link #getZKSessionTimeoutMilliseconds()} for the zookeeper client used
 * by distributedlog.
 *
 * @return BK's zookeeper session timeout in milliseconds
 * @see #setBKClientZKSessionTimeout(int)
 */
public int getBKClientZKSessionTimeoutMilliSeconds() {
    final int timeoutSeconds =
            getInt(BKDL_BKCLIENT_ZK_SESSION_TIMEOUT, BKDL_BKCLIENT_ZK_SESSION_TIMEOUT_DEFAULT);
    return timeoutSeconds * 1000;
}
/**
 * Set BK's zookeeper session timeout in seconds.
 *
 * @param sessionTimeout session timeout for the ZK client used by the BK client, in seconds.
 * @return this configuration, to allow call chaining
 * @see #getBKClientZKSessionTimeoutMilliSeconds()
 */
public DistributedLogConfiguration setBKClientZKSessionTimeout(int sessionTimeout) {
    this.setProperty(BKDL_BKCLIENT_ZK_SESSION_TIMEOUT, sessionTimeout);
    return this;
}
/**
 * Get the zookeeper access rate limit for the zookeeper client used in the bookkeeper client.
 * <p>The rate limiter is basically a guava {@link com.google.common.util.concurrent.RateLimiter}.
 * It limits the rate of requests sent by the zookeeper client. If the value is non-positive,
 * rate limiting is disabled. By default it is disabled (value = 0).
 *
 * @return zookeeper access rate limit for the zookeeper client used in the bookkeeper client.
 *         By default it is 0.
 * @see #setBKClientZKRequestRateLimit(double)
 */
public double getBKClientZKRequestRateLimit() {
    final double rateLimit = getDouble(
            BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT,
            BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT_DEFAULT);
    return rateLimit;
}
/**
 * Set the zookeeper access rate limit for the zookeeper client used in the bookkeeper client.
 *
 * @param rateLimit
 *          zookeeper access rate limit
 * @return this configuration, to allow call chaining
 * @see #getBKClientZKRequestRateLimit()
 */
public DistributedLogConfiguration setBKClientZKRequestRateLimit(double rateLimit) {
    this.setProperty(BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT, rateLimit);
    return this;
}
/**
 * Get the number of retries for the zookeeper client used by the bookkeeper client.
 * <p>Retries only happen on retryable failures like session expired or
 * session moved. For permanent failures, the request fails immediately.
 * The default value is 3. A configured value of zero or below is reported as
 * {@link Integer#MAX_VALUE}, i.e. retry infinitely.
 *
 * @return number of retries of the zookeeper client used by the bookkeeper client.
 */
public int getBKClientZKNumRetries() {
    final int configured = getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
    // Non-positive means "no limit".
    return configured > 0 ? configured : Integer.MAX_VALUE;
}
/**
 * Get the start backoff time of zookeeper operation retries, in milliseconds.
 * <p>The retry time increases in a bounded exponential way and becomes flat
 * after hitting the max backoff time ({@link #getBKClientZKRetryBackoffMaxMillis()}).
 * The default start backoff time is 5000 milliseconds.
 *
 * @return start backoff time of zookeeper operation retries, in milliseconds.
 * @see #getBKClientZKRetryBackoffMaxMillis()
 */
public int getBKClientZKRetryBackoffStartMillis() {
    final int startMillis = getInt(
            BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS,
            BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT);
    return startMillis;
}
/**
 * Get the max backoff time of zookeeper operation retries, in milliseconds.
 * <p>The retry time increases in a bounded exponential way starting from
 * {@link #getBKClientZKRetryBackoffStartMillis()} and becomes flat after
 * hitting this max backoff time.
 * The default max backoff time is 30000 milliseconds.
 *
 * @return max backoff time of zookeeper operation retries, in milliseconds.
 * @see #getBKClientZKRetryBackoffStartMillis()
 */
public int getBKClientZKRetryBackoffMaxMillis() {
    final int maxMillis = getInt(
            BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS,
            BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT);
    return maxMillis;
}
//
// BookKeeper Ensemble Placement Settings
//
/**
 * Get the ensemble size that each log segment (ledger) will use.
 * By default it is 3.
 * <p>
 * A log segment's data is stored in an ensemble of bookies in
 * a striping way. Each entry is added to a <code>write-quorum</code>
 * size of bookies. The add operation completes once it receives
 * responses from an <code>ack-quorum</code> size of bookies. The striping
 * is done in a round-robin way in bookkeeper.
 * <p>
 * For example, we configure the ensemble-size to 5, write-quorum-size to 3,
 * and ack-quorum-size to 2. The data will be stored in the following striping way.
 * <pre>
 * | entry id | bk1 | bk2 | bk3 | bk4 | bk5 |
 * |     0    |  x  |  x  |  x  |     |     |
 * |     1    |     |  x  |  x  |  x  |     |
 * |     2    |     |     |  x  |  x  |  x  |
 * |     3    |  x  |     |     |  x  |  x  |
 * |     4    |  x  |  x  |     |     |  x  |
 * |     5    |  x  |  x  |  x  |     |     |
 * </pre>
 * <p>
 * We don't recommend striping within a log segment to increase bandwidth.
 * We'd recommend striping by `partition` at a higher level of distributedlog
 * to increase performance, so typically the ensemble size is set to
 * the same value as the write quorum size.
 * <p>
 * The value stored under the older key is used when the current key is not set.
 *
 * @return ensemble size
 * @see #getWriteQuorumSize()
 * @see #getAckQuorumSize()
 */
public int getEnsembleSize() {
    // Resolve the older key first; it only serves as the fallback for the current key.
    final int legacySize = getInt(BKDL_BOOKKEEPER_ENSEMBLE_SIZE_OLD, BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
    return getInt(BKDL_BOOKKEEPER_ENSEMBLE_SIZE, legacySize);
}
/**
 * Set the ensemble size that each log segment (ledger) will use.
 *
 * @param ensembleSize ensemble size.
 * @return this configuration, to allow call chaining
 * @see #getEnsembleSize()
 */
public DistributedLogConfiguration setEnsembleSize(int ensembleSize) {
    this.setProperty(BKDL_BOOKKEEPER_ENSEMBLE_SIZE, ensembleSize);
    return this;
}
/**
 * Get the write quorum size that each log segment (ledger) will use.
 * By default it is 3.
 * <p>
 * The value stored under the older key is used when the current key is not set.
 *
 * @return write quorum size
 * @see #getEnsembleSize()
 */
public int getWriteQuorumSize() {
    final int legacySize = getInt(
            BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_OLD,
            BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_DEFAULT);
    return getInt(BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE, legacySize);
}
/**
 * Set the write quorum size that each log segment (ledger) will use.
 *
 * @param quorumSize
 *          quorum size.
 * @return this configuration, to allow call chaining
 * @see #getWriteQuorumSize()
 */
public DistributedLogConfiguration setWriteQuorumSize(int quorumSize) {
    this.setProperty(BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE, quorumSize);
    return this;
}
/**
 * Get the ack quorum size that each log segment (ledger) will use.
 * By default it is 2.
 * <p>
 * The value stored under the older key is used when the current key is not set.
 *
 * @return ack quorum size
 * @see #getEnsembleSize()
 */
public int getAckQuorumSize() {
    final int legacySize = getInt(
            BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_OLD,
            BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_DEFAULT);
    return getInt(BKDL_BOOKKEEPER_ACK_QUORUM_SIZE, legacySize);
}
/**
 * Set the ack quorum size that each log segment (ledger) will use.
 *
 * @param quorumSize
 *          quorum size.
 * @return this configuration, to allow call chaining
 * @see #getAckQuorumSize()
 */
public DistributedLogConfiguration setAckQuorumSize(int quorumSize) {
    this.setProperty(BKDL_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize);
    return this;
}
/**
 * Get the quorum config for each log segment (ledger).
 * <p>
 * A new {@link QuorumConfig} is built on every call from the currently
 * configured ensemble, write quorum and ack quorum sizes.
 *
 * @return quorum config used by log segments
 * @see #getEnsembleSize()
 * @see #getWriteQuorumSize()
 * @see #getAckQuorumSize()
 */
public QuorumConfig getQuorumConfig() {
    final int ensembleSize = getEnsembleSize();
    final int writeQuorumSize = getWriteQuorumSize();
    final int ackQuorumSize = getAckQuorumSize();
    return new QuorumConfig(ensembleSize, writeQuorumSize, ackQuorumSize);
}
/**
 * Get whether row aware ensemble placement is enabled.
 * <p>If enabled, {@link DNSResolverForRows} will be used for dns resolution
 * rather than {@link DNSResolverForRacks}, if no other dns resolver is set via
 * {@link #setEnsemblePlacementDnsResolverClass(Class)}.
 * By default it is disabled.
 * <p>
 * The value stored under the older key is used when the current key is not set.
 *
 * @return true if row aware ensemble placement is enabled, otherwise false.
 * @see #getEnsemblePlacementDnsResolverClass()
 */
public boolean getRowAwareEnsemblePlacementEnabled() {
    final boolean legacySetting = getBoolean(
            BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_OLD,
            BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_DEFAULT);
    return getBoolean(BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT, legacySetting);
}
/**
 * Set whether row aware ensemble placement should be enabled.
 *
 * @param enableRowAwareEnsemblePlacement
 *          true to enable row aware ensemble placement, false to disable it.
 * @return this configuration, to allow call chaining
 * @see #getRowAwareEnsemblePlacementEnabled()
 */
public DistributedLogConfiguration setRowAwareEnsemblePlacementEnabled(boolean enableRowAwareEnsemblePlacement) {
    this.setProperty(BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT, enableRowAwareEnsemblePlacement);
    return this;
}
/**
 * Get the DNS resolver class for bookkeeper ensemble placement.
 * <p>When no resolver class is configured, the default is {@link DNSResolverForRows}
 * if {@link #getRowAwareEnsemblePlacementEnabled()} is enabled and
 * {@link DNSResolverForRacks} otherwise.
 *
 * @return dns resolver class for bookkeeper ensemble placement.
 * @throws ConfigurationException if the resolver class cannot be resolved
 *          by {@link ReflectionUtils}
 * @see #getRowAwareEnsemblePlacementEnabled()
 */
public Class<? extends DNSToSwitchMapping> getEnsemblePlacementDnsResolverClass()
        throws ConfigurationException {
    final Class<? extends DNSToSwitchMapping> fallbackResolver =
            getRowAwareEnsemblePlacementEnabled()
                    ? DNSResolverForRows.class
                    : DNSResolverForRacks.class;
    return ReflectionUtils.getClass(
            this,
            BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS,
            fallbackResolver,
            DNSToSwitchMapping.class,
            defaultLoader);
}
/**
 * Set the DNS resolver class for bookkeeper ensemble placement.
 * <p>The class is stored by its name.
 *
 * @param dnsResolverClass
 *          dns resolver class for bookkeeper ensemble placement.
 * @return this configuration, to allow call chaining
 * @see #getEnsemblePlacementDnsResolverClass()
 */
public DistributedLogConfiguration setEnsemblePlacementDnsResolverClass(
        Class<? extends DNSToSwitchMapping> dnsResolverClass) {
    final String resolverClassName = dnsResolverClass.getName();
    this.setProperty(BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS, resolverClassName);
    return this;
}
/**
 * Get the mapping used to override the region mapping derived by the default resolver.
 * <p>It is a string of host-region pairs (host:region) separated by semicolons.
 * By default it is an empty string.
 *
 * @return dns resolver overrides.
 * @see #getEnsemblePlacementDnsResolverClass()
 * @see DNSResolverForRacks
 * @see DNSResolverForRows
 */
public String getBkDNSResolverOverrides() {
    final String overrides = this.getString(BKDL_BK_DNS_RESOLVER_OVERRIDES, BKDL_BK_DNS_RESOLVER_OVERRIDES_DEFAULT);
    return overrides;
}
/**
 * Set the mapping used to override the region mapping derived by the default resolver.
 * <p>It is a string of host-region pairs (host:region) separated by semicolons.
 * By default it is an empty string.
 *
 * @param overrides
 *          dns resolver overrides
 * @return this configuration, to allow call chaining
 * @see #getBkDNSResolverOverrides()
 */
public DistributedLogConfiguration setBkDNSResolverOverrides(String overrides) {
    this.setProperty(BKDL_BK_DNS_RESOLVER_OVERRIDES, overrides);
    return this;
}
//
// BookKeeper General Settings
//
/**
 * Set the password used by the bookkeeper client for digestion.
 * <p>
 * NOTE: changing it is not recommended. It will be deprecated in future.
 *
 * @param bkDigestPW BK password digest
 * @return this configuration, to allow call chaining
 * @see #getBKDigestPW()
 */
public DistributedLogConfiguration setBKDigestPW(String bkDigestPW) {
    this.setProperty(BKDL_BOOKKEEPER_DIGEST_PW, bkDigestPW);
    return this;
}
/**
 * Get the password used by the bookkeeper client for digestion.
 * <p>
 * NOTE: changing it is not recommended. It will be deprecated in future.
 *
 * @return password used by the bookkeeper client for digestion
 * @see #setBKDigestPW(String)
 */
public String getBKDigestPW() {
    final String digestPassword = this.getString(BKDL_BOOKKEEPER_DIGEST_PW, BKDL_BOOKKEEPER_DIGEST_PW_DEFAULT);
    return digestPassword;
}
/**
 * Get the number of i/o threads used by Netty in the BK client.
 * When not configured, it equals DL's number of worker threads.
 *
 * @return number of bookkeeper netty i/o threads.
 * @see #getNumWorkerThreads()
 */
public int getBKClientNumberIOThreads() {
    // The worker thread count doubles as the default for the i/o thread count.
    final int workerThreads = getNumWorkerThreads();
    return getInt(BKDL_BKCLIENT_NUM_IO_THREADS, workerThreads);
}
/**
 * Set the number of i/o threads used by Netty in the BK client.
 *
 * @param numThreads
 *          number of i/o threads.
 * @return this configuration, to allow call chaining
 * @see #getBKClientNumberIOThreads()
 */
public DistributedLogConfiguration setBKClientNumberIOThreads(int numThreads) {
    this.setProperty(BKDL_BKCLIENT_NUM_IO_THREADS, numThreads);
    return this;
}
/**
 * Get the tick duration, in milliseconds, used for the timeout timer in the bookkeeper client.
 * By default it is 100.
 *
 * @return tick duration in milliseconds
 * @see org.jboss.netty.util.HashedWheelTimer
 */
public long getTimeoutTimerTickDurationMs() {
    final long tickDurationMs = this.getLong(
            BKDL_TIMEOUT_TIMER_TICK_DURATION_MS,
            BKDL_TIMEOUT_TIMER_TICK_DURATION_MS_DEFAULT);
    return tickDurationMs;
}
/**
 * Set the tick duration, in milliseconds, used for the timeout timer in the bookkeeper client.
 *
 * @param tickDuration
 *          tick duration in milliseconds.
 * @return this configuration, to allow call chaining
 * @see #getTimeoutTimerTickDurationMs()
 */
public DistributedLogConfiguration setTimeoutTimerTickDurationMs(long tickDuration) {
    this.setProperty(BKDL_TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration);
    return this;
}
/**
 * Get the number of ticks used for the timeout timer in the bookkeeper client.
 * By default it is 1024.
 *
 * @return number of ticks used for the timeout timer.
 * @see org.jboss.netty.util.HashedWheelTimer
 */
public int getTimeoutTimerNumTicks() {
    final int numTicks = this.getInt(BKDL_TIMEOUT_TIMER_NUM_TICKS, BKDL_TIMEOUT_TIMER_NUM_TICKS_DEFAULT);
    return numTicks;
}
/**
 * Set the number of ticks used for the timeout timer in the bookkeeper client.
 *
 * @param numTicks
 *          number of ticks used for the timeout timer.
 * @return this configuration, to allow call chaining
 * @see #getTimeoutTimerNumTicks()
 */
public DistributedLogConfiguration setTimeoutTimerNumTicks(int numTicks) {
    this.setProperty(BKDL_TIMEOUT_TIMER_NUM_TICKS, numTicks);
    return this;
}
//
// Deprecated BookKeeper Settings
//
/**
 * Get BK client read timeout in seconds.
 * <p>
 * Please use {@link ClientConfiguration#getReadEntryTimeout()}
 * instead of this setting.
 *
 * @return read timeout in seconds
 * @deprecated use {@link ClientConfiguration#getReadEntryTimeout()}
 * @see ClientConfiguration#getReadEntryTimeout()
 */
@Deprecated
public int getBKClientReadTimeout() {
    return this.getInt(BKDL_BKCLIENT_READ_TIMEOUT,
            BKDL_BKCLIENT_READ_TIMEOUT_DEFAULT);
}
/**
 * Set BK client read timeout in seconds.
 *
 * @param readTimeout read timeout in seconds.
 * @return this configuration, to allow call chaining
 * @deprecated this setting is deprecated together with {@link #getBKClientReadTimeout()}
 * @see #getBKClientReadTimeout()
 */
@Deprecated
public DistributedLogConfiguration setBKClientReadTimeout(int readTimeout) {
    setProperty(BKDL_BKCLIENT_READ_TIMEOUT, readTimeout);
    return this;
}
/**
 * Get BK client write timeout in seconds.
 * <p>
 * Please use {@link ClientConfiguration#getAddEntryTimeout()}
 * instead of this setting.
 *
 * @return write timeout in seconds.
 * @deprecated use {@link ClientConfiguration#getAddEntryTimeout()}
 * @see ClientConfiguration#getAddEntryTimeout()
 */
@Deprecated
public int getBKClientWriteTimeout() {
    return this.getInt(BKDL_BKCLIENT_WRITE_TIMEOUT, BKDL_BKCLIENT_WRITE_TIMEOUT_DEFAULT);
}
/**
 * Set BK client write timeout in seconds.
 *
 * @param writeTimeout write timeout in seconds.
 * @return this configuration, to allow call chaining
 * @deprecated this setting is deprecated together with {@link #getBKClientWriteTimeout()}
 * @see #getBKClientWriteTimeout()
 */
@Deprecated
public DistributedLogConfiguration setBKClientWriteTimeout(int writeTimeout) {
    setProperty(BKDL_BKCLIENT_WRITE_TIMEOUT, writeTimeout);
    return this;
}
/**
 * Get BK client number of worker threads.
 * <p>
 * Please use {@link ClientConfiguration#getNumWorkerThreads()}
 * instead of this setting.
 *
 * @return number of bookkeeper client worker threads.
 * @deprecated use {@link ClientConfiguration#getNumWorkerThreads()}
 * @see ClientConfiguration#getNumWorkerThreads()
 */
@Deprecated
public int getBKClientNumberWorkerThreads() {
    return this.getInt(BKDL_BKCLIENT_NUM_WORKER_THREADS, BKDL_BKCLEINT_NUM_WORKER_THREADS_DEFAULT);
}
/**
 * Set BK client number of worker threads.
 *
 * @param numThreads
 *          number of worker threads.
 * @return this configuration, to allow call chaining
 * @deprecated this setting is deprecated together with {@link #getBKClientNumberWorkerThreads()}
 * @see #getBKClientNumberWorkerThreads()
 */
@Deprecated
public DistributedLogConfiguration setBKClientNumberWorkerThreads(int numThreads) {
    setProperty(BKDL_BKCLIENT_NUM_WORKER_THREADS, numThreads);
    return this;
}
//
// DL Executor Settings
//
/**
 * Get the number of worker threads used by the distributedlog namespace.
 * When not configured, it is the number of processors available to the JVM.
 *
 * @return number of worker threads used by the distributedlog namespace.
 * @see #setNumWorkerThreads(int)
 */
public int getNumWorkerThreads() {
    final int availableProcessors = Runtime.getRuntime().availableProcessors();
    return this.getInt(BKDL_NUM_WORKER_THREADS, availableProcessors);
}
/**
 * Set the number of worker threads used by the distributedlog namespace.
 *
 * @param numWorkerThreads
 *          number of worker threads used by the distributedlog namespace.
 * @return this configuration, to allow call chaining
 * @see #getNumWorkerThreads()
 */
public DistributedLogConfiguration setNumWorkerThreads(int numWorkerThreads) {
    this.setProperty(BKDL_NUM_WORKER_THREADS, numWorkerThreads);
    return this;
}
/**
 * Get the number of dedicated readahead worker threads used by the distributedlog namespace.
 * <p>If this value is non-positive, readahead shares the normal executor
 * (see {@link #getNumWorkerThreads()}); otherwise a dedicated executor is used
 * for readahead. By default it is 0.
 *
 * @return number of dedicated readahead worker threads.
 * @see #getNumWorkerThreads()
 */
@Deprecated
public int getNumReadAheadWorkerThreads() {
    final int readAheadThreads = this.getInt(BKDL_NUM_READAHEAD_WORKER_THREADS, 0);
    return readAheadThreads;
}
/**
 * Set the number of dedicated readahead worker threads used by the distributedlog namespace.
 *
 * @param numWorkerThreads
 *          number of dedicated readahead worker threads.
 * @return this configuration, to allow call chaining
 * @see #getNumReadAheadWorkerThreads()
 */
@Deprecated
public DistributedLogConfiguration setNumReadAheadWorkerThreads(int numWorkerThreads) {
    this.setProperty(BKDL_NUM_READAHEAD_WORKER_THREADS, numWorkerThreads);
    return this;
}
/**
 * Get the number of lock state threads used by the distributedlog namespace.
 * By default it is 1.
 *
 * @return number of lock state threads used by the distributedlog namespace.
 * @see #setNumLockStateThreads(int)
 */
public int getNumLockStateThreads() {
    final int lockStateThreads = this.getInt(BKDL_NUM_LOCKSTATE_THREADS, 1);
    return lockStateThreads;
}
/**
 * Set the number of lock state threads used by the distributedlog manager factory.
 *
 * @param numLockStateThreads
 *          number of lock state threads used by the distributedlog manager factory.
 * @return this configuration, to allow call chaining
 * @see #getNumLockStateThreads()
 */
public DistributedLogConfiguration setNumLockStateThreads(int numLockStateThreads) {
    this.setProperty(BKDL_NUM_LOCKSTATE_THREADS, numLockStateThreads);
    return this;
}
/**
 * Get the number of resource release threads used by the distributedlog namespace.
 * By default it is 0 - the threads will be created dynamically by an executor service.
 * The executor service is an unbounded pool. Applications can use
 * `total_tasks - completed_tasks` to monitor the number of threads that are used
 * for releasing resources.
 * <p>
 * The setting is only applied to the v2 implementation.
 *
 * @see com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor
 * @return number of resource release threads used by the distributedlog namespace.
 */
public int getNumResourceReleaseThreads() {
    final int releaseThreads = this.getInt(BKDL_NUM_RESOURCE_RELEASE_THREADS, 0);
    return releaseThreads;
}
/**
 * Set the number of resource release threads used by the distributedlog manager factory.
 *
 * @param numResourceReleaseThreads
 *          number of resource release threads used by the distributedlog manager factory.
 * @return this configuration, to allow call chaining
 * @see #getNumResourceReleaseThreads()
 */
public DistributedLogConfiguration setNumResourceReleaseThreads(int numResourceReleaseThreads) {
    this.setProperty(BKDL_NUM_RESOURCE_RELEASE_THREADS, numResourceReleaseThreads);
    return this;
}
/**
 * Get the timeout for shutting down schedulers in the dl manager, in milliseconds.
 * By default, it is 5 seconds.
 *
 * @return timeout for shutting down schedulers in the dl manager, in milliseconds.
 * @see #setSchedulerShutdownTimeoutMs(int)
 */
public int getSchedulerShutdownTimeoutMs() {
    final int timeoutMs = this.getInt(
            BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS,
            BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS_DEFAULT);
    return timeoutMs;
}
/**
 * Set the timeout for shutting down schedulers in the dl manager, in milliseconds.
 *
 * @param timeoutMs
 *         timeout for shutting down schedulers in the dl manager, in milliseconds.
 * @return this configuration, to allow call chaining
 * @see #getSchedulerShutdownTimeoutMs()
 */
public DistributedLogConfiguration setSchedulerShutdownTimeoutMs(int timeoutMs) {
    this.setProperty(BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS, timeoutMs);
    return this;
}
/**
 * Get whether DL threads should be daemon threads.
 * By default it is false.
 *
 * @return true if daemon threads are used, otherwise false.
 * @see #setUseDaemonThread(boolean)
 */
public boolean getUseDaemonThread() {
    final boolean useDaemon = this.getBoolean(BKDL_USE_DAEMON_THREAD, BKDL_USE_DAEMON_THREAD_DEFAULT);
    return useDaemon;
}
/**
 * Set whether DL threads should be daemon threads.
 *
 * @param daemon
 *          whether to use daemon threads for DL threads.
 * @return this configuration, to allow call chaining
 * @see #getUseDaemonThread()
 */
public DistributedLogConfiguration setUseDaemonThread(boolean daemon) {
    this.setProperty(BKDL_USE_DAEMON_THREAD, daemon);
    return this;
}
//
// Metadata Settings
//
/**
 * Get the DL ledger metadata output layout version.
 * <p>
 * The value stored under the older key is used when the current key is not set.
 *
 * @return layout version
 * @see com.twitter.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion
 */
public int getDLLedgerMetadataLayoutVersion() {
    final int legacyVersion = getInt(
            BKDL_LEDGER_METADATA_LAYOUT_VERSION_OLD,
            BKDL_LEDGER_METADATA_LAYOUT_VERSION_DEFAULT);
    return getInt(BKDL_LEDGER_METADATA_LAYOUT_VERSION, legacyVersion);
}
/**
 * Set the DL ledger metadata output layout version.
 * <p>
 * Accepted values range from 1 up to and including
 * <code>LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION</code>.
 *
 * @param layoutVersion layout version
 * @return this configuration, to allow call chaining
 * @throws IllegalArgumentException if the layout version is outside the accepted range.
 * @see #getDLLedgerMetadataLayoutVersion()
 */
public DistributedLogConfiguration setDLLedgerMetadataLayoutVersion(int layoutVersion)
        throws IllegalArgumentException {
    final boolean knownVersion = layoutVersion > 0
            && layoutVersion <= LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION;
    if (!knownVersion) {
        // Reject the value before it is stored.
        throw new IllegalArgumentException("Incorrect value for ledger metadata layout version");
    }
    this.setProperty(BKDL_LEDGER_METADATA_LAYOUT_VERSION, layoutVersion);
    return this;
}
/**
 * Get whether the min ledger metadata version check should be skipped.
 * By default it is false.
 *
 * @return true if the min ledger metadata version check should be skipped, otherwise false
 * @see com.twitter.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion
 */
public boolean getDLLedgerMetadataSkipMinVersionCheck() {
    final boolean skipCheck = getBoolean(
            BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK,
            BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK_DEFAULT);
    return skipCheck;
}
/**
 * Set whether the enforcement of the min ledger metadata version should be skipped.
 * <p>NOTE: please be aware of the side effects of skipping the min ledger metadata
 * version check.
 *
 * @param skipMinVersionCheck whether to skip the min ledger metadata version check
 * @return this configuration, to allow call chaining
 * @see #getDLLedgerMetadataSkipMinVersionCheck()
 */
public DistributedLogConfiguration setDLLedgerMetadataSkipMinVersionCheck(boolean skipMinVersionCheck)
        throws IllegalArgumentException {
    this.setProperty(BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK, skipMinVersionCheck);
    return this;
}
/**
 * Get the value at which the ledger sequence number should start, both for streams that
 * are being upgraded and did not have a ledger sequence number to start with and for
 * newly created streams. By default, it is 1.
 * <p>In most cases this value should not be changed. It is useful for backfilling
 * when migrating log segments whose metadata don't have a log segment sequence number.
 * <p>
 * The value stored under the older key is used when the current key is not set.
 *
 * @return first ledger sequence number
 * @see #setFirstLogSegmentSequenceNumber(long)
 */
public long getFirstLogSegmentSequenceNumber() {
    final long legacyFirstSeqNo = getLong(
            BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_OLD,
            BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_DEFAULT);
    return getLong(BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER, legacyFirstSeqNo);
}
/**
 * Set the value at which the ledger sequence number should start, both for streams that
 * are being upgraded and did not have a ledger sequence number to start with and for
 * newly created streams.
 *
 * @param firstLogSegmentSequenceNumber first ledger sequence number; must be positive
 * @return this configuration, to allow call chaining
 * @throws IllegalArgumentException if the given sequence number is zero or negative.
 * @see #getFirstLogSegmentSequenceNumber()
 */
public DistributedLogConfiguration setFirstLogSegmentSequenceNumber(long firstLogSegmentSequenceNumber)
        throws IllegalArgumentException {
    final boolean positive = firstLogSegmentSequenceNumber > 0;
    if (!positive) {
        // Reject the value before it is stored.
        throw new IllegalArgumentException("Incorrect value for ledger sequence number");
    }
    this.setProperty(BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER, firstLogSegmentSequenceNumber);
    return this;
}
/**
 * Get whether log segment sequence number validation is enabled.
 *
 * @return true if the log segment sequence number validation is enabled, otherwise false.
 * @see #setLogSegmentSequenceNumberValidationEnabled(boolean)
 */
public boolean isLogSegmentSequenceNumberValidationEnabled() {
    final boolean validationEnabled = getBoolean(
            BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED,
            BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED_DEFAULT);
    return validationEnabled;
}
/**
 * Enable/disable log segment sequence number validation.
 *
 * @param enabled
 *          flag to enable/disable log segment sequence number validation.
 * @return this configuration, to allow call chaining
 * @see #isLogSegmentSequenceNumberValidationEnabled()
 */
public DistributedLogConfiguration setLogSegmentSequenceNumberValidationEnabled(boolean enabled) {
    this.setProperty(BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED, enabled);
    return this;
}
/**
 * Get whether record counts should be published in the log records and metadata.
 * <p>By default it is true. This is a legacy setting for log segment version 1. It
 * should be considered removed.
 *
 * @return true if record counts should be persisted, otherwise false
 * @see #setEnableRecordCounts(boolean)
 */
public boolean getEnableRecordCounts() {
    final boolean recordCountsEnabled =
            this.getBoolean(BKDL_ENABLE_RECORD_COUNTS, BKDL_ENABLE_RECORD_COUNTS_DEFAULT);
    return recordCountsEnabled;
}
/**
 * Set whether record counts should be published in the log records and metadata.
 *
 * @param enableRecordCounts enable record counts
 * @return this configuration, to allow call chaining
 * @see #getEnableRecordCounts()
 */
public DistributedLogConfiguration setEnableRecordCounts(boolean enableRecordCounts) {
    this.setProperty(BKDL_ENABLE_RECORD_COUNTS, enableRecordCounts);
    return this;
}
/**
 * Get whether the txn id is sanity checked when starting log segments.
 * <p>If it is enabled, the DL writer would throw
 * {@link com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException}
 * when it receives a transaction id smaller than the current maximum transaction id.
 *
 * @return true if the txn id should be checked against the max txn id, otherwise false.
 * @see #setSanityCheckTxnID(boolean)
 */
@Deprecated
public boolean getSanityCheckTxnID() {
    final boolean sanityCheck = this.getBoolean(BKDL_MAXID_SANITYCHECK, BKDL_MAXID_SANITYCHECK_DEFAULT);
    return sanityCheck;
}
/**
 * Enable/disable the txn id sanity check.
 *
 * @param enabled
 *          flag to enable/disable the txn id sanity check.
 * @return this configuration, to allow call chaining
 * @see #getSanityCheckTxnID()
 */
@Deprecated
public DistributedLogConfiguration setSanityCheckTxnID(boolean enabled) {
    this.setProperty(BKDL_MAXID_SANITYCHECK, enabled);
    return this;
}
/**
 * Get whether the region id is encoded in log segment metadata.
 * <p>In the global DL use case, encoding the region id in log segment metadata
 * helps with understanding in which region a log segment was created. The region
 * id field in log segment metadata helps with monitoring and troubleshooting.
 *
 * @return whether to encode the region id in log segment metadata.
 * @see #setEncodeRegionIDInLogSegmentMetadata(boolean)
 */
public boolean getEncodeRegionIDInLogSegmentMetadata() {
    final boolean encodeRegionId = this.getBoolean(
            BKDL_ENCODE_REGION_ID_IN_VERSION,
            BKDL_ENCODE_REGION_ID_IN_VERSION_DEFAULT);
    return encodeRegionId;
}
/**
 * Enable/disable encoding the region id in log segment metadata.
 *
 * @param enabled
 *          flag to enable/disable encoding the region id in log segment metadata.
 * @return this configuration, to allow call chaining
 * @see #getEncodeRegionIDInLogSegmentMetadata()
 */
public DistributedLogConfiguration setEncodeRegionIDInLogSegmentMetadata(boolean enabled) {
    this.setProperty(BKDL_ENCODE_REGION_ID_IN_VERSION, enabled);
    return this;
}
/**
 * Get the log segment name version.
 * <p>
 * <ul>
 * <li>version 0: inprogress_(start_txid) |
 * logrecs_(start_txid)_(end_txid)</li>
 * <li>version 1: inprogress_(logsegment_sequence_number) |
 * logrecs_(logsegment_sequence_number)</li>
 * </ul>
 * By default it is 1.
 *
 * @return log segment name version.
 * @see #setLogSegmentNameVersion(int)
 */
public int getLogSegmentNameVersion() {
    final int nameVersion = this.getInt(BKDL_LOGSEGMENT_NAME_VERSION, BKDL_LOGSEGMENT_NAME_VERSION_DEFAULT);
    return nameVersion;
}
/**
 * Set the log segment name version.
 *
 * @param version
 *          log segment name version.
 * @return this configuration, to allow call chaining
 * @see #getLogSegmentNameVersion()
 */
public DistributedLogConfiguration setLogSegmentNameVersion(int version) {
    this.setProperty(BKDL_LOGSEGMENT_NAME_VERSION, version);
    return this;
}
/**
 * Get the name of the unpartitioned stream.
 * <p>It is a legacy setting. Consider removing it in future.
 *
 * @return name of the unpartitioned stream
 * @see #setUnpartitionedStreamName(String)
 */
public String getUnpartitionedStreamName() {
    final String streamName = this.getString(
            BKDL_UNPARTITIONED_STREAM_NAME,
            BKDL_UNPARTITIONED_STREAM_NAME_DEFAULT);
    return streamName;
}
/**
 * Store the name used for the unpartitioned stream.
 *
 * @param streamName name of the unpartitioned stream.
 * @return this configuration, so calls can be chained.
 * @see #getUnpartitionedStreamName()
 */
public DistributedLogConfiguration setUnpartitionedStreamName(String streamName) {
    this.setProperty(BKDL_UNPARTITIONED_STREAM_NAME, streamName);
    return this;
}
//
// LogSegment Cache Settings
//
/**
 * Get the log segment cache entry TTL in milliseconds.
 *
 * <p>When the setting is absent, the TTL default
 * ({@code BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT}) is returned.
 *
 * @return log segment cache ttl in milliseconds.
 * @see #setLogSegmentCacheTTLMs(long)
 */
public long getLogSegmentCacheTTLMs() {
    // Fall back to the TTL default; the cache max-size default is a different
    // quantity (entry count) and must not be used as a time value.
    return getLong(BKDL_LOGSEGMENT_CACHE_TTL_MS, BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT);
}
/**
 * Store the TTL, in milliseconds, of entries in the log segment cache.
 *
 * @param ttlMs TTL in milliseconds.
 * @return this configuration, so calls can be chained.
 * @see #getLogSegmentCacheTTLMs()
 */
public DistributedLogConfiguration setLogSegmentCacheTTLMs(long ttlMs) {
    this.setProperty(BKDL_LOGSEGMENT_CACHE_TTL_MS, ttlMs);
    return this;
}
/**
 * Return the maximum size of the log segment cache.
 *
 * @return maximum size of the log segment cache.
 * @see #setLogSegmentCacheMaxSize(long)
 */
public long getLogSegmentCacheMaxSize() {
    return this.getLong(
            BKDL_LOGSEGMENT_CACHE_MAX_SIZE,
            BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT);
}
/**
 * Store the maximum size of the log segment cache.
 *
 * @param maxSize maximum size of the log segment cache.
 * @return this configuration, so calls can be chained.
 * @see #getLogSegmentCacheMaxSize()
 */
public DistributedLogConfiguration setLogSegmentCacheMaxSize(long maxSize) {
    this.setProperty(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, maxSize);
    return this;
}
/**
 * Tell whether the log segment cache is enabled.
 *
 * @return true if log segment cache is enabled; otherwise false.
 * @see #setLogSegmentCacheEnabled(boolean)
 */
public boolean isLogSegmentCacheEnabled() {
    return this.getBoolean(
            BKDL_LOGSEGMENT_CACHE_ENABLED,
            BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT);
}
/**
 * Turn the log segment cache on or off.
 *
 * @param enabled true to enable the log segment cache, false to disable it.
 * @return this configuration, so calls can be chained.
 * @see #isLogSegmentCacheEnabled()
 */
public DistributedLogConfiguration setLogSegmentCacheEnabled(boolean enabled) {
    this.setProperty(BKDL_LOGSEGMENT_CACHE_ENABLED, enabled);
    return this;
}
//
// DL Writer General Settings
//
/**
 * Tell whether a stream should be created when it does not exist yet.
 * By default it is true.
 *
 * @return true if it is allowed to create the stream when it does not exist.
 * @see #setCreateStreamIfNotExists(boolean)
 */
public boolean getCreateStreamIfNotExists() {
    final boolean createIfMissing = getBoolean(
            BKDL_CREATE_STREAM_IF_NOT_EXISTS, BKDL_CREATE_STREAM_IF_NOT_EXISTS_DEFAULT);
    return createIfMissing;
}
/**
 * Turn creation of a stream that does not exist yet on or off.
 *
 * @param enabled
 *          true to create the stream if it does not exist, false otherwise.
 * @return this configuration, so calls can be chained.
 * @see #getCreateStreamIfNotExists()
 */
public DistributedLogConfiguration setCreateStreamIfNotExists(boolean enabled) {
    this.setProperty(BKDL_CREATE_STREAM_IF_NOT_EXISTS, enabled);
    return this;
}
/**
 * Return the log flush timeout, in seconds.
 *
 * <p>DL writers use this setting when flushing data. It is typically used
 * by the synchronous writer and the log segment writer. By default it is 30 seconds.
 *
 * @return log flush timeout in seconds.
 * @see #setLogFlushTimeoutSeconds(int)
 */
// @Deprecated
public int getLogFlushTimeoutSeconds() {
    return getInt(
            BKDL_LOG_FLUSH_TIMEOUT,
            BKDL_LOG_FLUSH_TIMEOUT_DEFAULT);
}
/**
 * Store the log flush timeout, in seconds.
 *
 * @param logFlushTimeoutSeconds log flush timeout in seconds.
 * @return this configuration, so calls can be chained.
 * @see #getLogFlushTimeoutSeconds()
 */
public DistributedLogConfiguration setLogFlushTimeoutSeconds(int logFlushTimeoutSeconds) {
    this.setProperty(BKDL_LOG_FLUSH_TIMEOUT, logFlushTimeoutSeconds);
    return this;
}
/**
 * Return the compression type used while sending data to bookkeeper.
 *
 * @return compression type to use.
 * @see com.twitter.distributedlog.io.CompressionCodec
 * @see #setCompressionType(String)
 */
public String getCompressionType() {
    return this.getString(
            BKDL_COMPRESSION_TYPE,
            BKDL_COMPRESSION_TYPE_DEFAULT);
}
/**
 * Store the compression type used while sending data to bookkeeper.
 *
 * @param compressionType compression type; must be neither null nor empty.
 * @return this configuration, so calls can be chained.
 * @throws IllegalArgumentException if {@code compressionType} is null or empty.
 * @see #getCompressionType()
 */
public DistributedLogConfiguration setCompressionType(String compressionType) {
    // Reject null/empty values before touching the configuration.
    final boolean hasValue = compressionType != null && !compressionType.isEmpty();
    Preconditions.checkArgument(hasValue);
    this.setProperty(BKDL_COMPRESSION_TYPE, compressionType);
    return this;
}
/**
 * Tell whether a write request should fail immediately when the stream is not
 * ready, rather than being queued.
 *
 * <p>When enabled, a write request fails right away if the stream isn't ready.
 * Consider turning it on for use cases that can retry writing to other streams
 * (aka non-strict ordering guarantee). It results in fast failure, hence the
 * client would retry immediately.
 *
 * @return true if writes should fail fast; otherwise, false.
 * @see #setFailFastOnStreamNotReady(boolean)
 */
public boolean getFailFastOnStreamNotReady() {
    final boolean failFast = getBoolean(
            BKDL_FAILFAST_ON_STREAM_NOT_READY, BKDL_FAILFAST_ON_STREAM_NOT_READY_DEFAULT);
    return failFast;
}
/**
 * Store the fail-fast-on-stream-not-ready flag.
 *
 * @param failFastOnStreamNotReady
 *          value of the failfast flag.
 * @return this configuration, so calls can be chained.
 * @see #getFailFastOnStreamNotReady()
 */
public DistributedLogConfiguration setFailFastOnStreamNotReady(boolean failFastOnStreamNotReady) {
    this.setProperty(BKDL_FAILFAST_ON_STREAM_NOT_READY, failFastOnStreamNotReady);
    return this;
}
/**
 * Tell whether automatic rolling on log segment error is disabled.
 *
 * <p>If this option is set, the log writer won't reset the segment writer when an
 * error is encountered.
 *
 * @return true if automatic rolling should be disabled.
 * @see #setDisableRollingOnLogSegmentError(boolean)
 */
public boolean getDisableRollingOnLogSegmentError() {
    final boolean rollingDisabled = getBoolean(
            BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR,
            BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR_DEFAULT);
    return rollingDisabled;
}
/**
 * Store the flag that disables rolling on log segment error.
 *
 * @param disableRollingOnLogSegmentError
 *          true to disable rolling when a log segment error is encountered.
 * @return this configuration, so calls can be chained.
 * @see #getDisableRollingOnLogSegmentError()
 */
public DistributedLogConfiguration setDisableRollingOnLogSegmentError(boolean disableRollingOnLogSegmentError) {
    this.setProperty(
            BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR,
            disableRollingOnLogSegmentError);
    return this;
}
//
// DL Durability Settings
//
/**
 * Tell whether durable write is enabled.
 *
 * <p>It is enabled by default.
 *
 * @return true if durable write is enabled; otherwise, false.
 * @see #setDurableWriteEnabled(boolean)
 */
public boolean isDurableWriteEnabled() {
    return getBoolean(
            BKDL_IS_DURABLE_WRITE_ENABLED,
            BKDL_IS_DURABLE_WRITE_ENABLED_DEFAULT);
}
/**
 * Turn durable writes in writers on or off.
 *
 * @param enabled
 *          true to enable durable writes in writers, false to disable them.
 * @return this configuration, so calls can be chained.
 * @see #isDurableWriteEnabled()
 */
public DistributedLogConfiguration setDurableWriteEnabled(boolean enabled) {
    this.setProperty(BKDL_IS_DURABLE_WRITE_ENABLED, enabled);
    return this;
}
//
// DL Writer Transmit Settings
//
/**
 * Return the output buffer size for DL writers, in bytes.
 *
 * <p>A large buffer results in a higher compression ratio, uses the bandwidth
 * more efficiently and improves throughput. Setting it to 0 asks DL writers to
 * transmit the data immediately, which could achieve low latency.
 * <p>The default value is 1KB.
 *
 * @return buffer size in bytes.
 * @see #setOutputBufferSize(int)
 */
public int getOutputBufferSize() {
    // The old key is consulted first so that its value (or the default) serves
    // as the fallback for the current key.
    final int fallback = getInt(BKDL_OUTPUT_BUFFER_SIZE_OLD, BKDL_OUTPUT_BUFFER_SIZE_DEFAULT);
    return getInt(BKDL_OUTPUT_BUFFER_SIZE, fallback);
}
/**
 * Store the output buffer size for DL writers, in bytes.
 *
 * @param opBufferSize output buffer size in bytes.
 * @return this configuration, so calls can be chained.
 * @see #getOutputBufferSize()
 */
public DistributedLogConfiguration setOutputBufferSize(int opBufferSize) {
    this.setProperty(BKDL_OUTPUT_BUFFER_SIZE, opBufferSize);
    return this;
}
/**
 * Return the periodic log flush frequency, in milliseconds.
 *
 * <p>If the setting has a positive value, the data in the output buffer
 * is flushed at this interval. The default value is 0.
 *
 * @return periodic flush frequency in milliseconds.
 * @see #getOutputBufferSize()
 * @see #setPeriodicFlushFrequencyMilliSeconds(int)
 */
public int getPeriodicFlushFrequencyMilliSeconds() {
    final int flushFrequencyMs = getInt(
            BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS,
            BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS_DEFAULT);
    return flushFrequencyMs;
}
/**
 * Store the periodic log flush frequency, in milliseconds.
 *
 * @param flushFrequencyMs periodic flush frequency in milliseconds.
 * @return this configuration, so calls can be chained.
 * @see #getPeriodicFlushFrequencyMilliSeconds()
 */
public DistributedLogConfiguration setPeriodicFlushFrequencyMilliSeconds(int flushFrequencyMs) {
    this.setProperty(BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS, flushFrequencyMs);
    return this;
}
/**
 * Tell whether immediate flush is enabled.
 *
 * <p>When enabled, a control record is flushed immediately after adding
 * data completes. The default value is false.
 *
 * @return whether immediate flush is enabled.
 * @see #setImmediateFlushEnabled(boolean)
 */
public boolean getImmediateFlushEnabled() {
    return this.getBoolean(
            BKDL_ENABLE_IMMEDIATE_FLUSH,
            BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT);
}
/**
 * Turn immediate flush on or off.
 *
 * @param enabled
 *          true to enable immediate flush, false to disable it.
 * @return this configuration, so calls can be chained.
 * @see #getImmediateFlushEnabled()
 */
public DistributedLogConfiguration setImmediateFlushEnabled(boolean enabled) {
    this.setProperty(BKDL_ENABLE_IMMEDIATE_FLUSH, enabled);
    return this;
}
/**
 * Return the minimum delay between immediate flushes, in milliseconds.
 *
 * <p>This setting only takes effect when {@link #getImmediateFlushEnabled()}
 * is enabled. It tolerates bursts of traffic when immediate flush is enabled,
 * which prevents sending too many control records to bookkeeper.
 *
 * @return minimum delay between immediate flushes in milliseconds.
 * @see #getImmediateFlushEnabled()
 * @see #setMinDelayBetweenImmediateFlushMs(int)
 */
public int getMinDelayBetweenImmediateFlushMs() {
    return getInt(
            BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS,
            BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT);
}
/**
 * Store the minimum delay between immediate flushes, in milliseconds.
 *
 * @param minDelayMs minimum delay between immediate flushes in milliseconds.
 * @return this configuration, so calls can be chained.
 * @see #getMinDelayBetweenImmediateFlushMs()
 */
public DistributedLogConfiguration setMinDelayBetweenImmediateFlushMs(int minDelayMs) {
    this.setProperty(
            BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS,
            minDelayMs);
    return this;
}
/**
 * Return the periodic keep-alive frequency, in milliseconds.
 *
 * <p>If the setting has a positive value, a control record is written
 * periodically to keep the stream active. The default value is 0.
 *
 * @return periodic keep alive frequency in milliseconds.
 * @see #setPeriodicKeepAliveMilliSeconds(int)
 */
public int getPeriodicKeepAliveMilliSeconds() {
    return getInt(
            BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS,
            BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT);
}
/**
 * Store the periodic keep-alive frequency, in milliseconds.
 *
 * @param keepAliveMs keep alive frequency in milliseconds.
 * @return this configuration, so calls can be chained.
 * @see #getPeriodicKeepAliveMilliSeconds()
 */
public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) {
    this.setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs);
    return this;
}
//
// DL Retention/Truncation Settings
//
/**
 * Return the log segment retention period, in hours.
 * The default value is 3 days.
 *
 * @return log segment retention period in hours.
 * @see #setRetentionPeriodHours(int)
 */
public int getRetentionPeriodHours() {
    // Value under the old key (or the default) is the fallback for the current key.
    final int fallback = getInt(
            BKDL_RETENTION_PERIOD_IN_HOURS_OLD, BKDL_RETENTION_PERIOD_IN_HOURS_DEFAULT);
    return getInt(BKDL_RETENTION_PERIOD_IN_HOURS, fallback);
}
/**
 * Store the log segment retention period, in hours.
 *
 * @param retentionHours retention period in hours.
 * @return this configuration, so calls can be chained.
 * @see #getRetentionPeriodHours()
 */
public DistributedLogConfiguration setRetentionPeriodHours(int retentionHours) {
    this.setProperty(BKDL_RETENTION_PERIOD_IN_HOURS, retentionHours);
    return this;
}
/**
 * Tell whether truncation is managed explicitly by the application.
 *
 * <p>If this is set, time based retention is only a hint to perform
 * deferred cleanup. However, a segment that has not already been marked
 * truncated is never removed.
 * <p>It is disabled by default.
 *
 * @return whether truncation is managed explicitly by the application.
 * @see com.twitter.distributedlog.LogSegmentMetadata.TruncationStatus
 * @see #setExplicitTruncationByApplication(boolean)
 */
public boolean getExplicitTruncationByApplication() {
    final boolean explicitTruncation = getBoolean(
            BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION,
            BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION_DEFAULT);
    return explicitTruncation;
}
/**
 * Choose whether truncation is managed explicitly by the application.
 *
 * @param enabled
 *          true if truncation is managed explicitly by the application, false otherwise.
 * @return this configuration, so calls can be chained.
 * @see #getExplicitTruncationByApplication()
 */
public DistributedLogConfiguration setExplicitTruncationByApplication(boolean enabled) {
    this.setProperty(BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION, enabled);
    return this;
}
//
// Log Segment Rolling Settings
//
/**
 * Return the log segment rolling interval, in minutes.
 *
 * <p>If the setting has a positive value, the DL writer rolls log segments
 * based on time. Otherwise, it rolls log segments based on size.
 * <p>The default value is 2 hours.
 *
 * @return log segment rolling interval in minutes.
 * @see #getMaxLogSegmentBytes()
 * @see #setLogSegmentRollingIntervalMinutes(int)
 */
public int getLogSegmentRollingIntervalMinutes() {
    // Value under the old key (or the default) is the fallback for the current key.
    final int fallback = getInt(
            BKDL_ROLLING_INTERVAL_IN_MINUTES_OLD, BKDL_ROLLING_INTERVAL_IN_MINUTES_DEFAULT);
    return getInt(BKDL_ROLLING_INTERVAL_IN_MINUTES, fallback);
}
/**
 * Store the log segment rolling interval, in minutes.
 *
 * @param rollingMinutes rolling interval in minutes.
 * @return this configuration, so calls can be chained.
 * @see #getLogSegmentRollingIntervalMinutes()
 */
public DistributedLogConfiguration setLogSegmentRollingIntervalMinutes(int rollingMinutes) {
    this.setProperty(BKDL_ROLLING_INTERVAL_IN_MINUTES, rollingMinutes);
    return this;
}
/**
 * Return the maximum log segment size, in bytes.
 *
 * <p>This setting only takes effect when time based rolling is disabled.
 * The DL writer rolls into a new log segment only after the current one reaches
 * this threshold. A configured value that is zero or negative is replaced by
 * the default.
 * <p>The default value is 256MB.
 *
 * @return max log segment size in bytes; always the default when the configured
 *         value is not positive.
 * @see #getLogSegmentRollingIntervalMinutes()
 * @see #setMaxLogSegmentBytes(long)
 */
public long getMaxLogSegmentBytes() {
    final long configured = getLong(BKDL_MAX_LOGSEGMENT_BYTES, BKDL_MAX_LOGSEGMENT_BYTES_DEFAULT);
    // Non-positive sizes are treated as "not set" and mapped to the default.
    return configured > 0 ? configured : BKDL_MAX_LOGSEGMENT_BYTES_DEFAULT;
}
/**
 * Store the maximum log segment size, in bytes.
 *
 * @param maxBytes
 *          max log segment size in bytes.
 * @return this configuration, so calls can be chained.
 * @see #getMaxLogSegmentBytes()
 */
public DistributedLogConfiguration setMaxLogSegmentBytes(long maxBytes) {
    this.setProperty(BKDL_MAX_LOGSEGMENT_BYTES, maxBytes);
    return this;
}
/**
 * Return the log segment rolling concurrency.
 *
 * <p>It limits how many writers can roll log segments concurrently.
 * The default value is 1.
 *
 * @return log segment rolling concurrency.
 * @see #setLogSegmentRollingConcurrency(int)
 */
public int getLogSegmentRollingConcurrency() {
    return this.getInt(
            BKDL_LOGSEGMENT_ROLLING_CONCURRENCY,
            BKDL_LOGSEGMENT_ROLLING_CONCURRENCY_DEFAULT);
}
/**
 * Store the log segment rolling concurrency.
 *
 * <ul>
 * <li><i>0</i> means rolling concurrency is disabled.</li>
 * <li><i>larger than 0</i> means how many log segments can be rolled at the same time.</li>
 * <li><i>less than 0</i> means unlimited concurrency on rolling log segments.</li>
 * </ul>
 *
 * @param concurrency
 *          log segment rolling concurrency.
 * @return this configuration, so calls can be chained.
 * @see #getLogSegmentRollingConcurrency()
 */
public DistributedLogConfiguration setLogSegmentRollingConcurrency(int concurrency) {
    this.setProperty(BKDL_LOGSEGMENT_ROLLING_CONCURRENCY, concurrency);
    return this;
}
//
// Lock Settings
//
/**
 * Tell whether a lock is taken when opening a writer to write a stream.
 *
 * <p>A lock is generally not required to write a stream correctly; the lock
 * is more about tracking ownership. The built-in fencing mechanism is used to
 * guarantee correctness during stream owner failover. It is okay to disable the
 * lock if the application knows which nodes have to write which streams.
 *
 * @return true if lock is enabled, otherwise false.
 * @see #setWriteLockEnabled(boolean)
 */
public boolean isWriteLockEnabled() {
    return getBoolean(
            BKDL_WRITE_LOCK_ENABLED,
            BKDL_WRITE_LOCK_ENABLED_DEFAULT);
}
/**
 * Turn the lock taken when opening a writer to write a stream on or off.
 *
 * @param enabled true to enable the lock, false to disable it.
 * @return this configuration, so calls can be chained.
 * @see #isWriteLockEnabled()
 */
public DistributedLogConfiguration setWriteLockEnabled(boolean enabled) {
    this.setProperty(BKDL_WRITE_LOCK_ENABLED, enabled);
    return this;
}
/**
 * Return the lock timeout, in milliseconds.
 *
 * <p>The stored setting is expressed in seconds (see {@link #setLockTimeout(long)})
 * and is multiplied by 1000 here. The default value is 30 seconds.
 *
 * @return lock timeout in milliseconds.
 */
public long getLockTimeoutMilliSeconds() {
    final long timeoutSeconds = getLong(BKDL_LOCK_TIMEOUT, BKDL_LOCK_TIMEOUT_DEFAULT);
    return timeoutSeconds * 1000;
}
/**
 * Store the lock timeout, in seconds.
 *
 * @param lockTimeout lock timeout in seconds.
 * @return this configuration, so calls can be chained.
 * @see #getLockTimeoutMilliSeconds()
 */
public DistributedLogConfiguration setLockTimeout(long lockTimeout) {
    this.setProperty(BKDL_LOCK_TIMEOUT, lockTimeout);
    return this;
}
/**
 * Return the lock reacquire timeout, in milliseconds.
 *
 * <p>The stored setting is expressed in seconds and is multiplied by 1000 here.
 * The default value is 120 seconds.
 *
 * @return lock reacquire timeout in milliseconds.
 * @see #setLockReacquireTimeoutSeconds(long)
 */
public long getLockReacquireTimeoutMilliSeconds() {
    final long timeoutSeconds = getLong(
            BKDL_LOCK_REACQUIRE_TIMEOUT, BKDL_LOCK_REACQUIRE_TIMEOUT_DEFAULT);
    return timeoutSeconds * 1000;
}
/**
 * Store the lock reacquire timeout, in seconds.
 *
 * @param lockReacquireTimeout lock reacquire timeout in seconds.
 * @return this configuration, so calls can be chained.
 * @see #getLockReacquireTimeoutMilliSeconds()
 */
public DistributedLogConfiguration setLockReacquireTimeoutSeconds(long lockReacquireTimeout) {
    this.setProperty(BKDL_LOCK_REACQUIRE_TIMEOUT, lockReacquireTimeout);
    return this;
}
/**
 * Return the timeout of lock internal operations, in milliseconds.
 *
 * <p>The stored setting is expressed in seconds and is multiplied by 1000 here.
 * The default value is 120 seconds.
 *
 * @return lock internal operation timeout in milliseconds.
 * @see #setLockOpTimeoutSeconds(long)
 */
public long getLockOpTimeoutMilliSeconds() {
    final long timeoutSeconds = getLong(BKDL_LOCK_OP_TIMEOUT, BKDL_LOCK_OP_TIMEOUT_DEFAULT);
    return timeoutSeconds * 1000;
}
/**
 * Store the timeout of lock internal operations, in seconds.
 *
 * @param lockOpTimeout lock internal operation timeout in seconds.
 * @return this configuration, so calls can be chained.
 * @see #getLockOpTimeoutMilliSeconds()
 */
public DistributedLogConfiguration setLockOpTimeoutSeconds(long lockOpTimeout) {
    this.setProperty(BKDL_LOCK_OP_TIMEOUT, lockOpTimeout);
    return this;
}
//
// Ledger Allocator Settings
//
/**
 * Tell whether the ledger allocator pool is enabled.
 * It is disabled by default.
 *
 * @return whether the ledger allocator pool is used or not.
 * @see #setEnableLedgerAllocatorPool(boolean)
 */
public boolean getEnableLedgerAllocatorPool() {
    return this.getBoolean(
            BKDL_ENABLE_LEDGER_ALLOCATOR_POOL,
            BKDL_ENABLE_LEDGER_ALLOCATOR_POOL_DEFAULT);
}
/**
 * Turn the ledger allocator pool on or off.
 *
 * @param enabled
 *          true to enable the ledger allocator pool, false to disable it.
 * @return this configuration, so calls can be chained.
 * @see #getEnableLedgerAllocatorPool()
 */
public DistributedLogConfiguration setEnableLedgerAllocatorPool(boolean enabled) {
    this.setProperty(BKDL_ENABLE_LEDGER_ALLOCATOR_POOL, enabled);
    return this;
}
/**
 * Return the path of the ledger allocator pool.
 * The default value is ".allocation_pool".
 *
 * @return path of ledger allocator pool.
 * @see #setLedgerAllocatorPoolPath(String)
 */
public String getLedgerAllocatorPoolPath() {
    return this.getString(
            BKDL_LEDGER_ALLOCATOR_POOL_PATH,
            BKDL_LEDGER_ALLOCATOR_POOL_PATH_DEFAULT);
}
/**
 * Store the root path of the ledger allocator pool.
 *
 * @param path
 *          path of ledger allocator pool.
 * @return this configuration, so calls can be chained.
 * @see #getLedgerAllocatorPoolPath()
 */
public DistributedLogConfiguration setLedgerAllocatorPoolPath(String path) {
    this.setProperty(BKDL_LEDGER_ALLOCATOR_POOL_PATH, path);
    return this;
}
/**
 * Return the name of the ledger allocator pool.
 *
 * @return name of ledger allocator pool.
 * @see #setLedgerAllocatorPoolName(String)
 */
public String getLedgerAllocatorPoolName() {
    return this.getString(
            BKDL_LEDGER_ALLOCATOR_POOL_NAME,
            BKDL_LEDGER_ALLOCATOR_POOL_NAME_DEFAULT);
}
/**
 * Store the name of the ledger allocator pool.
 *
 * @param name
 *          name of ledger allocator pool.
 * @return this configuration, so calls can be chained.
 * @see #getLedgerAllocatorPoolName()
 */
public DistributedLogConfiguration setLedgerAllocatorPoolName(String name) {
    this.setProperty(BKDL_LEDGER_ALLOCATOR_POOL_NAME, name);
    return this;
}
/**
 * Return the core size of the ledger allocator pool.
 * The default value is 20.
 *
 * @return core size of ledger allocator pool.
 * @see #setLedgerAllocatorPoolCoreSize(int)
 */
public int getLedgerAllocatorPoolCoreSize() {
    return this.getInt(
            BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE,
            BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE_DEFAULT);
}
/**
 * Store the core size of the ledger allocator pool.
 *
 * @param poolSize
 *          core size of ledger allocator pool.
 * @return this configuration, so calls can be chained.
 * @see #getLedgerAllocatorPoolCoreSize()
 */
public DistributedLogConfiguration setLedgerAllocatorPoolCoreSize(int poolSize) {
    this.setProperty(BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE, poolSize);
    return this;
}
//
// Write Limit Settings
//
/**
 * Return the per stream outstanding write limit for dl.
 *
 * <p>If the setting has a positive value, per stream write limiting
 * is enabled. By default it is disabled.
 *
 * @return the per stream outstanding write limit for dl.
 * @see #getGlobalOutstandingWriteLimit()
 * @see #setPerWriterOutstandingWriteLimit(int)
 */
public int getPerWriterOutstandingWriteLimit() {
    final int perWriterLimit = getInt(
            BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT,
            BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT_DEFAULT);
    return perWriterLimit;
}
/**
 * Store the per stream outstanding write limit for dl.
 *
 * @param limit
 *          per stream outstanding write limit for dl.
 * @return this configuration, so calls can be chained.
 * @see #getPerWriterOutstandingWriteLimit()
 */
public DistributedLogConfiguration setPerWriterOutstandingWriteLimit(int limit) {
    this.setProperty(BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT, limit);
    return this;
}
/**
 * Return the global write limit for dl.
 *
 * <p>If the setting has a positive value, global write limiting
 * is enabled. By default it is disabled.
 *
 * @return the global write limit for dl.
 * @see #getPerWriterOutstandingWriteLimit()
 * @see #setGlobalOutstandingWriteLimit(int)
 */
public int getGlobalOutstandingWriteLimit() {
    return this.getInt(
            BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT,
            BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT_DEFAULT);
}
/**
 * Store the global write limit for dl.
 *
 * @param limit
 *          global write limit for dl.
 * @return this configuration, so calls can be chained.
 * @see #getGlobalOutstandingWriteLimit()
 */
public DistributedLogConfiguration setGlobalOutstandingWriteLimit(int limit) {
    this.setProperty(BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT, limit);
    return this;
}
/**
 * Tell whether the outstanding writes limit runs in darkmode.
 *
 * <p>In darkmode, requests over the limit are not rejected; they are only
 * recorded in the stats.
 * <p>By default, it is in darkmode.
 *
 * @return flag indicating darkmode for the pending write limit.
 * @see #setOutstandingWriteLimitDarkmode(boolean)
 */
public boolean getOutstandingWriteLimitDarkmode() {
    final boolean darkmode = getBoolean(
            BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE,
            BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE_DEFAULT);
    return darkmode;
}
/**
 * Store the darkmode flag of the outstanding writes limit.
 *
 * @param darkmoded
 *          true to run the pending write limit in darkmode, false otherwise.
 * @return this configuration, so calls can be chained.
 * @see #getOutstandingWriteLimitDarkmode()
 */
public DistributedLogConfiguration setOutstandingWriteLimitDarkmode(boolean darkmoded) {
    this.setProperty(BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE, darkmoded);
    return this;
}
//
// DL Reader General Settings
//
/**
 * Return the long poll timeout for read last add confirmed requests, in milliseconds.
 * The default value is 1 second.
 *
 * @return long poll timeout in milliseconds.
 * @see #setReadLACLongPollTimeout(int)
 */
public int getReadLACLongPollTimeout() {
    return getInt(
            BKDL_READLACLONGPOLL_TIMEOUT,
            BKDL_READLACLONGPOLL_TIMEOUT_DEFAULT);
}
/**
 * Store the long poll timeout for read last add confirmed requests, in milliseconds.
 *
 * @param readAheadLongPollTimeout long poll timeout in milliseconds.
 * @return this configuration, so calls can be chained.
 * @see #getReadLACLongPollTimeout()
 */
public DistributedLogConfiguration setReadLACLongPollTimeout(int readAheadLongPollTimeout) {
    this.setProperty(BKDL_READLACLONGPOLL_TIMEOUT, readAheadLongPollTimeout);
    return this;
}
/**
 * Tell whether record sets are deserialized on reads.
 *
 * @return true if it should deserialize, otherwise false.
 * @see #setDeserializeRecordSetOnReads(boolean)
 */
public boolean getDeserializeRecordSetOnReads() {
    return this.getBoolean(
            BKDL_DESERIALIZE_RECORDSET_ON_READS,
            BKDL_DESERIALIZE_RECORDSET_ON_READS_DEFAULT);
}
/**
 * Turn deserialization of record sets on reads on or off.
 *
 * @param enabled
 *          true to deserialize record sets on reads, false otherwise.
 * @return this configuration, so calls can be chained.
 * @see #getDeserializeRecordSetOnReads()
 */
public DistributedLogConfiguration setDeserializeRecordSetOnReads(boolean enabled) {
    this.setProperty(BKDL_DESERIALIZE_RECORDSET_ON_READS, enabled);
    return this;
}
//
// Idle reader settings
//
/**
 * Return the threshold, in milliseconds, after which an idle reader should dump warnings.
 * <p>The default value is 2 minutes.
 *
 * @return reader idle warn threshold in millis.
 * @see #getReaderIdleErrorThresholdMillis()
 * @see #setReaderIdleWarnThresholdMillis(int)
 */
public int getReaderIdleWarnThresholdMillis() {
    final int warnThresholdMs = getInt(
            BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS,
            BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS_DEFAULT);
    return warnThresholdMs;
}
/**
 * Store the threshold, in milliseconds, after which an idle reader should dump warnings.
 *
 * @param warnThreshold time after which the read ahead state should be dumped.
 * @return this configuration, so calls can be chained.
 * @see #getReaderIdleWarnThresholdMillis()
 */
public DistributedLogConfiguration setReaderIdleWarnThresholdMillis(int warnThreshold) {
    this.setProperty(BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS, warnThreshold);
    return this;
}
/**
 * Return the threshold, in milliseconds, after which an idle reader should throw errors.
 * <p>The default value is <i>Integer.MAX_VALUE</i>.
 *
 * @return reader idle error threshold in millis.
 * @see #getReaderIdleWarnThresholdMillis()
 * @see #setReaderIdleErrorThresholdMillis(int)
 */
public int getReaderIdleErrorThresholdMillis() {
    final int errorThresholdMs = getInt(
            BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS,
            BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS_DEFAULT);
    return errorThresholdMs;
}
/**
 * Store the threshold, in milliseconds, after which an idle reader should throw errors.
 *
 * @param warnThreshold time after which idle reader errors should be thrown
 *          (this is the error threshold, despite the parameter name).
 * @return this configuration, so calls can be chained.
 * @see #getReaderIdleErrorThresholdMillis()
 */
public DistributedLogConfiguration setReaderIdleErrorThresholdMillis(int warnThreshold) {
    this.setProperty(BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS, warnThreshold);
    return this;
}
//
// Reader Constraint Settings
//
/**
 * Tell whether truncation status should be ignored when reading records.
 *
 * @return true if truncation status should be ignored.
 * @see #setIgnoreTruncationStatus(boolean)
 */
public boolean getIgnoreTruncationStatus() {
    return this.getBoolean(
            BKDL_READER_IGNORE_TRUNCATION_STATUS,
            BKDL_READER_IGNORE_TRUNCATION_STATUS_DEFAULT);
}
/**
 * Choose whether truncation status should be ignored when reading records.
 *
 * @param ignoreTruncationStatus
 *          true if truncation status should be ignored.
 * @return this configuration, so calls can be chained.
 * @see #getIgnoreTruncationStatus()
 */
public DistributedLogConfiguration setIgnoreTruncationStatus(boolean ignoreTruncationStatus) {
    this.setProperty(BKDL_READER_IGNORE_TRUNCATION_STATUS, ignoreTruncationStatus);
    return this;
}
/**
 * Tell whether an alert should be raised when a reader is positioned on a truncated segment.
 *
 * @return true if an alert should be raised when a reader is positioned on a truncated segment.
 * @see #setAlertWhenPositioningOnTruncated(boolean)
 */
public boolean getAlertWhenPositioningOnTruncated() {
    return this.getBoolean(
            BKDL_READER_ALERT_POSITION_ON_TRUNCATED,
            BKDL_READER_ALERT_POSITION_ON_TRUNCATED_DEFAULT);
}
/**
 * Choose whether an alert should be raised when a reader is positioned on a truncated segment.
 *
 * @param alertWhenPositioningOnTruncated
 *          true if an alert should be raised when a reader is positioned on a truncated segment.
 * @return this configuration, so calls can be chained.
 * @see #getAlertWhenPositioningOnTruncated()
 */
public DistributedLogConfiguration setAlertWhenPositioningOnTruncated(boolean alertWhenPositioningOnTruncated) {
    this.setProperty(
            BKDL_READER_ALERT_POSITION_ON_TRUNCATED,
            alertWhenPositioningOnTruncated);
    return this;
}
/**
 * Tell whether position gap detection is enabled for the reader.
 *
 * @return whether position gap detection for the reader is enabled.
 * @see #setPositionGapDetectionEnabled(boolean)
 */
public boolean getPositionGapDetectionEnabled() {
    return this.getBoolean(
            BKDL_READER_POSITION_GAP_DETECTION_ENABLED,
            BKDL_READER_POSITION_GAP_DETECTION_ENABLED_DEFAULT);
}
/**
 * Turn position gap detection for the reader on or off.
 *
 * @param enabled
 *          true to enable position gap detection on the reader, false to disable it.
 * @return this configuration, so calls can be chained.
 * @see #getPositionGapDetectionEnabled()
 */
public DistributedLogConfiguration setPositionGapDetectionEnabled(boolean enabled) {
    this.setProperty(BKDL_READER_POSITION_GAP_DETECTION_ENABLED, enabled);
    return this;
}
//
// ReadAhead Settings
//
/**
 * Turn read ahead on or off.
 * By default it is enabled.
 *
 * @param enableReadAhead
 *          true to enable read ahead, false to disable it.
 * @return this configuration, so calls can be chained.
 * @see #getEnableReadAhead()
 */
public DistributedLogConfiguration setEnableReadAhead(boolean enableReadAhead) {
    this.setProperty(BKDL_ENABLE_READAHEAD, enableReadAhead);
    return this;
}
/**
 * Tell whether read ahead is enabled.
 *
 * @return true if read ahead is enabled.
 * @see #setEnableReadAhead(boolean)
 */
public boolean getEnableReadAhead() {
    return this.getBoolean(
            BKDL_ENABLE_READAHEAD,
            BKDL_ENABLE_READAHEAD_DEFAULT);
}
/**
 * Turn force read on or off.
 *
 * @param enableForceRead
 *          true to enable force read, false to disable it.
 * @return this configuration, so calls can be chained.
 * @see #getEnableForceRead()
 */
public DistributedLogConfiguration setEnableForceRead(boolean enableForceRead) {
    this.setProperty(BKDL_ENABLE_FORCEREAD, enableForceRead);
    return this;
}
/**
 * Tell whether force read is enabled.
 *
 * @return true if force read is enabled.
 * @see #setEnableForceRead(boolean)
 */
public boolean getEnableForceRead() {
    return this.getBoolean(
            BKDL_ENABLE_FORCEREAD,
            BKDL_ENABLE_FORCEREAD_DEFAULT);
}
/**
 * Return the maximum number of records cached by the readahead cache.
 *
 * <p>The default value is 10. Increase this value to improve throughput,
 * but be careful about the memory.
 *
 * @return max records cached by readahead cache.
 * @see #setReadAheadMaxRecords(int)
 */
public int getReadAheadMaxRecords() {
    // Value under the old key (or the default) is the fallback for the current key.
    final int fallback = getInt(
            BKDL_READAHEAD_MAX_RECORDS_OLD, BKDL_READAHEAD_MAX_RECORDS_DEFAULT);
    return getInt(BKDL_READAHEAD_MAX_RECORDS, fallback);
}
/**
 * Store the maximum number of records the read ahead worker is allowed to cache.
 *
 * @param readAheadMaxEntries max records to cache.
 * @return this configuration, so calls can be chained.
 * @see #getReadAheadMaxRecords()
 */
public DistributedLogConfiguration setReadAheadMaxRecords(int readAheadMaxEntries) {
    this.setProperty(BKDL_READAHEAD_MAX_RECORDS, readAheadMaxEntries);
    return this;
}
/**
 * Return the number of entries the readahead worker reads as a batch.
 *
 * <p>The default value is 2. Increase the value to increase the concurrency
 * of reading entries from bookkeeper.
 *
 * @return number of entries read as a batch.
 * @see #setReadAheadBatchSize(int)
 */
public int getReadAheadBatchSize() {
    // Value under the old key (or the default) is the fallback for the current key.
    final int fallback = getInt(
            BKDL_READAHEAD_BATCHSIZE_OLD, BKDL_READAHEAD_BATCHSIZE_DEFAULT);
    return getInt(BKDL_READAHEAD_BATCHSIZE, fallback);
}
/**
 * Store the number of entries the readahead worker reads as a batch.
 *
 * @param readAheadBatchSize
 *          read ahead batch size.
 * @return this configuration, so calls can be chained.
 * @see #getReadAheadBatchSize()
 */
public DistributedLogConfiguration setReadAheadBatchSize(int readAheadBatchSize) {
    this.setProperty(BKDL_READAHEAD_BATCHSIZE, readAheadBatchSize);
    return this;
}
/**
 * Return the wait time between successive attempts to poll for new log records,
 * in milliseconds. The default value is 200 ms.
 *
 * @return read ahead wait time in milliseconds.
 * @see #setReadAheadWaitTime(int)
 */
public int getReadAheadWaitTime() {
    // Value under the old key (or the default) is the fallback for the current key.
    final int fallback = getInt(BKDL_READAHEAD_WAITTIME_OLD, BKDL_READAHEAD_WAITTIME_DEFAULT);
    return getInt(BKDL_READAHEAD_WAITTIME, fallback);
}
/**
 * Store the wait time between successive attempts to poll for new log records,
 * in milliseconds.
 *
 * @param readAheadWaitTime read ahead wait time in milliseconds.
 * @return this configuration, so calls can be chained.
 * @see #getReadAheadWaitTime()
 */
public DistributedLogConfiguration setReadAheadWaitTime(int readAheadWaitTime) {
    this.setProperty(BKDL_READAHEAD_WAITTIME, readAheadWaitTime);
    return this;
}
/**
 * Return the wait time, in millis, used when readahead reaches end of stream and
 * <b>there isn't any inprogress logsegment in the stream</b>.
 * <p>The default value is 10 seconds.
 *
 * @see #setReadAheadWaitTimeOnEndOfStream(int)
 * @return the wait time if it reaches end of stream and there isn't
 *         any inprogress logsegment in the stream, in millis.
 */
public int getReadAheadWaitTimeOnEndOfStream() {
    // Value under the old key (or the default) is the fallback for the current key.
    final int fallback = getInt(
            BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM_OLD,
            BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM_DEFAULT);
    return getInt(BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM, fallback);
}
/**
 * Store the wait time readahead uses to back off polling logsegments from
 * zookeeper when it reaches end of stream and there isn't any inprogress
 * logsegment in the stream. The unit is millis.
 *
 * @param waitTime
 *          wait time that readahead uses to back off when reaching end of stream.
 * @return this configuration, so calls can be chained.
 * @see #getReadAheadWaitTimeOnEndOfStream()
 */
public DistributedLogConfiguration setReadAheadWaitTimeOnEndOfStream(int waitTime) {
    this.setProperty(BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM, waitTime);
    return this;
}
/**
 * Return the error threshold, in milliseconds, for no-such-ledger exceptions on
 * reading last add confirmed.
 *
 * <p>If readahead keeps receiving
 * {@link org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException} on
 * reading last add confirmed within the given period, it stops polling last add
 * confirmed, re-initializes the ledger handle and retries.
 * <p>The default value is 10 seconds.
 *
 * @return error threshold in milliseconds, after which readahead reinitializes the
 *         ledger handle when it keeps receiving no such ledger exceptions.
 * @see #setReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis(long)
 */
public int getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() {
    final int thresholdMs = getInt(
            BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS,
            BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS_DEFAULT);
    return thresholdMs;
}
/**
 * Set the error threshold after which readahead re-initializes the ledger handle when it keeps
 * receiving no such ledger exceptions.
 *
 * <p>NOTE(review): this setter accepts a {@code long} while the matching getter reads the value with
 * {@code getInt}; values outside the int range presumably do not round-trip - confirm.
 *
 * @param thresholdMillis error threshold in milliseconds
 * @return this configuration, to allow call chaining
 * @see #getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis()
 */
public DistributedLogConfiguration setReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis(
        final long thresholdMillis) {
    this.setProperty(BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS,
            thresholdMillis);
    return this;
}
/**
 * Whether readahead skips an entry and moves on when corruption is encountered in it.
 * Gap detection must be disabled for this to work.
 *
 * @return true if broken entries should be skipped
 * @see #setReadAheadSkipBrokenEntries(boolean)
 */
public boolean getReadAheadSkipBrokenEntries() {
    return this.getBoolean(
            BKDL_READAHEAD_SKIP_BROKEN_ENTRIES,
            BKDL_READAHEAD_SKIP_BROKEN_ENTRIES_DEFAULT);
}
/**
 * Set whether readahead should skip broken entries instead of failing on them.
 *
 * @param enabled true if broken entries should be skipped
 * @return this configuration, to allow call chaining
 * @see #getReadAheadSkipBrokenEntries()
 */
public DistributedLogConfiguration setReadAheadSkipBrokenEntries(final boolean enabled) {
    this.setProperty(BKDL_READAHEAD_SKIP_BROKEN_ENTRIES, enabled);
    return this;
}
/**
 * Get the number of entries to prefetch per log segment. Default value is 4.
 *
 * @return the number of prefetch entries per log segment
 * @see #setNumPrefetchEntriesPerLogSegment(int)
 */
public int getNumPrefetchEntriesPerLogSegment() {
    return this.getInt(
            BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT,
            BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT);
}
/**
 * Set the number of entries to prefetch per log segment.
 *
 * @param numEntries the number of prefetch entries per log segment
 * @return this configuration, to allow call chaining
 * @see #getNumPrefetchEntriesPerLogSegment()
 */
public DistributedLogConfiguration setNumPrefetchEntriesPerLogSegment(final int numEntries) {
    this.setProperty(BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT, numEntries);
    return this;
}
/**
 * Get the maximum number of entries to prefetch per log segment.
 * <p>Falls back to {@code BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT} when the property is unset.
 *
 * @return the max prefetch entries per log segment
 * @see #setMaxPrefetchEntriesPerLogSegment(int)
 */
public int getMaxPrefetchEntriesPerLogSegment() {
    return this.getInt(
            BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT,
            BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT);
}
/**
 * Set the maximum number of entries to prefetch per log segment.
 *
 * @param numEntries the max prefetch entries per log segment
 * @return this configuration, to allow call chaining
 * @see #getMaxPrefetchEntriesPerLogSegment()
 */
public DistributedLogConfiguration setMaxPrefetchEntriesPerLogSegment(final int numEntries) {
    this.setProperty(BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT, numEntries);
    return this;
}
//
// DL Reader Scan Settings
//
/**
 * Get the number of entries to scan in the first scan when reading the last record.
 *
 * @return number of entries to scan for the first scan of reading last record
 * @see #setFirstNumEntriesPerReadLastRecordScan(int)
 */
public int getFirstNumEntriesPerReadLastRecordScan() {
    return this.getInt(
            BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN,
            BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN_DEFAULT);
}
/**
 * Set the number of entries to scan in the first scan when reading the last record.
 *
 * @param numEntries number of entries to scan
 * @return this configuration, to allow call chaining
 * @see #getFirstNumEntriesPerReadLastRecordScan()
 */
public DistributedLogConfiguration setFirstNumEntriesPerReadLastRecordScan(final int numEntries) {
    this.setProperty(BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN, numEntries);
    return this;
}
/**
 * Get the maximum number of entries for each scan when reading the last record.
 *
 * @return max number of entries for each scan to read last record
 * @see #setMaxNumEntriesPerReadLastRecordScan(int)
 */
public int getMaxNumEntriesPerReadLastRecordScan() {
    return this.getInt(
            BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN,
            BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN_DEFAULT);
}
/**
 * Set the maximum number of entries for each scan when reading the last record.
 *
 * @param numEntries number of entries to scan
 * @return this configuration, to allow call chaining
 * @see #getMaxNumEntriesPerReadLastRecordScan()
 */
public DistributedLogConfiguration setMaxNumEntriesPerReadLastRecordScan(final int numEntries) {
    this.setProperty(BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN, numEntries);
    return this;
}
//
// DL Reader Log Existence Checking Settings
//
/**
 * Get the backoff start time used when checking log existence while the log doesn't exist.
 *
 * @return the backoff start time, in millis
 * @see #setCheckLogExistenceBackoffStartMillis(long)
 */
public long getCheckLogExistenceBackoffStartMillis() {
    return this.getLong(
            BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS,
            BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS_DEFAULT);
}
/**
 * Set the backoff start time used when checking log existence while the log doesn't exist.
 *
 * @param backoffMillis backoff time in millis
 * @return this configuration, to allow call chaining
 * @see #getCheckLogExistenceBackoffStartMillis()
 */
public DistributedLogConfiguration setCheckLogExistenceBackoffStartMillis(final long backoffMillis) {
    this.setProperty(BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS, backoffMillis);
    return this;
}
/**
 * Get the backoff max time used when checking log existence while the log doesn't exist.
 *
 * @return the backoff max time, in millis
 * @see #setCheckLogExistenceBackoffMaxMillis(long)
 */
public long getCheckLogExistenceBackoffMaxMillis() {
    return this.getLong(
            BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS,
            BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS_DEFAULT);
}
/**
 * Set the backoff max time used when checking log existence while the log doesn't exist.
 *
 * @param backoffMillis backoff time in millis
 * @return this configuration, to allow call chaining
 * @see #getCheckLogExistenceBackoffMaxMillis()
 */
public DistributedLogConfiguration setCheckLogExistenceBackoffMaxMillis(final long backoffMillis) {
    this.setProperty(BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS, backoffMillis);
    return this;
}
//
// Tracing/Stats Settings
//
/**
 * Whether read ahead delivery latency is traced.
 *
 * @return flag to trace read ahead delivery latency
 * @see #setTraceReadAheadDeliveryLatency(boolean)
 */
public boolean getTraceReadAheadDeliveryLatency() {
    return this.getBoolean(
            BKDL_TRACE_READAHEAD_DELIVERY_LATENCY,
            BKDL_TRACE_READAHEAD_DELIVERY_LATENCY_DEFAULT);
}
/**
 * Set whether read ahead delivery latency is traced.
 *
 * @param enabled flag to trace readahead delivery latency
 * @return this configuration, to allow call chaining
 * @see #getTraceReadAheadDeliveryLatency()
 */
public DistributedLogConfiguration setTraceReadAheadDeliveryLatency(final boolean enabled) {
    this.setProperty(BKDL_TRACE_READAHEAD_DELIVERY_LATENCY, enabled);
    return this;
}
/**
 * Get the warn threshold of metadata access latency.
 *
 * @return warn threshold of metadata access latency, in millis
 * @see #setMetadataLatencyWarnThresholdMillis(long)
 */
public long getMetadataLatencyWarnThresholdMillis() {
    return this.getLong(
            BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS,
            BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS_DEFAULT);
}
/**
 * Set the warn threshold of metadata access latency.
 *
 * @param warnThresholdMillis warn threshold of metadata access latency, in millis
 * @return this configuration, to allow call chaining
 * @see #getMetadataLatencyWarnThresholdMillis()
 */
public DistributedLogConfiguration setMetadataLatencyWarnThresholdMillis(final long warnThresholdMillis) {
    this.setProperty(BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS, warnThresholdMillis);
    return this;
}
/**
 * Get the warn threshold of data access latency.
 *
 * @return warn threshold of data access latency, in millis
 * @see #setDataLatencyWarnThresholdMillis(long)
 */
public long getDataLatencyWarnThresholdMillis() {
    return this.getLong(
            BKDL_DATA_LATENCY_WARN_THRESHOLD_MS,
            BKDL_DATA_LATENCY_WARN_THRESHOLD_MS_DEFAULT);
}
/**
 * Set the warn threshold of data access latency.
 *
 * @param warnThresholdMillis warn threshold of data access latency, in millis
 * @return this configuration, to allow call chaining
 * @see #getDataLatencyWarnThresholdMillis()
 */
public DistributedLogConfiguration setDataLatencyWarnThresholdMillis(final long warnThresholdMillis) {
    this.setProperty(BKDL_DATA_LATENCY_WARN_THRESHOLD_MS, warnThresholdMillis);
    return this;
}
/**
 * Whether read ahead metadata changes are traced. If enabled, readahead metadata changes are logged
 * with a timestamp, which is helpful when troubleshooting latency related issues.
 *
 * @return flag to trace read ahead metadata changes
 * @see #setTraceReadAheadMetadataChanges(boolean)
 */
public boolean getTraceReadAheadMetadataChanges() {
    // The default constant's identifier is spelled "MEATDATA" (sic); it must match its declaration.
    return this.getBoolean(
            BKDL_TRACE_READAHEAD_METADATA_CHANGES,
            BKDL_TRACE_READAHEAD_MEATDATA_CHANGES_DEFAULT);
}
/**
 * Set whether read ahead metadata changes are traced.
 *
 * @param enabled flag to trace readahead metadata changes
 * @return this configuration, to allow call chaining
 * @see #getTraceReadAheadMetadataChanges()
 */
public DistributedLogConfiguration setTraceReadAheadMetadataChanges(final boolean enabled) {
    this.setProperty(BKDL_TRACE_READAHEAD_METADATA_CHANGES, enabled);
    return this;
}
/**
 * Whether long running tasks are traced and task execution stats are recorded in thread pools.
 *
 * @return flag to enable task execution stats
 * @see #setEnableTaskExecutionStats(boolean)
 */
public boolean getEnableTaskExecutionStats() {
    return this.getBoolean(
            BKDL_ENABLE_TASK_EXECUTION_STATS,
            BKDL_ENABLE_TASK_EXECUTION_STATS_DEFAULT);
}
/**
 * Set whether to trace long running tasks and record task execution stats in thread pools.
 *
 * @param enabled flag to enable task execution stats
 * @return this configuration, to allow call chaining
 * @see #getEnableTaskExecutionStats()
 */
public DistributedLogConfiguration setEnableTaskExecutionStats(final boolean enabled) {
    this.setProperty(BKDL_ENABLE_TASK_EXECUTION_STATS, enabled);
    return this;
}
/**
 * Get the warn time for long running tasks: a task is reported once its execution takes longer
 * than this interval.
 *
 * @return warn time for long running tasks, in micros
 * @see #setTaskExecutionWarnTimeMicros(long)
 */
public long getTaskExecutionWarnTimeMicros() {
    return this.getLong(
            BKDL_TASK_EXECUTION_WARN_TIME_MICROS,
            BKDL_TASK_EXECUTION_WARN_TIME_MICROS_DEFAULT);
}
/**
 * Set the warn time used for reporting long running tasks.
 *
 * @param warnTimeMicros warn time for long running tasks, in micros
 * @return this configuration, to allow call chaining
 * @see #getTaskExecutionWarnTimeMicros()
 */
public DistributedLogConfiguration setTaskExecutionWarnTimeMicros(final long warnTimeMicros) {
    this.setProperty(BKDL_TASK_EXECUTION_WARN_TIME_MICROS, warnTimeMicros);
    return this;
}
/**
 * Whether to enable per stream stat or not.
 *
 * @deprecated please use {@link DistributedLogNamespaceBuilder#perLogStatsLogger(StatsLogger)}
 * @return flag to enable per stream stat.
 */
@Deprecated // pairs the Javadoc @deprecated tag with the annotation so tools and reflection see it too
public boolean getEnablePerStreamStat() {
    return getBoolean(BKDL_ENABLE_PERSTREAM_STAT, BKDL_ENABLE_PERSTREAM_STAT_DEFAULT);
}
/**
 * Set the flag to enable per stream stat or not.
 *
 * @deprecated please use {@link DistributedLogNamespaceBuilder#perLogStatsLogger(StatsLogger)}
 * @param enabled
 *          flag to enable/disable per stream stat.
 * @return this configuration, to allow call chaining
 */
@Deprecated // pairs the Javadoc @deprecated tag with the annotation so tools and reflection see it too
public DistributedLogConfiguration setEnablePerStreamStat(boolean enabled) {
    setProperty(BKDL_ENABLE_PERSTREAM_STAT, enabled);
    return this;
}
//
// Settings for Feature Providers
//
/**
 * Get the feature provider class.
 * <p>The class is resolved through {@link ReflectionUtils} from the
 * {@code BKDL_FEATURE_PROVIDER_CLASS} setting, with {@link DefaultFeatureProvider} supplied as the
 * default and the class loader of {@link FeatureProvider} used for the lookup.
 *
 * @return feature provider class
 * @throws ConfigurationException propagated from the {@link ReflectionUtils} lookup
 */
public Class<? extends FeatureProvider> getFeatureProviderClass()
        throws ConfigurationException {
    final ClassLoader providerLoader = FeatureProvider.class.getClassLoader();
    return ReflectionUtils.getClass(
            this,
            BKDL_FEATURE_PROVIDER_CLASS,
            DefaultFeatureProvider.class,
            FeatureProvider.class,
            providerLoader);
}
/**
 * Set the feature provider class. The class is stored by its name.
 *
 * @param providerClass feature provider class; it is dereferenced, so it must not be null
 * @return this configuration, to allow call chaining
 * @see #getFeatureProviderClass()
 */
public DistributedLogConfiguration setFeatureProviderClass(
        final Class<? extends FeatureProvider> providerClass) {
    final String providerClassName = providerClass.getName();
    this.setProperty(BKDL_FEATURE_PROVIDER_CLASS, providerClassName);
    return this;
}
/**
 * Get the base config path used by the file feature provider.
 *
 * @return base config path for file feature provider
 * @see #setFileFeatureProviderBaseConfigPath(String)
 */
public String getFileFeatureProviderBaseConfigPath() {
    return this.getString(
            BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH,
            BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH_DEFAULT);
}
/**
 * Set the base config path used by the file feature provider.
 *
 * @param configPath base config path for file feature provider
 * @return this configuration, to allow call chaining
 * @see #getFileFeatureProviderBaseConfigPath()
 */
public DistributedLogConfiguration setFileFeatureProviderBaseConfigPath(final String configPath) {
    this.setProperty(BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH, configPath);
    return this;
}
/**
 * Get the overlay config path used by the file feature provider.
 *
 * @return overlay config path for file feature provider
 * @see #setFileFeatureProviderOverlayConfigPath(String)
 */
public String getFileFeatureProviderOverlayConfigPath() {
    return this.getString(
            BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH,
            BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH_DEFAULT);
}
/**
 * Set the overlay config path used by the file feature provider.
 *
 * @param configPath overlay config path for file feature provider
 * @return this configuration, to allow call chaining
 * @see #getFileFeatureProviderOverlayConfigPath()
 */
public DistributedLogConfiguration setFileFeatureProviderOverlayConfigPath(final String configPath) {
    this.setProperty(BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH, configPath);
    return this;
}
//
// Settings for Namespaces
//
/**
 * Whether the federated namespace implementation is enabled.
 *
 * @return true if federated namespace is enabled, otherwise false
 * @see #setFederatedNamespaceEnabled(boolean)
 */
public boolean isFederatedNamespaceEnabled() {
    return this.getBoolean(
            BKDL_FEDERATED_NAMESPACE_ENABLED,
            BKDL_FEDERATED_NAMESPACE_ENABLED_DEFAULT);
}
/**
 * Set whether the federated namespace implementation should be used.
 *
 * @param enabled flag to enable federated namespace implementation
 * @return this configuration, to allow call chaining
 * @see #isFederatedNamespaceEnabled()
 */
public DistributedLogConfiguration setFederatedNamespaceEnabled(final boolean enabled) {
    this.setProperty(BKDL_FEDERATED_NAMESPACE_ENABLED, enabled);
    return this;
}
/**
 * Get the maximum number of logs per sub namespace of a federated namespace.
 *
 * @return max logs per sub namespace
 * @see #setFederatedMaxLogsPerSubnamespace(int)
 */
public int getFederatedMaxLogsPerSubnamespace() {
    return this.getInt(
            BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE,
            BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE_DEFAULT);
}
/**
 * Set the maximum number of logs per sub namespace of a federated namespace.
 *
 * @param maxLogs max logs per sub namespace
 * @return this configuration, to allow call chaining
 * @see #getFederatedMaxLogsPerSubnamespace()
 */
public DistributedLogConfiguration setFederatedMaxLogsPerSubnamespace(final int maxLogs) {
    this.setProperty(BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE, maxLogs);
    return this;
}
/**
 * Whether to check the existence of a log when querying the local cache of a federated namespace
 * misses. Enabling it issues zookeeper queries to check all sub namespaces under the federated
 * namespace.
 *
 * <p>NOTE: by default it is on for all admin related tools. For write proxies, consider turning it
 * off for performance.
 *
 * @return true if existence of a log is checked when querying local cache misses, otherwise false
 * @see #setFederatedCheckExistenceWhenCacheMiss(boolean)
 */
public boolean getFederatedCheckExistenceWhenCacheMiss() {
    return this.getBoolean(
            BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS,
            BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS_DEFAULT);
}
/**
 * Set whether to check the existence of a log when querying the local cache of a federated
 * namespace misses.
 *
 * @param enabled flag to enable/disable this feature
 * @return this configuration, to allow call chaining
 * @see #getFederatedCheckExistenceWhenCacheMiss()
 */
public DistributedLogConfiguration setFederatedCheckExistenceWhenCacheMiss(final boolean enabled) {
    this.setProperty(BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS, enabled);
    return this;
}
//
// Settings for Configurations
//
/**
 * Get the dynamic configuration reload interval, in seconds.
 *
 * @return dynamic configuration reload interval, in seconds
 * @see #setDynamicConfigReloadIntervalSec(int)
 */
public int getDynamicConfigReloadIntervalSec() {
    return this.getInt(
            BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC,
            BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC_DEFAULT);
}
/**
 * Set the dynamic configuration reload interval, in seconds.
 *
 * @param intervalSec dynamic configuration reload interval in seconds
 * @return this configuration, to allow call chaining
 * @see #getDynamicConfigReloadIntervalSec()
 */
public DistributedLogConfiguration setDynamicConfigReloadIntervalSec(final int intervalSec) {
    this.setProperty(BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC, intervalSec);
    return this;
}
/**
 * Get the config router class, which determines how a stream name is mapped to configuration.
 *
 * @return config router class name
 * @see #setStreamConfigRouterClass(String)
 */
public String getStreamConfigRouterClass() {
    return this.getString(
            BKDL_STREAM_CONFIG_ROUTER_CLASS,
            BKDL_STREAM_CONFIG_ROUTER_CLASS_DEFAULT);
}
/**
 * Set the config router class.
 *
 * @param routerClass config router class name
 * @return this configuration, to allow call chaining
 * @see #getStreamConfigRouterClass()
 */
public DistributedLogConfiguration setStreamConfigRouterClass(final String routerClass) {
    this.setProperty(BKDL_STREAM_CONFIG_ROUTER_CLASS, routerClass);
    return this;
}
//
// Settings for RateLimit
//
/**
 * Get the lower threshold of the bytes per second limit on writes to the distributedlog proxy.
 *
 * @return soft write limit, in bytes per second
 */
public int getBpsSoftWriteLimit() {
    return this.getInt(
            BKDL_BPS_SOFT_WRITE_LIMIT,
            BKDL_BPS_SOFT_WRITE_LIMIT_DEFAULT);
}
/**
 * Get the upper threshold of the bytes per second limit on writes to the distributedlog proxy.
 *
 * @return hard write limit, in bytes per second
 */
public int getBpsHardWriteLimit() {
    return this.getInt(
            BKDL_BPS_HARD_WRITE_LIMIT,
            BKDL_BPS_HARD_WRITE_LIMIT_DEFAULT);
}
/**
 * Get the lower threshold of the requests per second limit on writes to the distributedlog proxy.
 *
 * @return soft write limit, in requests per second
 */
public int getRpsSoftWriteLimit() {
    return this.getInt(
            BKDL_RPS_SOFT_WRITE_LIMIT,
            BKDL_RPS_SOFT_WRITE_LIMIT_DEFAULT);
}
/**
 * Get the upper threshold of the requests per second limit on writes to the distributedlog proxy.
 *
 * @return hard write limit, in requests per second
 */
public int getRpsHardWriteLimit() {
    return this.getInt(
            BKDL_RPS_HARD_WRITE_LIMIT,
            BKDL_RPS_HARD_WRITE_LIMIT_DEFAULT);
}
//
// Settings for partitioning
//
/**
 * Get the maximum number of partitions of each stream allowed to be acquired per proxy.
 * <p>This setting can be configured per stream; this value is the default used when it is not
 * configured per stream. Default value is -1, which means no limit on the number of partitions
 * that could be acquired for each stream.
 *
 * @return maximum number of partitions of each stream allowed to be acquired per proxy
 * @see #setMaxAcquiredPartitionsPerProxy(int)
 */
public int getMaxAcquiredPartitionsPerProxy() {
    return this.getInt(
            BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY,
            BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY_DEFAULT);
}
/**
 * Set the maximum number of partitions of each stream allowed to be acquired per proxy.
 *
 * @param numPartitions number of partitions of each stream allowed to be acquired
 * @return this configuration, to allow call chaining
 * @see #getMaxAcquiredPartitionsPerProxy()
 */
public DistributedLogConfiguration setMaxAcquiredPartitionsPerProxy(final int numPartitions) {
    this.setProperty(BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY, numPartitions);
    return this;
}
/**
 * Get the maximum number of partitions of each stream allowed to be cached per proxy.
 * <p>This setting can be configured per stream; this value is the default used when it is not
 * configured per stream. Default value is -1, which means no limit on the number of partitions
 * that could be cached for each stream.
 *
 * @return maximum number of partitions of each stream allowed to be cached per proxy
 * @see #setMaxCachedPartitionsPerProxy(int)
 */
public int getMaxCachedPartitionsPerProxy() {
    return this.getInt(
            BKDL_MAX_CACHED_PARTITIONS_PER_PROXY,
            BKDL_MAX_CACHED_PARTITIONS_PER_PROXY_DEFAULT);
}
/**
 * Set the maximum number of partitions of each stream allowed to be cached per proxy.
 *
 * @param numPartitions number of partitions of each stream allowed to cache
 * @return this configuration, to allow call chaining
 * @see #getMaxCachedPartitionsPerProxy()
 */
public DistributedLogConfiguration setMaxCachedPartitionsPerProxy(final int numPartitions) {
    this.setProperty(BKDL_MAX_CACHED_PARTITIONS_PER_PROXY, numPartitions);
    return this;
}
// Error Injection Settings
/**
 * Whether write delay injection is enabled. If false, the other write delay settings are not checked.
 *
 * @return true if write delay injection is enabled
 */
public boolean getEIInjectWriteDelay() {
    return this.getBoolean(
            BKDL_EI_INJECT_WRITE_DELAY,
            BKDL_EI_INJECT_WRITE_DELAY_DEFAULT);
}
/**
 * Get the percent of write requests which should be delayed by BKDL_EI_INJECTED_WRITE_DELAY_MS.
 *
 * @return percent of writes to delay
 * @see #setEIInjectedWriteDelayPercent(double)
 */
public double getEIInjectedWriteDelayPercent() {
    return this.getDouble(
            BKDL_EI_INJECTED_WRITE_DELAY_PERCENT,
            BKDL_EI_INJECTED_WRITE_DELAY_PERCENT_DEFAULT);
}
/**
 * Set the percent of write requests which should be delayed by BKDL_EI_INJECTED_WRITE_DELAY_MS.
 * 0 disables write delay.
 *
 * @param percent percent of writes to delay
 * @return this configuration, to allow call chaining
 * @see #getEIInjectedWriteDelayPercent()
 */
public DistributedLogConfiguration setEIInjectedWriteDelayPercent(final double percent) {
    this.setProperty(BKDL_EI_INJECTED_WRITE_DELAY_PERCENT, percent);
    return this;
}
/**
 * Get the amount of time writes are delayed for in writer failure injection.
 *
 * @return millis to delay writes for
 * @see #setEIInjectedWriteDelayMs(int)
 */
public int getEIInjectedWriteDelayMs() {
    return this.getInt(
            BKDL_EI_INJECTED_WRITE_DELAY_MS,
            BKDL_EI_INJECTED_WRITE_DELAY_MS_DEFAULT);
}
/**
 * Set the amount of time writes are delayed for in writer failure injection.
 * 0 disables write delay.
 *
 * @param delayMs ms to delay writes for
 * @return this configuration, to allow call chaining
 * @see #getEIInjectedWriteDelayMs()
 */
public DistributedLogConfiguration setEIInjectedWriteDelayMs(final int delayMs) {
    this.setProperty(BKDL_EI_INJECTED_WRITE_DELAY_MS, delayMs);
    return this;
}
/**
 * Whether stalls are injected in read ahead.
 *
 * @return true if stalls are injected in read ahead, otherwise false
 * @see #setEIInjectReadAheadStall(boolean)
 */
public boolean getEIInjectReadAheadStall() {
    return this.getBoolean(
            BKDL_EI_INJECT_READAHEAD_STALL,
            BKDL_EI_INJECT_READAHEAD_STALL_DEFAULT);
}
/**
 * Set whether stalls are injected in read ahead.
 *
 * @param enabled flag to inject stalls in read ahead
 * @return this configuration, to allow call chaining
 * @see #getEIInjectReadAheadStall()
 */
public DistributedLogConfiguration setEIInjectReadAheadStall(final boolean enabled) {
    this.setProperty(BKDL_EI_INJECT_READAHEAD_STALL, enabled);
    return this;
}
/**
 * Whether broken entries are injected in read ahead.
 *
 * @return true if corruption is injected in read ahead, otherwise false
 * @see #setEIInjectReadAheadBrokenEntries(boolean)
 */
public boolean getEIInjectReadAheadBrokenEntries() {
    return this.getBoolean(
            BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES,
            BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES_DEFAULT);
}
/**
 * Set whether broken entries are injected in read ahead.
 *
 * @param enabled flag to inject corruption in read ahead
 * @return this configuration, to allow call chaining
 * @see #getEIInjectReadAheadBrokenEntries()
 */
public DistributedLogConfiguration setEIInjectReadAheadBrokenEntries(final boolean enabled) {
    this.setProperty(BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES, enabled);
    return this;
}
/**
 * Whether delays are injected in read ahead.
 *
 * @return true if delays are injected in read ahead, otherwise false
 * @see #setEIInjectReadAheadDelay(boolean)
 */
public boolean getEIInjectReadAheadDelay() {
    return this.getBoolean(
            BKDL_EI_INJECT_READAHEAD_DELAY,
            BKDL_EI_INJECT_READAHEAD_DELAY_DEFAULT);
}
/**
 * Set whether delays are injected in read ahead.
 *
 * @param enabled flag to inject delays in read ahead
 * @return this configuration, to allow call chaining
 * @see #getEIInjectReadAheadDelay()
 */
public DistributedLogConfiguration setEIInjectReadAheadDelay(final boolean enabled) {
    this.setProperty(BKDL_EI_INJECT_READAHEAD_DELAY, enabled);
    return this;
}
/**
 * Get the max delay injected in read ahead.
 *
 * @return max injected delay in read ahead, in millis
 * @see #setEIInjectMaxReadAheadDelayMs(int)
 */
public int getEIInjectMaxReadAheadDelayMs() {
    return this.getInt(
            BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS,
            BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS_DEFAULT);
}
/**
 * Set the max delay injected in read ahead.
 *
 * @param delayMs max injected delay in read ahead, in millis
 * @return this configuration, to allow call chaining
 * @see #getEIInjectMaxReadAheadDelayMs()
 */
public DistributedLogConfiguration setEIInjectMaxReadAheadDelayMs(final int delayMs) {
    this.setProperty(BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS, delayMs);
    return this;
}
/**
 * Get the percentage of operations that are delayed in read ahead.
 *
 * @return the percentage of operations to delay in read ahead
 * @see #setEIInjectReadAheadDelayPercent(int)
 */
public int getEIInjectReadAheadDelayPercent() {
    return this.getInt(
            BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT,
            BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT_DEFAULT);
}
/**
 * Set the percentage of operations that are delayed in read ahead.
 *
 * @param percent the percentage of operations to delay in read ahead
 * @return this configuration, to allow call chaining
 * @see #getEIInjectReadAheadDelayPercent()
 */
public DistributedLogConfiguration setEIInjectReadAheadDelayPercent(final int percent) {
    this.setProperty(BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT, percent);
    return this;
}
/**
 * Validate the configuration.
 *
 * <p>Two cross-setting checks are performed:
 * <ul>
 *   <li>the BookKeeper client read timeout (scaled by 1000 here, i.e. treated as seconds) must be
 *       at least the read-LAC long poll timeout;</li>
 *   <li>when the reader idle warn threshold is positive, it must be more than twice the read-LAC
 *       long poll timeout.</li>
 * </ul>
 *
 * @throws IllegalArgumentException if either check fails
 */
public void validate() {
    // Scale with a long literal: an int multiplication would overflow for large timeouts and could
    // make an invalid configuration pass (or a valid one fail).
    Preconditions.checkArgument(getBKClientReadTimeout() * 1000L >= getReadLACLongPollTimeout(),
        "Invalid timeout configuration: bkcReadTimeoutSeconds ("+getBKClientReadTimeout()+
            ") should be longer than readLACLongPollTimeout ("+getReadLACLongPollTimeout()+")");
    long readerIdleWarnThresholdMs = getReaderIdleWarnThresholdMillis();
    if (readerIdleWarnThresholdMs > 0) { // NOTE: some test cases set the idle warn threshold to 0
        // Same reasoning: double the long poll timeout in long arithmetic to avoid int overflow.
        Preconditions.checkArgument(readerIdleWarnThresholdMs > 2L * getReadLACLongPollTimeout(),
            "Invalid configuration: ReaderIdleWarnThreshold should be 2x larget than readLACLongPollTimeout");
    }
}
}