blob: 2f4e098d677e5288cba237538d667207a1a94b28 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Charsets;
import java.time.Clock;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
/**
* Configuration class for a ManagedLedger.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
public class ManagedLedgerConfig {
private boolean createIfMissing = true;
private int maxUnackedRangesToPersist = 10000;
private int maxBatchDeletedIndexToPersist = 10000;
private boolean deletionAtBatchIndexLevelEnabled = true;
private int maxUnackedRangesToPersistInZk = 1000;
private int maxEntriesPerLedger = 50000;
private int maxSizePerLedgerMb = 100;
private int minimumRolloverTimeMs = 0;
private long maximumRolloverTimeMs = TimeUnit.HOURS.toMillis(4);
private int ensembleSize = 3;
private int writeQuorumSize = 2;
private int ackQuorumSize = 2;
private int metadataEnsembleSize = 3;
private int metadataWriteQuorumSize = 2;
private int metadataAckQuorumSize = 2;
private int metadataMaxEntriesPerLedger = 50000;
private int ledgerRolloverTimeout = 4 * 3600;
private double throttleMarkDelete = 0;
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
private boolean lazyCursorRecovery = false;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
private long addEntryTimeoutSeconds = 120;
private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
private boolean unackedRangesOpenCacheSetEnabled = true;
private Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName;
private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private ManagedLedgerInterceptor managedLedgerInterceptor;
public boolean isCreateIfMissing() {
return createIfMissing;
}
public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) {
this.createIfMissing = createIfMissing;
return this;
}
/**
* @return the lazyCursorRecovery
*/
public boolean isLazyCursorRecovery() {
return lazyCursorRecovery;
}
/**
* Whether to recover cursors lazily when trying to recover a
* managed ledger backing a persistent topic. It can improve write availability of topics.
* The caveat is now when recovered ledger is ready to write we're not sure if all old consumers last mark
* delete position can be recovered or not.
* @param lazyCursorRecovery if enable lazy cursor recovery.
*/
public ManagedLedgerConfig setLazyCursorRecovery(boolean lazyCursorRecovery) {
this.lazyCursorRecovery = lazyCursorRecovery;
return this;
}
/**
* @return the maxEntriesPerLedger
*/
public int getMaxEntriesPerLedger() {
return maxEntriesPerLedger;
}
/**
* @param maxEntriesPerLedger
* the maxEntriesPerLedger to set
*/
public ManagedLedgerConfig setMaxEntriesPerLedger(int maxEntriesPerLedger) {
this.maxEntriesPerLedger = maxEntriesPerLedger;
return this;
}
/**
* @return the maxSizePerLedgerMb
*/
public int getMaxSizePerLedgerMb() {
return maxSizePerLedgerMb;
}
/**
* @param maxSizePerLedgerMb
* the maxSizePerLedgerMb to set
*/
public ManagedLedgerConfig setMaxSizePerLedgerMb(int maxSizePerLedgerMb) {
this.maxSizePerLedgerMb = maxSizePerLedgerMb;
return this;
}
/**
* @return the minimum rollover time
*/
public int getMinimumRolloverTimeMs() {
return minimumRolloverTimeMs;
}
/**
* Set the minimum rollover time for ledgers in this managed ledger.
*
* <p/>If this time is > 0, a ledger will not be rolled over more frequently than the specified time, even if it has
* reached the maximum number of entries or maximum size. This parameter can be used to reduce the amount of
* rollovers on managed ledger with high write throughput.
*
* @param minimumRolloverTime
* the minimum rollover time
* @param unit
* the time unit
*/
public void setMinimumRolloverTime(int minimumRolloverTime, TimeUnit unit) {
this.minimumRolloverTimeMs = (int) unit.toMillis(minimumRolloverTime);
checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs,
"Minimum rollover time needs to be less than maximum rollover time");
}
/**
* @return the maximum rollover time.
*/
public long getMaximumRolloverTimeMs() {
return maximumRolloverTimeMs;
}
/**
* Set the maximum rollover time for ledgers in this managed ledger.
*
* <p/>If the ledger is not rolled over until this time, even if it has not reached the number of entry or size
* limit, this setting will trigger rollover. This parameter can be used for topics with low request rate to force
* rollover, so recovery failure does not have to go far back.
*
* @param maximumRolloverTime
* the maximum rollover time
* @param unit
* the time unit
*/
public void setMaximumRolloverTime(int maximumRolloverTime, TimeUnit unit) {
this.maximumRolloverTimeMs = unit.toMillis(maximumRolloverTime);
checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs,
"Maximum rollover time needs to be greater than minimum rollover time");
}
/**
* @return the ensembleSize
*/
public int getEnsembleSize() {
return ensembleSize;
}
/**
* @param ensembleSize
* the ensembleSize to set
*/
public ManagedLedgerConfig setEnsembleSize(int ensembleSize) {
this.ensembleSize = ensembleSize;
return this;
}
/**
* @return the ackQuorumSize
*/
public int getAckQuorumSize() {
return ackQuorumSize;
}
/**
* @return the writeQuorumSize
*/
public int getWriteQuorumSize() {
return writeQuorumSize;
}
/**
* @param writeQuorumSize
* the writeQuorumSize to set
*/
public ManagedLedgerConfig setWriteQuorumSize(int writeQuorumSize) {
this.writeQuorumSize = writeQuorumSize;
return this;
}
/**
* @param ackQuorumSize
* the ackQuorumSize to set
*/
public ManagedLedgerConfig setAckQuorumSize(int ackQuorumSize) {
this.ackQuorumSize = ackQuorumSize;
return this;
}
/**
* @return the digestType
*/
public DigestType getDigestType() {
return digestType;
}
/**
* @param digestType
* the digestType to set
*/
public ManagedLedgerConfig setDigestType(DigestType digestType) {
this.digestType = digestType;
return this;
}
/**
* @return the password
*/
public byte[] getPassword() {
return Arrays.copyOf(password, password.length);
}
/**
* @param password
* the password to set
*/
public ManagedLedgerConfig setPassword(String password) {
this.password = password.getBytes(Charsets.UTF_8);
return this;
}
/**
* should use {@link ConcurrentOpenLongPairRangeSet} to store unacked ranges.
* @return
*/
public boolean isUnackedRangesOpenCacheSetEnabled() {
return unackedRangesOpenCacheSetEnabled;
}
public ManagedLedgerConfig setUnackedRangesOpenCacheSetEnabled(boolean unackedRangesOpenCacheSetEnabled) {
this.unackedRangesOpenCacheSetEnabled = unackedRangesOpenCacheSetEnabled;
return this;
}
/**
* @return the metadataEnsemblesize
*/
public int getMetadataEnsemblesize() {
return metadataEnsembleSize;
}
/**
* @param metadataEnsembleSize
* the metadataEnsembleSize to set
*/
public ManagedLedgerConfig setMetadataEnsembleSize(int metadataEnsembleSize) {
this.metadataEnsembleSize = metadataEnsembleSize;
return this;
}
/**
* @return the metadataAckQuorumSize
*/
public int getMetadataAckQuorumSize() {
return metadataAckQuorumSize;
}
/**
* @return the metadataWriteQuorumSize
*/
public int getMetadataWriteQuorumSize() {
return metadataWriteQuorumSize;
}
/**
* @param metadataAckQuorumSize
* the metadataAckQuorumSize to set
*/
public ManagedLedgerConfig setMetadataAckQuorumSize(int metadataAckQuorumSize) {
this.metadataAckQuorumSize = metadataAckQuorumSize;
return this;
}
/**
* @param metadataWriteQuorumSize
* the metadataWriteQuorumSize to set
*/
public ManagedLedgerConfig setMetadataWriteQuorumSize(int metadataWriteQuorumSize) {
this.metadataWriteQuorumSize = metadataWriteQuorumSize;
return this;
}
/**
* @return the metadataMaxEntriesPerLedger
*/
public int getMetadataMaxEntriesPerLedger() {
return metadataMaxEntriesPerLedger;
}
/**
* @param metadataMaxEntriesPerLedger
* the metadataMaxEntriesPerLedger to set
*/
public ManagedLedgerConfig setMetadataMaxEntriesPerLedger(int metadataMaxEntriesPerLedger) {
this.metadataMaxEntriesPerLedger = metadataMaxEntriesPerLedger;
return this;
}
/**
* @return the ledgerRolloverTimeout
*/
public int getLedgerRolloverTimeout() {
return ledgerRolloverTimeout;
}
/**
* @param ledgerRolloverTimeout
* the ledgerRolloverTimeout to set
*/
public ManagedLedgerConfig setLedgerRolloverTimeout(int ledgerRolloverTimeout) {
this.ledgerRolloverTimeout = ledgerRolloverTimeout;
return this;
}
/**
* @return the throttling rate limit for mark-delete calls
*/
public double getThrottleMarkDelete() {
return throttleMarkDelete;
}
/**
* Set the rate limiter on how many mark-delete calls per second are allowed. If the value is set to 0, the rate
* limiter is disabled. Default is 0.
*
* @param throttleMarkDelete
* the max number of mark-delete calls allowed per second
*/
public ManagedLedgerConfig setThrottleMarkDelete(double throttleMarkDelete) {
checkArgument(throttleMarkDelete >= 0.0);
this.throttleMarkDelete = throttleMarkDelete;
return this;
}
/**
* Set the retention time for the ManagedLedger.
* <p>
* Retention time and retention size ({@link #setRetentionSizeInMB(long)}) are together used to retain the
* ledger data when when there are no cursors or when all the cursors have marked the data for deletion.
* Data will be deleted in this case when both retention time and retention size settings don't prevent deleting
* the data marked for deletion.
* <p>
* A retention time of 0 (default) will make data to be deleted immediately.
* <p>
* A retention time of -1 , means to have an unlimited retention time.
*
* @param retentionTime
* duration for which messages should be retained
* @param unit
* time unit for retention time
*/
public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit unit) {
this.retentionTimeMs = unit.toMillis(retentionTime);
return this;
}
/**
* @return duration for which messages are retained
*
*/
public long getRetentionTimeMillis() {
return retentionTimeMs;
}
/**
* The retention size is used to set a maximum retention size quota on the ManagedLedger.
* <p>
* Retention size and retention time ({@link #setRetentionTime(int, TimeUnit)}) are together used to retain the
* ledger data when when there are no cursors or when all the cursors have marked the data for deletion.
* Data will be deleted in this case when both retention time and retention size settings don't prevent deleting
* the data marked for deletion.
* <p>
* A retention size of 0 (default) will make data to be deleted immediately.
* <p>
* A retention size of -1 , means to have an unlimited retention size.
*
* @param retentionSizeInMB
* quota for message retention
*/
public ManagedLedgerConfig setRetentionSizeInMB(long retentionSizeInMB) {
this.retentionSizeInMB = retentionSizeInMB;
return this;
}
/**
* @return quota for message retention
*
*/
public long getRetentionSizeInMB() {
return retentionSizeInMB;
}
/**
* Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
* corrupted at bookkeeper and managed-cursor is stuck at that ledger.
*/
public boolean isAutoSkipNonRecoverableData() {
return autoSkipNonRecoverableData;
}
public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) {
this.autoSkipNonRecoverableData = skipNonRecoverableData;
}
/**
* @return max unacked message ranges that will be persisted and recovered.
*
*/
public int getMaxUnackedRangesToPersist() {
return maxUnackedRangesToPersist;
}
/**
* @return max batch deleted index that will be persisted and recoverd.
*/
public int getMaxBatchDeletedIndexToPersist() {
return maxBatchDeletedIndexToPersist;
}
/**
* @param maxUnackedRangesToPersist
* max unacked message ranges that will be persisted and receverd.
*/
public ManagedLedgerConfig setMaxUnackedRangesToPersist(int maxUnackedRangesToPersist) {
this.maxUnackedRangesToPersist = maxUnackedRangesToPersist;
return this;
}
/**
* @return max unacked message ranges up to which it can store in Zookeeper
*
*/
public int getMaxUnackedRangesToPersistInZk() {
return maxUnackedRangesToPersistInZk;
}
public void setMaxUnackedRangesToPersistInZk(int maxUnackedRangesToPersistInZk) {
this.maxUnackedRangesToPersistInZk = maxUnackedRangesToPersistInZk;
}
/**
* Get ledger offloader which will be used to offload ledgers to longterm storage.
*
* The default offloader throws an exception on any attempt to offload.
*
* @return a ledger offloader
*/
public LedgerOffloader getLedgerOffloader() {
return ledgerOffloader;
}
/**
* Set ledger offloader to use for offloading ledgers to longterm storage.
*
* @param offloader the ledger offloader to use
*/
public ManagedLedgerConfig setLedgerOffloader(LedgerOffloader offloader) {
this.ledgerOffloader = offloader;
return this;
}
/**
* Get clock to use to time operations
*
* @return a clock
*/
public Clock getClock() {
return clock;
}
/**
* Set clock to use for time operations
*
* @param clock the clock to use
*/
public ManagedLedgerConfig setClock(Clock clock) {
this.clock = clock;
return this;
}
/**
*
* Ledger-Op (Create/Delete) timeout
*
* @return
*/
public long getMetadataOperationsTimeoutSeconds() {
return metadataOperationsTimeoutSeconds;
}
/**
* Ledger-Op (Create/Delete) timeout after which callback will be completed with failure
*
* @param metadataOperationsTimeoutSeconds
*/
public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long metadataOperationsTimeoutSeconds) {
this.metadataOperationsTimeoutSeconds = metadataOperationsTimeoutSeconds;
return this;
}
/**
* Ledger read-entry timeout
*
* @return
*/
public long getReadEntryTimeoutSeconds() {
return readEntryTimeoutSeconds;
}
/**
* Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting
* readTimeoutSeconds <= 0)
*
* @param readEntryTimeoutSeconds
* @return
*/
public ManagedLedgerConfig setReadEntryTimeoutSeconds(long readEntryTimeoutSeconds) {
this.readEntryTimeoutSeconds = readEntryTimeoutSeconds;
return this;
}
public long getAddEntryTimeoutSeconds() {
return addEntryTimeoutSeconds;
}
/**
* Add-entry timeout after which add-entry callback will be failed if add-entry is not succeeded.
*
* @param addEntryTimeoutSeconds
*/
public ManagedLedgerConfig setAddEntryTimeoutSeconds(long addEntryTimeoutSeconds) {
this.addEntryTimeoutSeconds = addEntryTimeoutSeconds;
return this;
}
/**
* Managed-ledger can setup different custom EnsemblePlacementPolicy (eg: affinity to write ledgers to only setup of
* group of bookies).
*
* @return
*/
public Class<? extends EnsemblePlacementPolicy> getBookKeeperEnsemblePlacementPolicyClassName() {
return bookKeeperEnsemblePlacementPolicyClassName;
}
/**
* Returns EnsemblePlacementPolicy configured for the Managed-ledger.
*
* @param bookKeeperEnsemblePlacementPolicyClassName
*/
public void setBookKeeperEnsemblePlacementPolicyClassName(
Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName) {
this.bookKeeperEnsemblePlacementPolicyClassName = bookKeeperEnsemblePlacementPolicyClassName;
}
/**
* Returns properties required by configured bookKeeperEnsemblePlacementPolicy.
*
* @return
*/
public Map<String, Object> getBookKeeperEnsemblePlacementPolicyProperties() {
return bookKeeperEnsemblePlacementPolicyProperties;
}
/**
* Managed-ledger can setup different custom EnsemblePlacementPolicy which needs
* bookKeeperEnsemblePlacementPolicy-Properties.
*
* @param bookKeeperEnsemblePlacementPolicyProperties
*/
public void setBookKeeperEnsemblePlacementPolicyProperties(
Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties) {
this.bookKeeperEnsemblePlacementPolicyProperties = bookKeeperEnsemblePlacementPolicyProperties;
}
public boolean isDeletionAtBatchIndexLevelEnabled() {
return deletionAtBatchIndexLevelEnabled;
}
public void setDeletionAtBatchIndexLevelEnabled(boolean deletionAtBatchIndexLevelEnabled) {
this.deletionAtBatchIndexLevelEnabled = deletionAtBatchIndexLevelEnabled;
}
public int getNewEntriesCheckDelayInMillis() {
return newEntriesCheckDelayInMillis;
}
public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) {
this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
}
public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
return managedLedgerInterceptor;
}
public void setManagedLedgerInterceptor(ManagedLedgerInterceptor managedLedgerInterceptor) {
this.managedLedgerInterceptor = managedLedgerInterceptor;
}
}