blob: 751f2597e6b379db614c0f64e55ae10865401569 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Static helpers that pick random segments out of the segments served by a list of {@link ServerHolder}s, using
 * reservoir sampling so that the segments only need to be iterated once.
 *
 * Servers whose type is not a segment replication target, and segments whose datasource is contained in the
 * supplied set of broadcast datasources, are never picked.
 */
final class ReservoirSegmentSampler
{
  private static final EmittingLogger log = new EmittingLogger(ReservoirSegmentSampler.class);

  /**
   * Picks up to {@code k} segments at random from the eligible segments on the given servers in a single pass.
   *
   * The first {@code k} eligible segments fill the reservoir; every later eligible segment replaces a random
   * reservoir slot with probability {@code k / (numSoFar + 1)}, where {@code numSoFar} is the number of eligible
   * segments seen before it.
   *
   * @param serverHolders        List of {@link ServerHolder} objects whose segments are candidates to be chosen.
   * @param broadcastDatasources Set of datasource names that identify broadcast datasources. Segments of these
   *                             datasources are skipped.
   * @param k                    Maximum number of segments to pick. A value less than or equal to zero yields an
   *                             empty list without iterating over any segment.
   * @return a new mutable list with at most {@code k} holders; it has fewer than {@code k} entries when fewer than
   * {@code k} eligible segments exist. Never null.
   */
  static List<BalancerSegmentHolder> getRandomBalancerSegmentHolders(
      final List<ServerHolder> serverHolders,
      Set<String> broadcastDatasources,
      int k
  )
  {
    // Nothing can be picked. Returning early also avoids the IllegalArgumentException that
    // new ArrayList<>(k) throws for a negative capacity, and a pointless pass over every segment when k is 0.
    if (k <= 0) {
      return new ArrayList<>();
    }

    List<BalancerSegmentHolder> holders = new ArrayList<>(k);
    int numSoFar = 0;

    for (ServerHolder server : serverHolders) {
      if (!server.getServer().getType().isSegmentReplicationTarget()) {
        // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do
        continue;
      }

      for (DataSegment segment : server.getServer().iterateAllSegments()) {
        if (broadcastDatasources.contains(segment.getDataSource())) {
          // we don't need to rebalance segments that were assigned via broadcast rules
          continue;
        }

        if (numSoFar < k) {
          // The reservoir is not full yet, so every eligible segment is taken as-is
          holders.add(new BalancerSegmentHolder(server.getServer(), segment));
        } else {
          // w.p. k / (numSoFar+1), overwrite a uniformly chosen slot of the reservoir with this segment
          int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
          if (randNum < k) {
            holders.set(randNum, new BalancerSegmentHolder(server.getServer(), segment));
          }
        }
        numSoFar++;
      }
    }
    return holders;
  }

  /**
   * Iterates over segments that live on the candidate servers passed in {@link ServerHolder} and (possibly) picks a
   * segment to return to caller in a {@link BalancerSegmentHolder} object.
   *
   * When {@code percentOfSegmentsToConsider} is less than 100, iteration stops once the number of eligible segments
   * visited reaches the computed limit, so only the segments visited up to that point can be picked. The limit is
   * derived from the sum of {@code getNumSegments()} over all passed servers.
   *
   * @param serverHolders               List of {@link ServerHolder} objects containing segments who are candidates
   *                                    to be chosen.
   * @param broadcastDatasources        Set of DataSource names that identify broadcast datasources. We don't want to
   *                                    consider segments from these datasources.
   * @param percentOfSegmentsToConsider The % of total cluster segments to consider before short-circuiting and
   *                                    returning immediately. Values less than or equal to 0 or greater than 100
   *                                    are reset to 100 with a warning.
   * @return a {@link BalancerSegmentHolder} for the picked segment and the server it was found on, or null if no
   * eligible segment was visited.
   * @deprecated this method is marked deprecated in the source without a stated replacement;
   * {@link #getRandomBalancerSegmentHolders(List, Set, int)} appears to be the intended alternative (to be
   * confirmed against callers).
   */
  @Deprecated
  static BalancerSegmentHolder getRandomBalancerSegmentHolder(
      final List<ServerHolder> serverHolders,
      Set<String> broadcastDatasources,
      double percentOfSegmentsToConsider
  )
  {
    ServerHolder fromServerHolder = null;
    DataSegment proposalSegment = null;
    int calculatedSegmentLimit = Integer.MAX_VALUE;
    int numSoFar = 0;

    // Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration less than or equal to
    // 0% of segments or greater than 100% of segments.
    if (percentOfSegmentsToConsider <= 0 || percentOfSegmentsToConsider > 100) {
      log.warn("Resetting percentOfSegmentsToConsider to 100 because only values from 1 to 100 are allowed."
               + " You Provided [%f]", percentOfSegmentsToConsider);
      percentOfSegmentsToConsider = 100;
    }

    // The percentage is not modified past this point, so the "should we stop early" decision is made only once
    final boolean limitSegmentsToConsider = percentOfSegmentsToConsider < 100;

    // Calculate the integer limit for the number of segments to be considered for moving if % is less than 100
    if (limitSegmentsToConsider) {
      int totalSegments = 0;
      for (ServerHolder server : serverHolders) {
        totalSegments += server.getServer().getNumSegments();
      }
      // If totalSegments are zero, we will assume it is a mistake and move on to iteration without updating
      // calculatedSegmentLimit
      if (totalSegments != 0) {
        calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * (percentOfSegmentsToConsider / 100.0));
      } else {
        log.warn("Unable to calculate limit on segments to consider because ServerHolder collection indicates"
                 + " zero segments existing in the cluster.");
      }
    }

    for (ServerHolder server : serverHolders) {
      if (!server.getServer().getType().isSegmentReplicationTarget()) {
        // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do
        continue;
      }

      for (DataSegment segment : server.getServer().iterateAllSegments()) {
        if (broadcastDatasources.contains(segment.getDataSource())) {
          // we don't need to rebalance segments that were assigned via broadcast rules
          continue;
        }

        int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
        // w.p. 1 / (numSoFar+1), swap out the server and segment
        if (randNum == numSoFar) {
          fromServerHolder = server;
          proposalSegment = segment;
        }
        numSoFar++;

        // We have iterated over the allotted number of segments and will return the currently proposed segment or
        // null. We will only break out early if we are iterating less than 100% of the total cluster segments
        if (limitSegmentsToConsider && numSoFar >= calculatedSegmentLimit) {
          log.debug("Breaking out of iteration over potential segments to move because we hit the limit [%f percent] of"
                    + " segments to consider to move. Segments Iterated: [%d]", percentOfSegmentsToConsider, numSoFar);
          break;
        }
      }

      // Same condition as inside the segment loop: once the limit has been reached we must also stop visiting
      // the remaining servers
      if (limitSegmentsToConsider && numSoFar >= calculatedSegmentLimit) {
        break;
      }
    }

    if (fromServerHolder != null) {
      return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
    } else {
      return null;
    }
  }

  /**
   * Not instantiable: this class only exposes static methods.
   */
  private ReservoirSegmentSampler()
  {
  }
}