| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server.coordinator; |
| |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableSet; |
| import org.apache.druid.common.config.JacksonConfigManager; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.logger.Logger; |
| import org.apache.druid.server.coordinator.duty.KillUnusedSegments; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| import javax.validation.constraints.Max; |
| import javax.validation.constraints.Min; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * This class is for users to change their configurations while their Druid cluster is running. |
| * These configurations are designed to allow only simple values rather than complicated JSON objects. |
| * |
| * @see JacksonConfigManager |
| * @see org.apache.druid.common.config.ConfigManager |
| */ |
| public class CoordinatorDynamicConfig |
| { |
| public static final String CONFIG_KEY = "coordinator.config"; |
| |
| private final long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; |
| private final long mergeBytesLimit; |
| private final int mergeSegmentsLimit; |
| private final int maxSegmentsToMove; |
| private final double percentOfSegmentsToConsiderPerMove; |
| private final boolean useBatchedSegmentSampler; |
| private final int replicantLifetime; |
| private final int replicationThrottleLimit; |
| private final int balancerComputeThreads; |
| private final boolean emitBalancingStats; |
| |
| /** |
| * If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources. |
| */ |
| private final boolean killUnusedSegmentsInAllDataSources; |
| |
| /** |
| * List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}. |
| */ |
| private final Set<String> specificDataSourcesToKillUnusedSegmentsIn; |
| private final Set<String> decommissioningNodes; |
| private final int decommissioningMaxPercentOfMaxSegmentsToMove; |
| |
| /** |
| * Stale pending segments belonging to the data sources in this list are not killed by {@link |
| * KillStalePendingSegments}. In other words, segments in these data sources are "protected". |
| * <p> |
| * Pending segments are considered "stale" when their created_time is older than {@link |
| * KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now. |
| */ |
| private final Set<String> dataSourcesToNotKillStalePendingSegmentsIn; |
| |
| /** |
| * The maximum number of segments that could be queued for loading to any given server. |
| * Default values is 0 with the meaning of "unbounded" (any number of |
| * segments could be added to the loading queue for any server). |
| * See {@link LoadQueuePeon}, {@link org.apache.druid.server.coordinator.rules.LoadRule#run} |
| */ |
| private final int maxSegmentsInNodeLoadingQueue; |
| private final boolean pauseCoordination; |
| |
| /** |
| * This decides whether additional replication is needed for segments that have failed to load due to a load timeout. |
| * When enabled, the coordinator will attempt to replicate the failed segment on a different historical server. |
| * The historical which failed to load the segment may still load the segment later. Therefore, enabling this setting |
| * works better if there are a few slow historicals in the cluster and segment availability needs to be sped up. |
| */ |
| private final boolean replicateAfterLoadTimeout; |
| |
| /** |
| * This is the maximum number of non-primary segment replicants to load per Coordination run. This number can |
| * be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent |
| * long delays in new data loads after events such as a Historical server leaving the cluster. |
| */ |
| private final int maxNonPrimaryReplicantsToLoad; |
| |
| private static final Logger log = new Logger(CoordinatorDynamicConfig.class); |
| |
| @JsonCreator |
| public CoordinatorDynamicConfig( |
| // Keeping the legacy 'millisToWaitBeforeDeleting' property name for backward compatibility. When the project is |
| // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152 |
| @JsonProperty("millisToWaitBeforeDeleting") |
| long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments, |
| @JsonProperty("mergeBytesLimit") long mergeBytesLimit, |
| @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, |
| @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, |
| @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, |
| @JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler, |
| @JsonProperty("replicantLifetime") int replicantLifetime, |
| @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, |
| @JsonProperty("balancerComputeThreads") int balancerComputeThreads, |
| @JsonProperty("emitBalancingStats") boolean emitBalancingStats, |
| // Type is Object here so that we can support both string and list as Coordinator console can not send array of |
| // strings in the update request. See https://github.com/apache/druid/issues/3055. |
| // Keeping the legacy 'killDataSourceWhitelist' property name for backward compatibility. When the project is |
| // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152 |
| @JsonProperty("killDataSourceWhitelist") Object specificDataSourcesToKillUnusedSegmentsIn, |
| // Keeping the legacy 'killAllDataSources' property name for backward compatibility. When the project is |
| // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152 |
| @JsonProperty("killAllDataSources") boolean killUnusedSegmentsInAllDataSources, |
| // Type is Object here so that we can support both string and list as Coordinator console can not send array of |
| // strings in the update request, as well as for specificDataSourcesToKillUnusedSegmentsIn. |
| // Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is |
| // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152 |
| @JsonProperty("killPendingSegmentsSkipList") Object dataSourcesToNotKillStalePendingSegmentsIn, |
| @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue, |
| @JsonProperty("decommissioningNodes") Object decommissioningNodes, |
| @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove, |
| @JsonProperty("pauseCoordination") boolean pauseCoordination, |
| @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout, |
| @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad |
| ) |
| { |
| this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = |
| leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; |
| this.mergeBytesLimit = mergeBytesLimit; |
| this.mergeSegmentsLimit = mergeSegmentsLimit; |
| this.maxSegmentsToMove = maxSegmentsToMove; |
| |
| if (percentOfSegmentsToConsiderPerMove == null) { |
| log.debug( |
| "percentOfSegmentsToConsiderPerMove was null! This is likely because your metastore does not " |
| + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value " |
| + "to the Druid default of %f. It is recommended that you re-submit your dynamic config with your " |
| + "desired value for percentOfSegmentsToConsideredPerMove", |
| Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE |
| ); |
| percentOfSegmentsToConsiderPerMove = Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE; |
| } |
| Preconditions.checkArgument( |
| percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100, |
| "percentOfSegmentsToConsiderPerMove should be between 1 and 100!" |
| ); |
| this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; |
| |
| this.useBatchedSegmentSampler = useBatchedSegmentSampler; |
| this.replicantLifetime = replicantLifetime; |
| this.replicationThrottleLimit = replicationThrottleLimit; |
| this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); |
| this.emitBalancingStats = emitBalancingStats; |
| this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources; |
| this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn); |
| this.dataSourcesToNotKillStalePendingSegmentsIn = |
| parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn); |
| this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null |
| ? Builder.DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE |
| : maxSegmentsInNodeLoadingQueue; |
| this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes); |
| Preconditions.checkArgument( |
| decommissioningMaxPercentOfMaxSegmentsToMove >= 0 && decommissioningMaxPercentOfMaxSegmentsToMove <= 100, |
| "decommissioningMaxPercentOfMaxSegmentsToMove should be in range [0, 100]" |
| ); |
| this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; |
| |
| if (this.killUnusedSegmentsInAllDataSources && !this.specificDataSourcesToKillUnusedSegmentsIn.isEmpty()) { |
| throw new IAE( |
| "can't have killUnusedSegmentsInAllDataSources and non-empty specificDataSourcesToKillUnusedSegmentsIn" |
| ); |
| } |
| this.pauseCoordination = pauseCoordination; |
| this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; |
| |
| if (maxNonPrimaryReplicantsToLoad == null) { |
| log.debug( |
| "maxNonPrimaryReplicantsToLoad was null! This is likely because your metastore does not " |
| + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value " |
| + "to the Druid default of %d. It is recommended that you re-submit your dynamic config with your " |
| + "desired value for maxNonPrimaryReplicantsToLoad", |
| Builder.DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD |
| ); |
| maxNonPrimaryReplicantsToLoad = Builder.DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD; |
| } |
| Preconditions.checkArgument( |
| maxNonPrimaryReplicantsToLoad >= 0, |
| "maxNonPrimaryReplicantsToLoad must be greater than or equal to 0." |
| ); |
| this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; |
| } |
| |
| private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray) |
| { |
| if (jsonStringOrArray instanceof String) { |
| String[] list = ((String) jsonStringOrArray).split(","); |
| Set<String> result = new HashSet<>(); |
| for (String item : list) { |
| String trimmed = item.trim(); |
| if (!trimmed.isEmpty()) { |
| result.add(trimmed); |
| } |
| } |
| return result; |
| } else if (jsonStringOrArray instanceof Collection) { |
| return ImmutableSet.copyOf(((Collection) jsonStringOrArray)); |
| } else { |
| return ImmutableSet.of(); |
| } |
| } |
| |
| public static AtomicReference<CoordinatorDynamicConfig> watch(final JacksonConfigManager configManager) |
| { |
| return configManager.watch( |
| CoordinatorDynamicConfig.CONFIG_KEY, |
| CoordinatorDynamicConfig.class, |
| CoordinatorDynamicConfig.builder().build() |
| ); |
| } |
| |
| @Nonnull |
| public static CoordinatorDynamicConfig current(final JacksonConfigManager configManager) |
| { |
| return Preconditions.checkNotNull(watch(configManager).get(), "Got null config from watcher?!"); |
| } |
| |
| @JsonProperty("millisToWaitBeforeDeleting") |
| public long getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments() |
| { |
| return leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; |
| } |
| |
| @JsonProperty |
| public long getMergeBytesLimit() |
| { |
| return mergeBytesLimit; |
| } |
| |
| @JsonProperty |
| public boolean emitBalancingStats() |
| { |
| return emitBalancingStats; |
| } |
| |
| @JsonProperty |
| public int getMergeSegmentsLimit() |
| { |
| return mergeSegmentsLimit; |
| } |
| |
| @JsonProperty |
| public int getMaxSegmentsToMove() |
| { |
| return maxSegmentsToMove; |
| } |
| |
| @JsonProperty |
| public double getPercentOfSegmentsToConsiderPerMove() |
| { |
| return percentOfSegmentsToConsiderPerMove; |
| } |
| |
| @JsonProperty |
| public boolean useBatchedSegmentSampler() |
| { |
| return useBatchedSegmentSampler; |
| } |
| |
| @JsonProperty |
| public int getReplicantLifetime() |
| { |
| return replicantLifetime; |
| } |
| |
| @JsonProperty |
| public int getReplicationThrottleLimit() |
| { |
| return replicationThrottleLimit; |
| } |
| |
| @JsonProperty |
| public int getBalancerComputeThreads() |
| { |
| return balancerComputeThreads; |
| } |
| |
| @JsonProperty("killDataSourceWhitelist") |
| public Set<String> getSpecificDataSourcesToKillUnusedSegmentsIn() |
| { |
| return specificDataSourcesToKillUnusedSegmentsIn; |
| } |
| |
| @JsonProperty("killAllDataSources") |
| public boolean isKillUnusedSegmentsInAllDataSources() |
| { |
| return killUnusedSegmentsInAllDataSources; |
| } |
| |
| @JsonProperty("killPendingSegmentsSkipList") |
| public Set<String> getDataSourcesToNotKillStalePendingSegmentsIn() |
| { |
| return dataSourcesToNotKillStalePendingSegmentsIn; |
| } |
| |
| @JsonProperty |
| public int getMaxSegmentsInNodeLoadingQueue() |
| { |
| return maxSegmentsInNodeLoadingQueue; |
| } |
| |
| /** |
| * List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' |
| * servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate |
| * specified by {@link CoordinatorDynamicConfig#getDecommissioningMaxPercentOfMaxSegmentsToMove}. |
| * |
| * @return list of host:port entries |
| */ |
| @JsonProperty |
| public Set<String> getDecommissioningNodes() |
| { |
| return decommissioningNodes; |
| |
| } |
| |
| /** |
| * The percent of {@link CoordinatorDynamicConfig#getMaxSegmentsToMove()} that determines the maximum number of |
| * segments that may be moved away from 'decommissioning' servers (specified by |
| * {@link CoordinatorDynamicConfig#getDecommissioningNodes()}) to non-decommissioning servers during one Coordinator |
| * balancer run. If this value is 0, segments will neither be moved from or to 'decommissioning' servers, effectively |
| * putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. |
| * Decommissioning can also become stalled if there are no available active servers to place the segments. By |
| * adjusting this value, an operator can prevent active servers from overload by prioritizing balancing, or |
| * decrease decommissioning time instead. |
| * |
| * @return number in range [0, 100] |
| */ |
| @Min(0) |
| @Max(100) |
| @JsonProperty |
| public int getDecommissioningMaxPercentOfMaxSegmentsToMove() |
| { |
| return decommissioningMaxPercentOfMaxSegmentsToMove; |
| } |
| |
| @JsonProperty |
| public boolean getPauseCoordination() |
| { |
| return pauseCoordination; |
| } |
| |
| @JsonProperty |
| public boolean getReplicateAfterLoadTimeout() |
| { |
| return replicateAfterLoadTimeout; |
| } |
| |
| @Min(0) |
| @JsonProperty |
| public int getMaxNonPrimaryReplicantsToLoad() |
| { |
| return maxNonPrimaryReplicantsToLoad; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "CoordinatorDynamicConfig{" + |
| "leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments=" |
| + leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments + |
| ", mergeBytesLimit=" + mergeBytesLimit + |
| ", mergeSegmentsLimit=" + mergeSegmentsLimit + |
| ", maxSegmentsToMove=" + maxSegmentsToMove + |
| ", percentOfSegmentsToConsiderPerMove=" + percentOfSegmentsToConsiderPerMove + |
| ", useBatchedSegmentSampler=" + useBatchedSegmentSampler + |
| ", replicantLifetime=" + replicantLifetime + |
| ", replicationThrottleLimit=" + replicationThrottleLimit + |
| ", balancerComputeThreads=" + balancerComputeThreads + |
| ", emitBalancingStats=" + emitBalancingStats + |
| ", killUnusedSegmentsInAllDataSources=" + killUnusedSegmentsInAllDataSources + |
| ", specificDataSourcesToKillUnusedSegmentsIn=" + specificDataSourcesToKillUnusedSegmentsIn + |
| ", dataSourcesToNotKillStalePendingSegmentsIn=" + dataSourcesToNotKillStalePendingSegmentsIn + |
| ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue + |
| ", decommissioningNodes=" + decommissioningNodes + |
| ", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove + |
| ", pauseCoordination=" + pauseCoordination + |
| ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + |
| ", maxNonPrimaryReplicantsToLoad=" + maxNonPrimaryReplicantsToLoad + |
| '}'; |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| |
| CoordinatorDynamicConfig that = (CoordinatorDynamicConfig) o; |
| |
| if (leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments != |
| that.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments) { |
| return false; |
| } |
| if (mergeBytesLimit != that.mergeBytesLimit) { |
| return false; |
| } |
| if (mergeSegmentsLimit != that.mergeSegmentsLimit) { |
| return false; |
| } |
| if (maxSegmentsToMove != that.maxSegmentsToMove) { |
| return false; |
| } |
| if (percentOfSegmentsToConsiderPerMove != that.percentOfSegmentsToConsiderPerMove) { |
| return false; |
| } |
| if (useBatchedSegmentSampler != that.useBatchedSegmentSampler) { |
| return false; |
| } |
| if (replicantLifetime != that.replicantLifetime) { |
| return false; |
| } |
| if (replicationThrottleLimit != that.replicationThrottleLimit) { |
| return false; |
| } |
| if (balancerComputeThreads != that.balancerComputeThreads) { |
| return false; |
| } |
| if (emitBalancingStats != that.emitBalancingStats) { |
| return false; |
| } |
| if (killUnusedSegmentsInAllDataSources != that.killUnusedSegmentsInAllDataSources) { |
| return false; |
| } |
| if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) { |
| return false; |
| } |
| if (!Objects.equals(specificDataSourcesToKillUnusedSegmentsIn, that.specificDataSourcesToKillUnusedSegmentsIn)) { |
| return false; |
| } |
| if (!Objects.equals(dataSourcesToNotKillStalePendingSegmentsIn, that.dataSourcesToNotKillStalePendingSegmentsIn)) { |
| return false; |
| } |
| if (!Objects.equals(decommissioningNodes, that.decommissioningNodes)) { |
| return false; |
| } |
| if (pauseCoordination != that.pauseCoordination) { |
| return false; |
| } |
| if (replicateAfterLoadTimeout != that.replicateAfterLoadTimeout) { |
| return false; |
| } |
| if (maxNonPrimaryReplicantsToLoad != that.maxNonPrimaryReplicantsToLoad) { |
| return false; |
| } |
| return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove; |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return Objects.hash( |
| leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments, |
| mergeBytesLimit, |
| mergeSegmentsLimit, |
| maxSegmentsToMove, |
| percentOfSegmentsToConsiderPerMove, |
| useBatchedSegmentSampler, |
| replicantLifetime, |
| replicationThrottleLimit, |
| balancerComputeThreads, |
| emitBalancingStats, |
| killUnusedSegmentsInAllDataSources, |
| maxSegmentsInNodeLoadingQueue, |
| specificDataSourcesToKillUnusedSegmentsIn, |
| dataSourcesToNotKillStalePendingSegmentsIn, |
| decommissioningNodes, |
| decommissioningMaxPercentOfMaxSegmentsToMove, |
| pauseCoordination, |
| maxNonPrimaryReplicantsToLoad |
| ); |
| } |
| |
| public static Builder builder() |
| { |
| return new Builder(); |
| } |
| |
| public static class Builder |
| { |
| private static final long DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS = |
| TimeUnit.MINUTES.toMillis(15); |
| private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L; |
| private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100; |
| private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5; |
| private static final double DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100; |
| private static final int DEFAULT_REPLICANT_LIFETIME = 15; |
| private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10; |
| private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1; |
| private static final boolean DEFAULT_EMIT_BALANCING_STATS = false; |
| private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false; |
| private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false; |
| private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100; |
| private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; |
| private static final boolean DEFAULT_PAUSE_COORDINATION = false; |
| private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false; |
| private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE; |
| |
| private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; |
| private Long mergeBytesLimit; |
| private Integer mergeSegmentsLimit; |
| private Integer maxSegmentsToMove; |
| private Double percentOfSegmentsToConsiderPerMove; |
| private Boolean useBatchedSegmentSampler; |
| private Integer replicantLifetime; |
| private Integer replicationThrottleLimit; |
| private Boolean emitBalancingStats; |
| private Integer balancerComputeThreads; |
| private Object specificDataSourcesToKillUnusedSegmentsIn; |
| private Boolean killUnusedSegmentsInAllDataSources; |
| private Object dataSourcesToNotKillStalePendingSegmentsIn; |
| private Integer maxSegmentsInNodeLoadingQueue; |
| private Object decommissioningNodes; |
| private Integer decommissioningMaxPercentOfMaxSegmentsToMove; |
| private Boolean pauseCoordination; |
| private Boolean replicateAfterLoadTimeout; |
| private Integer maxNonPrimaryReplicantsToLoad; |
| |
| public Builder() |
| { |
| } |
| |
| @JsonCreator |
| public Builder( |
| @JsonProperty("millisToWaitBeforeDeleting") |
| @Nullable Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments, |
| @JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit, |
| @JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit, |
| @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove, |
| @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, |
| @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler, |
| @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime, |
| @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, |
| @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, |
| @JsonProperty("emitBalancingStats") @Nullable Boolean emitBalancingStats, |
| @JsonProperty("killDataSourceWhitelist") @Nullable Object specificDataSourcesToKillUnusedSegmentsIn, |
| @JsonProperty("killAllDataSources") @Nullable Boolean killUnusedSegmentsInAllDataSources, |
| @JsonProperty("killPendingSegmentsSkipList") @Nullable Object dataSourcesToNotKillStalePendingSegmentsIn, |
| @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue, |
| @JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes, |
| @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") |
| @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove, |
| @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination, |
| @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout, |
| @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad |
| ) |
| { |
| this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = |
| leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; |
| this.mergeBytesLimit = mergeBytesLimit; |
| this.mergeSegmentsLimit = mergeSegmentsLimit; |
| this.maxSegmentsToMove = maxSegmentsToMove; |
| this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; |
| this.useBatchedSegmentSampler = useBatchedSegmentSampler; |
| this.replicantLifetime = replicantLifetime; |
| this.replicationThrottleLimit = replicationThrottleLimit; |
| this.balancerComputeThreads = balancerComputeThreads; |
| this.emitBalancingStats = emitBalancingStats; |
| this.specificDataSourcesToKillUnusedSegmentsIn = specificDataSourcesToKillUnusedSegmentsIn; |
| this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources; |
| this.dataSourcesToNotKillStalePendingSegmentsIn = dataSourcesToNotKillStalePendingSegmentsIn; |
| this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; |
| this.decommissioningNodes = decommissioningNodes; |
| this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; |
| this.pauseCoordination = pauseCoordination; |
| this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; |
| this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; |
| } |
| |
| public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis) |
| { |
| this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = leadingTimeMillis; |
| return this; |
| } |
| |
| public Builder withMergeBytesLimit(long mergeBytesLimit) |
| { |
| this.mergeBytesLimit = mergeBytesLimit; |
| return this; |
| } |
| |
| public Builder withMergeSegmentsLimit(int mergeSegmentsLimit) |
| { |
| this.mergeSegmentsLimit = mergeSegmentsLimit; |
| return this; |
| } |
| |
| public Builder withMaxSegmentsToMove(int maxSegmentsToMove) |
| { |
| this.maxSegmentsToMove = maxSegmentsToMove; |
| return this; |
| } |
| |
| public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsToConsiderPerMove) |
| { |
| this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; |
| return this; |
| } |
| |
| public Builder withUseBatchedSegmentSampler(boolean useBatchedSegmentSampler) |
| { |
| this.useBatchedSegmentSampler = useBatchedSegmentSampler; |
| return this; |
| } |
| |
| public Builder withReplicantLifetime(int replicantLifetime) |
| { |
| this.replicantLifetime = replicantLifetime; |
| return this; |
| } |
| |
| public Builder withReplicationThrottleLimit(int replicationThrottleLimit) |
| { |
| this.replicationThrottleLimit = replicationThrottleLimit; |
| return this; |
| } |
| |
| public Builder withBalancerComputeThreads(int balancerComputeThreads) |
| { |
| this.balancerComputeThreads = balancerComputeThreads; |
| return this; |
| } |
| |
| public Builder withEmitBalancingStats(boolean emitBalancingStats) |
| { |
| this.emitBalancingStats = emitBalancingStats; |
| return this; |
| } |
| |
| public Builder withSpecificDataSourcesToKillUnusedSegmentsIn(Set<String> dataSources) |
| { |
| this.specificDataSourcesToKillUnusedSegmentsIn = dataSources; |
| return this; |
| } |
| |
| public Builder withKillUnusedSegmentsInAllDataSources(boolean killUnusedSegmentsInAllDataSources) |
| { |
| this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources; |
| return this; |
| } |
| |
| public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQueue) |
| { |
| this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; |
| return this; |
| } |
| |
| public Builder withDecommissioningNodes(Set<String> decommissioning) |
| { |
| this.decommissioningNodes = decommissioning; |
| return this; |
| } |
| |
| public Builder withDecommissioningMaxPercentOfMaxSegmentsToMove(Integer percent) |
| { |
| this.decommissioningMaxPercentOfMaxSegmentsToMove = percent; |
| return this; |
| } |
| |
| public Builder withPauseCoordination(boolean pauseCoordination) |
| { |
| this.pauseCoordination = pauseCoordination; |
| return this; |
| } |
| |
| public Builder withReplicateAfterLoadTimeout(boolean replicateAfterLoadTimeout) |
| { |
| this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; |
| return this; |
| } |
| |
| public Builder withMaxNonPrimaryReplicantsToLoad(int maxNonPrimaryReplicantsToLoad) |
| { |
| this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; |
| return this; |
| } |
| |
| public CoordinatorDynamicConfig build() |
| { |
| return new CoordinatorDynamicConfig( |
| leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null |
| ? DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS |
| : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments, |
| mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit, |
| mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit, |
| maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove, |
| percentOfSegmentsToConsiderPerMove == null ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE |
| : percentOfSegmentsToConsiderPerMove, |
| useBatchedSegmentSampler == null ? DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler, |
| replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime, |
| replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit, |
| balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads, |
| emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : emitBalancingStats, |
| specificDataSourcesToKillUnusedSegmentsIn, |
| killUnusedSegmentsInAllDataSources == null |
| ? DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES |
| : killUnusedSegmentsInAllDataSources, |
| dataSourcesToNotKillStalePendingSegmentsIn, |
| maxSegmentsInNodeLoadingQueue == null |
| ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE |
| : maxSegmentsInNodeLoadingQueue, |
| decommissioningNodes, |
| decommissioningMaxPercentOfMaxSegmentsToMove == null |
| ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT |
| : decommissioningMaxPercentOfMaxSegmentsToMove, |
| pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination, |
| replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout, |
| maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD |
| : maxNonPrimaryReplicantsToLoad |
| ); |
| } |
| |
| public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) |
| { |
| return new CoordinatorDynamicConfig( |
| leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null |
| ? defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments() |
| : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments, |
| mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit, |
| mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit, |
| maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove, |
| percentOfSegmentsToConsiderPerMove == null |
| ? defaults.getPercentOfSegmentsToConsiderPerMove() |
| : percentOfSegmentsToConsiderPerMove, |
| useBatchedSegmentSampler == null ? defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler, |
| replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime, |
| replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit, |
| balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads, |
| emitBalancingStats == null ? defaults.emitBalancingStats() : emitBalancingStats, |
| specificDataSourcesToKillUnusedSegmentsIn == null |
| ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn() |
| : specificDataSourcesToKillUnusedSegmentsIn, |
| killUnusedSegmentsInAllDataSources == null |
| ? defaults.isKillUnusedSegmentsInAllDataSources() |
| : killUnusedSegmentsInAllDataSources, |
| dataSourcesToNotKillStalePendingSegmentsIn == null |
| ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn() |
| : dataSourcesToNotKillStalePendingSegmentsIn, |
| maxSegmentsInNodeLoadingQueue == null |
| ? defaults.getMaxSegmentsInNodeLoadingQueue() |
| : maxSegmentsInNodeLoadingQueue, |
| decommissioningNodes == null ? defaults.getDecommissioningNodes() : decommissioningNodes, |
| decommissioningMaxPercentOfMaxSegmentsToMove == null |
| ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove() |
| : decommissioningMaxPercentOfMaxSegmentsToMove, |
| pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination, |
| replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout, |
| maxNonPrimaryReplicantsToLoad == null |
| ? defaults.getMaxNonPrimaryReplicantsToLoad() |
| : maxNonPrimaryReplicantsToLoad |
| ); |
| } |
| } |
| } |