/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.SingleServerInventoryView;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Tests for {@link DruidCoordinator}: segment moves, full coordinator runs across tiers, under-replication
 * accounting, and balancer executor initialization.
 */
public class DruidCoordinatorTest extends CuratorTestBase
{
private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
private static final long COORDINATOR_START_DELAY = 1;
private static final long COORDINATOR_PERIOD = 100;
private DruidCoordinator coordinator;
private SegmentsMetadataManager segmentsMetadataManager;
private DataSourcesSnapshot dataSourcesSnapshot;
private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
private SingleServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory;
private DruidServer druidServer;
private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
private LoadQueuePeon loadQueuePeon;
private MetadataRuleManager metadataRuleManager;
private CountDownLatch leaderAnnouncerLatch;
private CountDownLatch leaderUnannouncerLatch;
private PathChildrenCache pathChildrenCache;
private DruidCoordinatorConfig druidCoordinatorConfig;
private ObjectMapper objectMapper;
private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
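
/**
 * Wires up a real {@link DruidCoordinator} backed by the test ZooKeeper/Curator instance and a real
 * {@link CuratorLoadQueuePeon}, with EasyMock stubs for the metadata, config, and inventory dependencies.
 * Leadership announcements are observed through the two latches.
 */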
@Before
public void setUp() throws Exception
{
druidServer = EasyMock.createMock(DruidServer.class);
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
EasyMock.replay(configManager);
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
curator.create().creatingParentsIfNeeded().forPath(LOADPATH);
objectMapper = new DefaultObjectMapper();
druidCoordinatorConfig = new TestDruidCoordinatorConfig(
new Duration(COORDINATOR_START_DELAY),
new Duration(COORDINATOR_PERIOD),
null,
null,
null,
new Duration(COORDINATOR_PERIOD),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
new Duration("PT0s")
);
pathChildrenCache = new PathChildrenCache(
curator,
LOADPATH,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache-%d")
);
loadQueuePeon = new CuratorLoadQueuePeon(
curator,
LOADPATH,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon-%d"),
druidCoordinatorConfig
);
loadQueuePeon.start();
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
loadManagementPeons = new ConcurrentHashMap<>();
scheduledExecutorFactory = new ScheduledExecutorFactory()
{
@Override
public ScheduledExecutorService create(int corePoolSize, final String nameFormat)
{
return Executors.newSingleThreadScheduledExecutor();
}
};
leaderAnnouncerLatch = new CountDownLatch(1);
leaderUnannouncerLatch = new CountDownLatch(1);
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
new ZkPathsConfig()
{
@Override
public String getBase()
{
return "druid";
}
},
configManager,
segmentsMetadataManager,
serverInventoryView,
metadataRuleManager,
() -> curator,
serviceEmitter,
scheduledExecutorFactory,
null,
null,
new NoopServiceAnnouncer()
{
@Override
public void announce(DruidNode node)
{
// count down when this coordinator becomes the leader
leaderAnnouncerLatch.countDown();
}
@Override
public void unannounce(DruidNode node)
{
leaderUnannouncerLatch.countDown();
}
},
druidNode,
loadManagementPeons,
null,
new HashSet<>(),
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
ZkEnablementConfig.ENABLED
);
}
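
/** Stops the load queue peon, closes the path children cache, and tears down ZooKeeper/Curator. */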
@After
public void tearDown() throws Exception
{
loadQueuePeon.stop();
pathChildrenCache.close();
tearDownServerAndCurator();
}
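
/**
 * Verifies the interactions performed by {@link DruidCoordinator#moveSegment}: the segment is marked to drop on the
 * source peon, loaded on the destination peon, dropped from the source once the destination reports it loaded, and
 * finally unmarked.
 */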
@Test
public void testMoveSegment()
{
final DataSegment segment = EasyMock.createNiceMock(DataSegment.class);
EasyMock.expect(segment.getId()).andReturn(SegmentId.dummy("dummySegment"));
EasyMock.expect(segment.getDataSource()).andReturn("dummyDataSource");
EasyMock.replay(segment);
loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class);
EasyMock.expect(loadQueuePeon.getLoadQueueSize()).andReturn(1L);
loadQueuePeon.markSegmentToDrop(segment);
EasyMock.expectLastCall().once();
Capture<LoadPeonCallback> loadCallbackCapture = Capture.newInstance();
Capture<LoadPeonCallback> dropCallbackCapture = Capture.newInstance();
loadQueuePeon.loadSegment(EasyMock.anyObject(DataSegment.class), EasyMock.capture(loadCallbackCapture));
EasyMock.expectLastCall().once();
loadQueuePeon.dropSegment(EasyMock.anyObject(DataSegment.class), EasyMock.capture(dropCallbackCapture));
EasyMock.expectLastCall().once();
loadQueuePeon.unmarkSegmentToDrop(segment);
EasyMock.expectLastCall().once();
EasyMock.expect(loadQueuePeon.getSegmentsToDrop()).andReturn(new HashSet<>()).once();
EasyMock.replay(loadQueuePeon);
ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(segment);
EasyMock.replay(druidDataSource);
EasyMock
.expect(segmentsMetadataManager.getImmutableDataSourceWithUsedSegments(EasyMock.anyString()))
.andReturn(druidDataSource);
EasyMock.replay(segmentsMetadataManager);
EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes();
EasyMock.replay(dataSourcesSnapshot);
scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(scheduledExecutorFactory);
EasyMock.replay(metadataRuleManager);
ImmutableDruidDataSource dataSource = EasyMock.createMock(ImmutableDruidDataSource.class);
EasyMock.expect(dataSource.getSegments()).andReturn(Collections.singletonList(segment)).anyTimes();
EasyMock.replay(dataSource);
EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn(
new ImmutableDruidServer(
new DruidServerMetadata("from", null, null, 5L, ServerType.HISTORICAL, null, 0),
1L,
ImmutableMap.of("dummyDataSource", dataSource),
1
)
).atLeastOnce();
EasyMock.replay(druidServer);
DruidServer druidServer2 = EasyMock.createMock(DruidServer.class);
EasyMock.expect(druidServer2.toImmutableDruidServer()).andReturn(
new ImmutableDruidServer(
new DruidServerMetadata("to", null, null, 5L, ServerType.HISTORICAL, null, 0),
1L,
ImmutableMap.of("dummyDataSource", dataSource),
1
)
).atLeastOnce();
EasyMock.replay(druidServer2);
loadManagementPeons.put("from", loadQueuePeon);
loadManagementPeons.put("to", loadQueuePeon);
EasyMock.expect(serverInventoryView.isSegmentLoadedByServer("to", segment)).andReturn(true).once();
EasyMock.replay(serverInventoryView);
mockCoordinatorRuntimeParams();
coordinator.moveSegment(
coordinatorRuntimeParams,
druidServer.toImmutableDruidServer(),
druidServer2.toImmutableDruidServer(),
segment,
null
);
LoadPeonCallback loadCallback = loadCallbackCapture.getValue();
loadCallback.execute();
LoadPeonCallback dropCallback = dropCallbackCapture.getValue();
dropCallback.execute();
EasyMock.verify(druidServer, druidServer2, loadQueuePeon, serverInventoryView, metadataRuleManager);
EasyMock.verify(coordinatorRuntimeParams);
}
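
/** Stubs the runtime params mock to return the mocked data sources snapshot. */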
private void mockCoordinatorRuntimeParams()
{
EasyMock.expect(coordinatorRuntimeParams.getDataSourcesSnapshot()).andReturn(this.dataSourcesSnapshot).anyTimes();
EasyMock.replay(coordinatorRuntimeParams);
}
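
/**
 * Runs a full coordinator leadership cycle against a single historical server: a forever-load rule asking for
 * 2 replicas is applied to one segment on one server, so the tier view reports 1 replica still pending while the
 * cluster-capacity view reports no under-replication.
 */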
@Test(timeout = 60_000L)
public void testCoordinatorRun() throws Exception
{
String dataSource = "dataSource1";
String tier = "hot";
// Setup MetadataRuleManager
Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(tier, 2));
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(foreverLoadRule)).atLeastOnce();
EasyMock.expect(metadataRuleManager.getAllRules())
.andReturn(ImmutableMap.of(dataSource, ImmutableList.of(foreverLoadRule))).atLeastOnce();
metadataRuleManager.stop();
EasyMock.expectLastCall().once();
EasyMock.replay(metadataRuleManager);
// Setup SegmentsMetadataManager
DruidDataSource[] dataSources = {
new DruidDataSource(dataSource, Collections.emptyMap())
};
final DataSegment dataSegment = new DataSegment(
dataSource,
Intervals.of("2010-01-01/P1D"),
"v1",
null,
null,
null,
null,
0x9,
0
);
dataSources[0].addSegment(dataSegment);
setupSegmentsMetadataMock(dataSources[0]);
ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(immutableDruidDataSource.getSegments())
.andReturn(ImmutableSet.of(dataSegment)).atLeastOnce();
EasyMock.replay(immutableDruidDataSource);
// Setup ServerInventoryView
druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, tier, 0);
loadManagementPeons.put("server1", loadQueuePeon);
EasyMock.expect(serverInventoryView.getInventory()).andReturn(
ImmutableList.of(druidServer)
).atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(serverInventoryView);
coordinator.start();
// Wait for this coordinator to become leader
leaderAnnouncerLatch.await();
// This coordinator should be leader by now
Assert.assertTrue(coordinator.isLeader());
Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
pathChildrenCache.start();
final CountDownLatch assignSegmentLatch = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
1,
pathChildrenCache,
ImmutableMap.of("2010-01-01T00:00:00.000Z_2010-01-02T00:00:00.000Z", dataSegment),
druidServer
);
assignSegmentLatch.await();
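// Wait for two full coordinator runs; the LatchableServiceEmitter counts down once per emitted "segment/count" metric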
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
coordinator.computeNumsUnavailableUsedSegmentsPerDataSource();
Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
Assert.assertEquals(0, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTier.size());
Object2LongMap<String> underReplicationCountsPerDataSource = underReplicationCountsPerDataSourcePerTier.get(tier);
Assert.assertNotNull(underReplicationCountsPerDataSource);
Assert.assertEquals(1, underReplicationCountsPerDataSource.size());
//noinspection deprecation
Assert.assertNotNull(underReplicationCountsPerDataSource.get(dataSource));
// The segment was added to druidServer by the path children cache listener above (simulating a
// SegmentChangeRequestLoad event). The load rule asks for 2 replicas, so 1 replica should still be pending.
Assert.assertEquals(1L, underReplicationCountsPerDataSource.getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView();
Assert.assertNotNull(underReplicationCountsPerDataSourcePerTierUsingClusterView);
Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTierUsingClusterView.size());
Object2LongMap<String> underReplicationCountsPerDataSourceUsingClusterView =
underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tier);
Assert.assertNotNull(underReplicationCountsPerDataSourceUsingClusterView);
Assert.assertEquals(1, underReplicationCountsPerDataSourceUsingClusterView.size());
//noinspection deprecation
Assert.assertNotNull(underReplicationCountsPerDataSourceUsingClusterView.get(dataSource));
// The load rule asks for 2 replicas, but there is only 1 historical server in the cluster. When computed using
// the cluster view, segments count as replicated as many times as the current cluster allows, so the datasource
// should not be under-replicated from this perspective.
Assert.assertEquals(0L, underReplicationCountsPerDataSourceUsingClusterView.getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
Assert.assertFalse(coordinator.isLeader());
Assert.assertNull(coordinator.getCurrentLeader());
EasyMock.verify(serverInventoryView);
EasyMock.verify(metadataRuleManager);
}
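
/**
 * Runs the coordinator against a hot and a cold historical tier: an interval-load rule sends the two 2018 segments
 * to the hot tier and a forever-load rule sends the remaining 2017 segment to the cold tier, after which neither
 * tier should be under-replicated.
 */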
@Test(timeout = 60_000L)
public void testCoordinatorTieredRun() throws Exception
{
final String dataSource = "dataSource", hotTierName = "hot", coldTierName = "cold";
final Rule hotTier = new IntervalLoadRule(Intervals.of("2018-01-01/P1M"), ImmutableMap.of(hotTierName, 1));
final Rule coldTier = new ForeverLoadRule(ImmutableMap.of(coldTierName, 1));
final String loadPathCold = "/druid/loadqueue/cold:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
final Map<String, DataSegment> dataSegments = ImmutableMap.of(
"2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
"2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
"2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);
final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
curator,
loadPathCold,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
druidCoordinatorConfig
);
final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
curator,
loadPathCold,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
);
loadManagementPeons.putAll(ImmutableMap.of("hot", loadQueuePeon, "cold", loadQueuePeonCold));
loadQueuePeonCold.start();
pathChildrenCache.start();
pathChildrenCacheCold.start();
DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);
setupSegmentsMetadataMock(druidDataSources[0]);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce();
EasyMock.expect(metadataRuleManager.getAllRules())
.andReturn(ImmutableMap.of(dataSource, ImmutableList.of(hotTier, coldTier))).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(metadataRuleManager, serverInventoryView);
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(2, pathChildrenCache, dataSegments, hotServer);
final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(1, pathChildrenCacheCold, dataSegments, coldServer);
assignSegmentLatchHot.await();
assignSegmentLatchCold.await();
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView();
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTierUsingClusterView.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
EasyMock.verify(serverInventoryView);
EasyMock.verify(segmentsMetadataManager);
EasyMock.verify(metadataRuleManager);
}
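
/**
 * Runs the coordinator with a forever-broadcast rule: every segment should be broadcast to every server
 * (historical, broker, and indexer-executor alike), leaving no tier under-replicated in either view.
 */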
@Test(timeout = 60_000L)
public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWithBroadcastRule() throws Exception
{
final String dataSource = "dataSource";
final String hotTierName = "hot";
final String coldTierName = "cold";
final String tierName1 = "tier1";
final String tierName2 = "tier2";
final Rule broadcastDistributionRule = new ForeverBroadcastDistributionRule();
final String loadPathCold = "/druid/loadqueue/cold:1234";
final String loadPathBroker1 = "/druid/loadqueue/broker1:1234";
final String loadPathBroker2 = "/druid/loadqueue/broker2:1234";
final String loadPathPeon = "/druid/loadqueue/peon:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
final DruidServer brokerServer1 = new DruidServer("broker1", "broker1", null, 5L, ServerType.BROKER, tierName1, 0);
final DruidServer brokerServer2 = new DruidServer("broker2", "broker2", null, 5L, ServerType.BROKER, tierName2, 0);
final DruidServer peonServer = new DruidServer("peon", "peon", null, 5L, ServerType.INDEXER_EXECUTOR, tierName2, 0);
final Map<String, DataSegment> dataSegments = ImmutableMap.of(
"2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
"2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
"2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);
final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
curator,
loadPathCold,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
druidCoordinatorConfig
);
final LoadQueuePeon loadQueuePeonBroker1 = new CuratorLoadQueuePeon(
curator,
loadPathBroker1,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker1_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_broker1-%d"),
druidCoordinatorConfig
);
final LoadQueuePeon loadQueuePeonBroker2 = new CuratorLoadQueuePeon(
curator,
loadPathBroker2,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker2_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_broker2-%d"),
druidCoordinatorConfig
);
final LoadQueuePeon loadQueuePeonPeonServer = new CuratorLoadQueuePeon(
curator,
loadPathPeon,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_peon_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_peon-%d"),
druidCoordinatorConfig
);
final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
curator,
loadPathCold,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
);
final PathChildrenCache pathChildrenCacheBroker1 = new PathChildrenCache(
curator,
loadPathBroker1,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_broker1-%d")
);
final PathChildrenCache pathChildrenCacheBroker2 = new PathChildrenCache(
curator,
loadPathBroker2,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_broker2-%d")
);
final PathChildrenCache pathChildrenCachePeon = new PathChildrenCache(
curator,
loadPathPeon,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_peon-%d")
);
loadManagementPeons.putAll(ImmutableMap.of("hot", loadQueuePeon,
"cold", loadQueuePeonCold,
"broker1", loadQueuePeonBroker1,
"broker2", loadQueuePeonBroker2,
"peon", loadQueuePeonPoenServer));
loadQueuePeonCold.start();
loadQueuePeonBroker1.start();
loadQueuePeonBroker2.start();
loadQueuePeonPeonServer.start();
pathChildrenCache.start();
pathChildrenCacheCold.start();
pathChildrenCacheBroker1.start();
pathChildrenCacheBroker2.start();
pathChildrenCachePeon.start();
DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);
setupSegmentsMetadataMock(druidDataSources[0]);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(broadcastDistributionRule)).atLeastOnce();
EasyMock.expect(metadataRuleManager.getAllRules())
.andReturn(ImmutableMap.of(dataSource, ImmutableList.of(broadcastDistributionRule))).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer, brokerServer1, brokerServer2, peonServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(metadataRuleManager, serverInventoryView);
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCache, dataSegments, hotServer);
final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheCold, dataSegments, coldServer);
final CountDownLatch assignSegmentLatchBroker1 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker1, dataSegments, brokerServer1);
final CountDownLatch assignSegmentLatchBroker2 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker2, dataSegments, brokerServer2);
final CountDownLatch assignSegmentLatchPeon = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCachePeon, dataSegments, peonServer);
assignSegmentLatchHot.await();
assignSegmentLatchCold.await();
assignSegmentLatchBroker1.await();
assignSegmentLatchBroker2.await();
assignSegmentLatchPeon.await();
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName1).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName2).getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView();
Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTierUsingClusterView.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tierName1).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tierName2).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
EasyMock.verify(serverInventoryView);
EasyMock.verify(segmentsMetadataManager);
EasyMock.verify(metadataRuleManager);
}
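
/**
 * Verifies that {@link DruidCoordinator.DutiesRunnable#initBalancerExecutor} creates the balancer executor lazily
 * and only recreates it when the configured balancerComputeThreads value changes.
 */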
@Test
public void testBalancerThreadNumber()
{
CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class);
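// getBalancerComputeThreads() returns 5 for the first two initializations, then 10 to force executor recreation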
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2);
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once();
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(dynamicConfig)).anyTimes();
ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
DruidCoordinator c = new DruidCoordinator(
null,
null,
configManager,
null,
null,
null,
() -> null,
null,
scheduledExecutorFactory,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
ZkEnablementConfig.ENABLED
);
DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0, "TEST");
// before initialization
Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
Assert.assertNull(c.getBalancerExec());
// first initialization
duty.initBalancerExecutor();
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService firstExec = c.getBalancerExec();
Assert.assertNotNull(firstExec);
// second initialization, expect no changes as cachedBalancerThreadNumber is not changed
duty.initBalancerExecutor();
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService secondExec = c.getBalancerExec();
Assert.assertNotNull(secondExec);
Assert.assertSame(firstExec, secondExec);
// third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10
duty.initBalancerExecutor();
Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
ListeningExecutorService thirdExec = c.getBalancerExec();
Assert.assertNotNull(thirdExec);
Assert.assertNotSame(secondExec, thirdExec);
Assert.assertNotSame(firstExec, thirdExec);
}
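
/**
 * Registers a listener on the given path children cache that simulates the server loading each assigned segment:
 * when a load request appears under the server's load queue path, the matching segment is added to the server, the
 * request znode is deleted, and the returned latch is counted down.
 */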
private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,
DruidServer server
)
{
final CountDownLatch countDownLatch = new CountDownLatch(latchCount);
pathChildrenCache.getListenable().addListener(
(CuratorFramework client, PathChildrenCacheEvent event) -> {
if (CuratorUtils.isChildAdded(event)) {
DataSegment segment = findSegmentRelatedToCuratorEvent(segments, event);
if (segment != null && server.getSegment(segment.getId()) == null) {
if (countDownLatch.getCount() > 0) {
server.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
countDownLatch.countDown();
} else {
Assert.fail("The segment path " + event.getData().getPath() + " is not expected");
}
}
}
}
);
return countDownLatch;
}
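
/** Stubs {@link SegmentsMetadataManager} and the data sources snapshot to serve the given datasource's segments. */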
private void setupSegmentsMetadataMock(DruidDataSource dataSource)
{
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
EasyMock
.expect(segmentsMetadataManager.iterateAllUsedSegments())
.andReturn(dataSource.getSegments())
.anyTimes();
EasyMock
.expect(segmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
.andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
.anyTimes();
DataSourcesSnapshot dataSourcesSnapshot =
new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource()));
EasyMock
.expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments())
.andReturn(dataSourcesSnapshot)
.anyTimes();
EasyMock
.expect(segmentsMetadataManager.retrieveAllDataSourceNames())
.andReturn(Collections.singleton(dataSource.getName()))
.anyTimes();
EasyMock.replay(segmentsMetadataManager);
EasyMock
.expect(this.dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
.andReturn(dataSource.getSegments())
.anyTimes();
EasyMock
.expect(this.dataSourcesSnapshot.getDataSourcesWithAllUsedSegments())
.andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
.anyTimes();
EasyMock.replay(this.dataSourcesSnapshot);
}
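
/** Returns the segment whose map key (its interval string) appears in the event's znode path, or null if none match. */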
@Nullable
private static DataSegment findSegmentRelatedToCuratorEvent(
Map<String, DataSegment> dataSegments,
PathChildrenCacheEvent event
)
{
return dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);
}
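
/** A leader selector that immediately elects the registered listener as leader. */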
private static class TestDruidLeaderSelector implements DruidLeaderSelector
{
private volatile Listener listener;
private volatile String leader;
@Override
public String getCurrentLeader()
{
return leader;
}
@Override
public boolean isLeader()
{
return leader != null;
}
@Override
public int localTerm()
{
return 0;
}
@Override
public void registerListener(Listener listener)
{
this.listener = listener;
leader = "what:1234";
listener.becomeLeader();
}
@Override
public void unregisterListener()
{
leader = null;
listener.stopBeingLeader();
}
}
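
/** A service emitter that counts down an optional latch each time a "segment/count" metric is emitted. */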
private static class LatchableServiceEmitter extends ServiceEmitter
{
private CountDownLatch latch;
private LatchableServiceEmitter()
{
super("", "", null);
}
@Override
public void emit(Event event)
{
if (latch != null && "segment/count".equals(event.toMap().get("metric"))) {
latch.countDown();
}
}
}
}