blob: 76b11040aad9106574489173e9cf344e31f15e78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ImmutableDruidServerTests;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class BalanceSegmentsTest
{
private static final int MAX_SEGMENTS_TO_MOVE = 5;
private DruidCoordinator coordinator;
private ImmutableDruidServer druidServer1;
private ImmutableDruidServer druidServer2;
private ImmutableDruidServer druidServer3;
private ImmutableDruidServer druidServer4;
private List<ImmutableDruidServer> druidServers;
private LoadQueuePeonTester peon1;
private LoadQueuePeonTester peon2;
private LoadQueuePeonTester peon3;
private LoadQueuePeonTester peon4;
private List<LoadQueuePeon> peons;
private DataSegment segment1;
private DataSegment segment2;
private DataSegment segment3;
private DataSegment segment4;
private DataSegment segment5;
private List<DataSegment> segments;
private ListeningExecutorService balancerStrategyExecutor;
private BalancerStrategy balancerStrategy;
private Set<String> broadcastDatasources;
@Before
public void setUp()
{
coordinator = EasyMock.createMock(DruidCoordinator.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer3 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer4 = EasyMock.createMock(ImmutableDruidServer.class);
segment1 = EasyMock.createMock(DataSegment.class);
segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class);
segment4 = EasyMock.createMock(DataSegment.class);
segment5 = EasyMock.createMock(DataSegment.class);
DateTime start1 = DateTimes.of("2012-01-01");
DateTime start2 = DateTimes.of("2012-02-01");
DateTime version = DateTimes.of("2012-03-01");
segment1 = new DataSegment(
"datasource1",
new Interval(start1, start1.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
11L
);
segment2 = new DataSegment(
"datasource1",
new Interval(start2, start2.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
7L
);
segment3 = new DataSegment(
"datasource2",
new Interval(start1, start1.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
4L
);
segment4 = new DataSegment(
"datasource2",
new Interval(start2, start2.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
8L
);
segment5 = new DataSegment(
"datasourceBroadcast",
new Interval(start2, start2.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
8L
);
segments = new ArrayList<>();
segments.add(segment1);
segments.add(segment2);
segments.add(segment3);
segments.add(segment4);
segments.add(segment5);
peon1 = new LoadQueuePeonTester();
peon2 = new LoadQueuePeonTester();
peon3 = new LoadQueuePeonTester();
peon4 = new LoadQueuePeonTester();
druidServers = ImmutableList.of(druidServer1, druidServer2, druidServer3, druidServer4);
peons = ImmutableList.of(peon1, peon2, peon3, peon4);
balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
broadcastDatasources = Collections.singleton("datasourceBroadcast");
}
@After
public void tearDown()
{
EasyMock.verify(coordinator);
EasyMock.verify(druidServer1);
EasyMock.verify(druidServer2);
EasyMock.verify(druidServer3);
EasyMock.verify(druidServer4);
balancerStrategyExecutor.shutdownNow();
}
@Test
public void testMoveToEmptyServerBalancer()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
balancerStrategy,
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
new BalancerSegmentHolder(druidServer1, segment2),
new BalancerSegmentHolder(druidServer1, segment3),
new BalancerSegmentHolder(druidServer1, segment4)
)
);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(3, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
/**
* Server 1 has 2 segments.
* Server 2 (decommissioning) has 2 segments.
* Server 3 is empty.
* Decommissioning percent is 60.
* Max segments to move is 3.
* 2 (of 2) segments should be moved from Server 2 and 1 (of 2) from Server 1.
*/
@Test
public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment2));
mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment3, segment4));
mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer4);
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(
strategy.pickSegmentsToMove(
ImmutableList.of(
new ServerHolder(druidServer2, peon2, false)
),
broadcastDatasources,
1,
100
)
).andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer2, segment3),
new BalancerSegmentHolder(druidServer2, segment4)
).iterator()
);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
.andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
new BalancerSegmentHolder(druidServer1, segment2)).iterator());
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
.anyTimes();
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2, druidServer3),
ImmutableList.of(peon1, peon2, peon3),
ImmutableList.of(false, true, false)
)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(3)
.withDecommissioningMaxPercentOfMaxSegmentsToMove(60)
.build() // ceil(3 * 0.6) = 2 segments from decommissioning servers
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertThat(
peon3.getSegmentsToLoad(),
Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1, segment3, segment4)))
);
}
@Test
public void testZeroDecommissioningMaxPercentOfMaxSegmentsToMove()
{
DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(0);
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertThat(peon3.getSegmentsToLoad(), Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1))));
}
@Test
public void testMaxDecommissioningMaxPercentOfMaxSegmentsToMove()
{
DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10);
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertThat(peon3.getSegmentsToLoad(), Matchers.is(Matchers.equalTo(ImmutableSet.of(segment2))));
}
/**
* Should balance segments as usual (ignoring percent) with empty decommissioningNodes.
*/
@Test
public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment2));
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Arrays.asList(segment3, segment4));
mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer4);
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator());
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
.andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment2),
new BalancerSegmentHolder(druidServer2, segment3),
new BalancerSegmentHolder(druidServer2, segment4)).iterator());
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
.anyTimes();
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2, druidServer3),
ImmutableList.of(peon1, peon2, peon3),
ImmutableList.of(false, false, false)
)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(3)
.withDecommissioningMaxPercentOfMaxSegmentsToMove(9)
.build()
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertThat(
peon3.getSegmentsToLoad(),
Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1, segment2, segment3)))
);
}
/**
* Shouldn't move segments to a decommissioning server.
*/
@Test
public void testMoveToDecommissioningServer()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator())
.anyTimes();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> {
List<ServerHolder> holders = (List<ServerHolder>) EasyMock.getCurrentArguments()[1];
return holders.get(0);
}).anyTimes();
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2),
ImmutableList.of(false, true)
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(0, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
@Test
public void testMoveFromDecommissioningServer()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
mockCoordinator(coordinator);
ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator())
.once();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(holder2)
.once();
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2),
ImmutableList.of(true, false)
)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(1).build())
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertEquals(0, peon1.getNumberOfSegmentsInQueue());
Assert.assertEquals(1, peon2.getNumberOfSegmentsInQueue());
}
@Test
public void testMoveMaxLoadQueueServerBalancer()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
balancerStrategy,
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
new BalancerSegmentHolder(druidServer1, segment2),
new BalancerSegmentHolder(druidServer1, segment3),
new BalancerSegmentHolder(druidServer1, segment4)
)
);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withBroadcastDatasources(broadcastDatasources)
.withDynamicConfigs(
CoordinatorDynamicConfig
.builder()
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.withMaxSegmentsInNodeLoadingQueue(1)
.build()
)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
// max to move is 5, all segments on server 1, but only expect to move 1 to server 2 since max node load queue is 1
Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
@Test
public void testMoveSameSegmentTwice()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
balancerStrategy,
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1)
)
);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withBroadcastDatasources(broadcastDatasources)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
2
).build()
)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
@Test
public void testRun1()
{
// Mock some servers of different usages
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
).build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
}
@Test
public void testRun2()
{
// Mock some servers of different usages
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList());
mockDruidServer(druidServer4, "4", "normal", 0L, 100L, Collections.emptyList());
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons).build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
}
/**
* Testing that the dynamic coordinator config value, percentOfSegmentsToConsiderPerMove, is honored when calling
* out to pickSegmentToMove. This config limits the number of segments that are considered when looking for a segment
* to move.
*/
@Test
public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove()
{
mockDruidServer(druidServer1, "1", "normal", 50L, 100L, Arrays.asList(segment1, segment2));
mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment3, segment4));
mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer4);
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
// The first call for decommissioning servers
EasyMock.expect(
strategy.pickSegmentsToMove(
ImmutableList.of(),
broadcastDatasources,
1,
40
)
)
.andReturn(Collections.emptyIterator());
// The second call for the single non decommissioning server move
EasyMock.expect(
strategy.pickSegmentsToMove(
ImmutableList.of(
new ServerHolder(druidServer3, peon3, false),
new ServerHolder(druidServer2, peon2, false),
new ServerHolder(druidServer1, peon1, false)
),
broadcastDatasources,
1,
40
)
)
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment3)).iterator());
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
.anyTimes();
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2, druidServer3),
ImmutableList.of(peon1, peon2, peon3),
ImmutableList.of(false, false, false)
)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(1)
.withPercentOfSegmentsToConsiderPerMove(40)
.build()
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertThat(
peon3.getSegmentsToLoad(),
Matchers.is(Matchers.equalTo(ImmutableSet.of(segment3)))
);
}
@Test
public void testUseBatchedSegmentSampler()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList());
mockDruidServer(druidServer4, "4", "normal", 0L, 100L, Collections.emptyList());
mockCoordinator(coordinator);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(2)
.withUseBatchedSegmentSampler(true)
.build()
)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(2L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons
)
{
return defaultRuntimeParamsBuilder(
druidServers,
peons,
druidServers.stream().map(s -> false).collect(Collectors.toList())
);
}
private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons,
List<Boolean> decommissioning
)
{
return CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(
DruidClusterBuilder
.newBuilder()
.addTier(
"normal",
IntStream
.range(0, druidServers.size())
.mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i), decommissioning.get(i)))
.toArray(ServerHolder[]::new)
)
.build()
)
.withLoadManagementPeons(
IntStream
.range(0, peons.size())
.boxed()
.collect(Collectors.toMap(i -> String.valueOf(i + 1), peons::get))
)
.withUsedSegmentsInTest(segments)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBroadcastDatasources(broadcastDatasources)
.withBalancerStrategy(balancerStrategy);
}
private static void mockDruidServer(
ImmutableDruidServer druidServer,
String name,
String tier,
long currentSize,
long maxSize,
List<DataSegment> segments
)
{
EasyMock.expect(druidServer.getName()).andReturn(name).anyTimes();
EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
ImmutableDruidServerTests.expectSegments(druidServer, segments);
EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes();
EasyMock.expect(druidServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes();
if (!segments.isEmpty()) {
segments.forEach(
s -> EasyMock.expect(druidServer.getSegment(s.getId())).andReturn(s).anyTimes()
);
}
EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer);
}
private static void mockCoordinator(DruidCoordinator coordinator)
{
coordinator.moveSegment(
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
}
private static class PredefinedPickOrderBalancerStrategy implements BalancerStrategy
{
private final BalancerStrategy delegate;
private final List<BalancerSegmentHolder> pickOrder;
private final AtomicInteger pickCounter = new AtomicInteger(0);
PredefinedPickOrderBalancerStrategy(
BalancerStrategy delegate,
List<BalancerSegmentHolder> pickOrder
)
{
this.delegate = delegate;
this.pickOrder = pickOrder;
}
@Override
public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders)
{
return delegate.findNewSegmentHomeBalancer(proposalSegment, serverHolders);
}
@Override
public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders)
{
return delegate.findNewSegmentHomeReplicator(proposalSegment, serverHolders);
}
@Override
public Iterator<BalancerSegmentHolder> pickSegmentsToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
int numberOfSegments,
double percentOfSegmentsToConsider
)
{
return pickOrder.iterator();
}
@Override
public void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList)
{
delegate.emitStats(tier, stats, serverHolderList);
}
}
private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(int percent)
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment3));
mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment2, segment3));
mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer4);
mockCoordinator(coordinator);
// either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3])
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(
strategy.pickSegmentsToMove(
ImmutableList.of(
new ServerHolder(druidServer2, peon2, true)
),
broadcastDatasources,
1,
100
)
).andReturn(
ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment2)).iterator()
);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyDouble()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator());
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
.anyTimes();
EasyMock.replay(strategy);
return defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2, druidServer3),
ImmutableList.of(peon1, peon2, peon3),
ImmutableList.of(false, true, false)
)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(1)
.withDecommissioningMaxPercentOfMaxSegmentsToMove(percent)
.build()
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
}
}