/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.discovery.LatchableServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs;
import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig;
import org.apache.druid.server.coordinator.config.CoordinatorRunConfig;
import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Tests for {@link DruidCoordinator}.
 */
public class DruidCoordinatorTest extends CuratorTestBase
{
private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
private static final Duration LOAD_TIMEOUT = Duration.standardMinutes(15);
private static final long COORDINATOR_START_DELAY = 1;
private static final long COORDINATOR_PERIOD = 100;
private DruidCoordinator coordinator;
private SegmentsMetadataManager segmentsMetadataManager;
private DataSourcesSnapshot dataSourcesSnapshot;
private BatchServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory;
private DruidServer druidServer;
private LoadQueuePeon loadQueuePeon;
private LoadQueueTaskMaster loadQueueTaskMaster;
private MetadataRuleManager metadataRuleManager;
private CountDownLatch leaderAnnouncerLatch;
private CountDownLatch leaderUnannouncerLatch;
private PathChildrenCache pathChildrenCache;
private DruidCoordinatorConfig druidCoordinatorConfig;
private ObjectMapper objectMapper;
private DruidNode druidNode;
private NewestSegmentFirstPolicy newestSegmentFirstPolicy;
private final LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();

@Before
public void setUp() throws Exception
{
druidServer = new DruidServer("from", "from", null, 5L, ServerType.HISTORICAL, "tier1", 0);
serverInventoryView = EasyMock.createMock(BatchServerInventoryView.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
loadQueueTaskMaster = EasyMock.createMock(LoadQueueTaskMaster.class);
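// Stub the config manager to return the default dynamic and compaction configs whenever the coordinator watches them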
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
EasyMock.replay(configManager);
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
curator.create().creatingParentsIfNeeded().forPath(LOADPATH);
objectMapper = new DefaultObjectMapper();
newestSegmentFirstPolicy = new NewestSegmentFirstPolicy(objectMapper);
druidCoordinatorConfig = new DruidCoordinatorConfig(
new CoordinatorRunConfig(new Duration(COORDINATOR_START_DELAY), new Duration(COORDINATOR_PERIOD)),
new CoordinatorPeriodConfig(null, null),
CoordinatorKillConfigs.DEFAULT,
new CostBalancerStrategyFactory(),
null
);
pathChildrenCache = new PathChildrenCache(
curator,
LOADPATH,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache-%d")
);
loadQueuePeon = new CuratorLoadQueuePeon(
curator,
LOADPATH,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon-%d"),
LOAD_TIMEOUT
);
loadQueuePeon.start();
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
scheduledExecutorFactory = ScheduledExecutors::fixed;
leaderAnnouncerLatch = new CountDownLatch(1);
leaderUnannouncerLatch = new CountDownLatch(1);
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
createMetadataManager(configManager),
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
loadQueueTaskMaster,
new SegmentLoadQueueManager(serverInventoryView, loadQueueTaskMaster),
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
druidNode,
new CoordinatorCustomDutyGroups(ImmutableSet.of()),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
null,
CentralizedDatasourceSchemaConfig.create()
);
}
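
/**
 * Builds a {@link MetadataManager} around the mocked config, segment and rule managers; collaborators that these
 * tests never exercise are left null.
 */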
private MetadataManager createMetadataManager(JacksonConfigManager configManager)
{
return new MetadataManager(
null,
new CoordinatorConfigManager(configManager, null, null),
segmentsMetadataManager,
null,
metadataRuleManager,
null,
null
);
}

@After
public void tearDown() throws Exception
{
loadQueuePeon.stop();
pathChildrenCache.close();
tearDownServerAndCurator();
}
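
/**
 * Runs the coordinator with a single historical and a forever load rule requiring 2 replicas on tier "hot".
 * Verifies leadership, load status, unavailable and under-replicated segment counts (both with and without the
 * cluster view), and the replication factor of the assigned segment.
 */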
@Test(timeout = 60_000L)
public void testCoordinatorRun() throws Exception
{
String dataSource = "dataSource1";
String tier = "hot";
// Setup MetadataRuleManager
Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(tier, 2), null);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(foreverLoadRule)).atLeastOnce();
metadataRuleManager.stop();
EasyMock.expectLastCall().once();
EasyMock.replay(metadataRuleManager);
// Setup SegmentsMetadataManager
DruidDataSource[] dataSources = {
new DruidDataSource(dataSource, Collections.emptyMap())
};
final DataSegment dataSegment = new DataSegment(
dataSource,
Intervals.of("2010-01-01/P1D"),
"v1",
null,
null,
null,
null,
0x9,
0
);
dataSources[0].addSegment(dataSegment);
setupSegmentsMetadataMock(dataSources[0]);
ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(immutableDruidDataSource.getSegments())
.andReturn(ImmutableSet.of(dataSegment)).atLeastOnce();
EasyMock.replay(immutableDruidDataSource);
// Setup ServerInventoryView
druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, tier, 0);
setupPeons(Collections.singletonMap("server1", loadQueuePeon));
EasyMock.expect(serverInventoryView.getInventory()).andReturn(
ImmutableList.of(druidServer)
).atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(serverInventoryView, loadQueueTaskMaster);
coordinator.start();
Assert.assertNull(coordinator.getReplicationFactor(dataSegment.getId()));
// Wait for this coordinator to become leader
leaderAnnouncerLatch.await();
// This coordinator should be leader by now
Assert.assertTrue(coordinator.isLeader());
Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
pathChildrenCache.start();
final CountDownLatch assignSegmentLatch = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
1,
pathChildrenCache,
ImmutableMap.of("2010-01-01T00:00:00.000Z_2010-01-02T00:00:00.000Z", dataSegment),
druidServer
);
assignSegmentLatch.await();
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
coordinator.getDatasourceToUnavailableSegmentCount();
Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
Assert.assertEquals(0, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.getTierToDatasourceToUnderReplicatedCount(false);
Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTier.size());
Object2LongMap<String> underReplicationCountsPerDataSource = underReplicationCountsPerDataSourcePerTier.get(tier);
Assert.assertNotNull(underReplicationCountsPerDataSource);
Assert.assertEquals(1, underReplicationCountsPerDataSource.size());
//noinspection deprecation
Assert.assertNotNull(underReplicationCountsPerDataSource.get(dataSource));
// The listener simulated adding the segment to druidServer during the SegmentChangeRequestLoad event.
// The load rule asks for 2 replicas, so 1 replica should still be pending.
Assert.assertEquals(1L, underReplicationCountsPerDataSource.getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.getTierToDatasourceToUnderReplicatedCount(true);
Assert.assertNotNull(underReplicationCountsPerDataSourcePerTierUsingClusterView);
Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTierUsingClusterView.size());
Object2LongMap<String> underReplicationCountsPerDataSourceUsingClusterView =
underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tier);
Assert.assertNotNull(underReplicationCountsPerDataSourceUsingClusterView);
Assert.assertEquals(1, underReplicationCountsPerDataSourceUsingClusterView.size());
//noinspection deprecation
Assert.assertNotNull(underReplicationCountsPerDataSourceUsingClusterView.get(dataSource));
// The load rule asks for 2 replicas, but the cluster has only 1 historical. Computed against the cluster view,
// segments are considered replicated as fully as the cluster allows, so the datasource should not be
// under-replicated.
Assert.assertEquals(0L, underReplicationCountsPerDataSourceUsingClusterView.getLong(dataSource));
Assert.assertEquals(Integer.valueOf(2), coordinator.getReplicationFactor(dataSegment.getId()));
coordinator.stop();
leaderUnannouncerLatch.await();
Assert.assertFalse(coordinator.isLeader());
Assert.assertNull(coordinator.getCurrentLeader());
EasyMock.verify(serverInventoryView);
EasyMock.verify(metadataRuleManager);
}
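
/**
 * Runs the coordinator against "hot" and "cold" tiers with an interval load rule and a forever load rule, and
 * verifies that segments land on the matching tier with no under-replication on either tier.
 */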
@Test(timeout = 60_000L)
public void testCoordinatorTieredRun() throws Exception
{
final String dataSource = "dataSource", hotTierName = "hot", coldTierName = "cold";
final Rule hotTier = new IntervalLoadRule(Intervals.of("2018-01-01/P1M"), ImmutableMap.of(hotTierName, 1), null);
final Rule coldTier = new ForeverLoadRule(ImmutableMap.of(coldTierName, 1), null);
final String loadPathCold = "/druid/loadqueue/cold:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
final Map<String, DataSegment> dataSegments = ImmutableMap.of(
"2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
"2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
"2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);
final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
curator,
loadPathCold,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
LOAD_TIMEOUT
);
final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
curator,
loadPathCold,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
);
setupPeons(ImmutableMap.of("hot", loadQueuePeon, "cold", loadQueuePeonCold));
loadQueuePeonCold.start();
pathChildrenCache.start();
pathChildrenCacheCold.start();
DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);
setupSegmentsMetadataMock(druidDataSources[0]);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(metadataRuleManager, serverInventoryView, loadQueueTaskMaster);
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(2, pathChildrenCache, dataSegments, hotServer);
final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(1, pathChildrenCacheCold, dataSegments, coldServer);
assignSegmentLatchHot.await();
assignSegmentLatchCold.await();
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.getTierToDatasourceToUnderReplicatedCount(false);
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.getTierToDatasourceToUnderReplicatedCount(true);
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTierUsingClusterView.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource));
dataSegments.values().forEach(dataSegment -> Assert.assertEquals(Integer.valueOf(1), coordinator.getReplicationFactor(dataSegment.getId())));
coordinator.stop();
leaderUnannouncerLatch.await();
EasyMock.verify(serverInventoryView);
EasyMock.verify(segmentsMetadataManager);
EasyMock.verify(metadataRuleManager);
}
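
/**
 * Verifies that segments governed by a broadcast rule are loaded onto every server type (historicals, brokers
 * and indexer-executors) and report no under-replication on any tier.
 */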
@Test(timeout = 60_000L)
public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWithBroadcastRule() throws Exception
{
final String dataSource = "dataSource";
final String hotTierName = "hot";
final String coldTierName = "cold";
final String tierName1 = "tier1";
final String tierName2 = "tier2";
final String loadPathCold = "/druid/loadqueue/cold:1234";
final String loadPathBroker1 = "/druid/loadqueue/broker1:1234";
final String loadPathBroker2 = "/druid/loadqueue/broker2:1234";
final String loadPathPeon = "/druid/loadqueue/peon:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
final DruidServer brokerServer1 = new DruidServer("broker1", "broker1", null, 5L, ServerType.BROKER, tierName1, 0);
final DruidServer brokerServer2 = new DruidServer("broker2", "broker2", null, 5L, ServerType.BROKER, tierName2, 0);
final DruidServer peonServer = new DruidServer("peon", "peon", null, 5L, ServerType.INDEXER_EXECUTOR, tierName2, 0);
final Map<String, DataSegment> dataSegments = ImmutableMap.of(
"2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
"2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
"2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);
final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
curator,
loadPathCold,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
LOAD_TIMEOUT
);
final LoadQueuePeon loadQueuePeonBroker1 = new CuratorLoadQueuePeon(
curator,
loadPathBroker1,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker1_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_broker1-%d"),
LOAD_TIMEOUT
);
final LoadQueuePeon loadQueuePeonBroker2 = new CuratorLoadQueuePeon(
curator,
loadPathBroker2,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker2_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_broker2-%d"),
LOAD_TIMEOUT
);
final LoadQueuePeon loadQueuePeonPeonServer = new CuratorLoadQueuePeon(
curator,
loadPathPeon,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_peon_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_peon-%d"),
LOAD_TIMEOUT
);
final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
curator,
loadPathCold,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
);
final PathChildrenCache pathChildrenCacheBroker1 = new PathChildrenCache(
curator,
loadPathBroker1,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_broker1-%d")
);
final PathChildrenCache pathChildrenCacheBroker2 = new PathChildrenCache(
curator,
loadPathBroker2,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_broker2-%d")
);
final PathChildrenCache pathChildrenCachePeon = new PathChildrenCache(
curator,
loadPathPeon,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_peon-%d")
);
setupPeons(ImmutableMap.of(
"hot", loadQueuePeon,
"cold", loadQueuePeonCold,
"broker1", loadQueuePeonBroker1,
"broker2", loadQueuePeonBroker2,
"peon", loadQueuePeonPoenServer
));
loadQueuePeonCold.start();
loadQueuePeonBroker1.start();
loadQueuePeonBroker2.start();
loadQueuePeonPeonServer.start();
pathChildrenCache.start();
pathChildrenCacheCold.start();
pathChildrenCacheBroker1.start();
pathChildrenCacheBroker2.start();
pathChildrenCachePeon.start();
DruidDataSource druidDataSource = new DruidDataSource(dataSource, Collections.emptyMap());
dataSegments.values().forEach(druidDataSource::addSegment);
setupSegmentsMetadataMock(druidDataSource);
final Rule broadcastDistributionRule = new ForeverBroadcastDistributionRule();
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(broadcastDistributionRule)).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer, brokerServer1, brokerServer2, peonServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(metadataRuleManager, serverInventoryView, loadQueueTaskMaster);
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCache, dataSegments, hotServer);
final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheCold, dataSegments, coldServer);
final CountDownLatch assignSegmentLatchBroker1 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker1, dataSegments, brokerServer1);
final CountDownLatch assignSegmentLatchBroker2 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker2, dataSegments, brokerServer2);
final CountDownLatch assignSegmentLatchPeon = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCachePeon, dataSegments, peonServer);
assignSegmentLatchHot.await();
assignSegmentLatchCold.await();
assignSegmentLatchBroker1.await();
assignSegmentLatchBroker2.await();
assignSegmentLatchPeon.await();
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
// Under-replicated counts are updated only after the next coordinator run
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.getTierToDatasourceToUnderReplicatedCount(false);
Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName1).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName2).getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.getTierToDatasourceToUnderReplicatedCount(true);
Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTierUsingClusterView.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tierName1).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tierName2).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
EasyMock.verify(serverInventoryView);
EasyMock.verify(segmentsMetadataManager);
EasyMock.verify(metadataRuleManager);
}
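
/**
 * When no custom duty group is defined, CompactSegments must be created as part of the indexing service duties.
 */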
@Test
public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty()
{
CoordinatorCustomDutyGroups emptyCustomDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of());
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
createMetadataManager(null),
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
loadQueueTaskMaster,
null,
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
druidNode,
emptyCustomDutyGroups,
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
null,
CentralizedDatasourceSchemaConfig.create()
);
// Since CompactSegments is not enabled in any custom duty group, it must be created among the indexing service duties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
// CompactSegments should not exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
// CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
Assert.assertNotNull(duty);
}
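
/**
 * When custom duty groups exist but none contains CompactSegments, the duty must still be created as part of the
 * indexing service duties.
 */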
@Test
public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainCompactSegments()
{
CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup(
"group1",
Duration.standardSeconds(1),
ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null))
);
CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(group));
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
createMetadataManager(null),
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
loadQueueTaskMaster,
null,
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
druidNode,
customDutyGroups,
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
null,
CentralizedDatasourceSchemaConfig.create()
);
// Since CompactSegments is not enabled in any custom duty group, it must be created among the indexing service duties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
// CompactSegments should not exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
// CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
Assert.assertNotNull(duty);
}
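
/**
 * When a custom duty group contains CompactSegments, the duty must come from that group rather than from the
 * indexing service duties.
 */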
@Test
public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments()
{
CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup(
"group1",
Duration.standardSeconds(1),
ImmutableList.of(new CompactSegments(null, null))
);
CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup));
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
createMetadataManager(null),
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
loadQueueTaskMaster,
null,
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
druidNode,
customDutyGroups,
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
null,
CentralizedDatasourceSchemaConfig.create()
);
// Since CompactSegments is enabled in a custom duty group, it must not be created among the indexing service duties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().noneMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
// CompactSegments should exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertFalse(compactSegmentsDutyFromCustomGroups.isEmpty());
Assert.assertEquals(1, compactSegmentsDutyFromCustomGroups.size());
Assert.assertNotNull(compactSegmentsDutyFromCustomGroups.get(0));
// CompactSegments returned by this method should be from the Custom Duty Group
CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
Assert.assertNotNull(duty);
}
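
/**
 * Starts the coordinator with two single-duty custom groups and waits for both duties to run, proving that each
 * group is scheduled independently.
 */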
@Test(timeout = 3000)
public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
{
// Some necessary setup to start the Coordinator
setupPeons(Collections.emptyMap());
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
EasyMock.replay(configManager);
DruidDataSource dataSource = new DruidDataSource("dataSource1", Collections.emptyMap());
DataSegment dataSegment = new DataSegment(
"dataSource1",
Intervals.of("2010-01-01/P1D"),
"v1",
null,
null,
null,
null,
0x9,
0
);
dataSource.addSegment(dataSegment);
DataSourcesSnapshot dataSourcesSnapshot =
new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource()));
EasyMock
.expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments())
.andReturn(dataSourcesSnapshot)
.anyTimes();
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
EasyMock.expect(segmentsMetadataManager.iterateAllUsedSegments())
.andReturn(Collections.singletonList(dataSegment)).anyTimes();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(serverInventoryView.getInventory()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.replay(serverInventoryView, loadQueueTaskMaster, segmentsMetadataManager);
// Create CoordinatorCustomDutyGroups
// We will have two groups and each group has one duty
CountDownLatch latch1 = new CountDownLatch(1);
CoordinatorCustomDuty duty1 = params -> {
latch1.countDown();
return params;
};
CoordinatorCustomDutyGroup group1 = new CoordinatorCustomDutyGroup(
"group1",
Duration.standardSeconds(1),
ImmutableList.of(duty1)
);
CountDownLatch latch2 = new CountDownLatch(1);
CoordinatorCustomDuty duty2 = params -> {
latch2.countDown();
return params;
};
CoordinatorCustomDutyGroup group2 = new CoordinatorCustomDutyGroup(
"group2",
Duration.standardSeconds(1),
ImmutableList.of(duty2)
);
CoordinatorCustomDutyGroups groups = new CoordinatorCustomDutyGroups(ImmutableSet.of(group1, group2));
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
createMetadataManager(configManager),
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
loadQueueTaskMaster,
new SegmentLoadQueueManager(serverInventoryView, loadQueueTaskMaster),
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
druidNode,
groups,
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
null,
CentralizedDatasourceSchemaConfig.create()
);
coordinator.start();
// Wait until the group 1 duty has run and counted down latch1
latch1.await();
// Wait until the group 2 duty has run and counted down latch2
latch2.await();
}
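
/**
 * Uses a zero-replica cold tier rule so that one segment is queryable from deep storage only, and verifies the
 * unavailable, under-replicated and deep-storage-only segment counts.
 */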
@Test(timeout = 60_000L)
public void testCoordinatorRun_queryFromDeepStorage() throws Exception
{
String dataSource = "dataSource1";
String coldTier = "coldTier";
String hotTier = "hotTier";
// Setup MetadataRuleManager
Rule intervalLoadRule = new IntervalLoadRule(Intervals.of("2010-02-01/P1M"), ImmutableMap.of(hotTier, 1), null);
Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(coldTier, 0), null);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(intervalLoadRule, foreverLoadRule)).atLeastOnce();
metadataRuleManager.stop();
EasyMock.expectLastCall().once();
EasyMock.replay(metadataRuleManager);
// Setup SegmentsMetadataManager
DruidDataSource[] dataSources = {
new DruidDataSource(dataSource, Collections.emptyMap())
};
final DataSegment dataSegment = new DataSegment(
dataSource,
Intervals.of("2010-01-01/P1D"),
"v1",
null,
null,
null,
null,
0x9,
0
);
final DataSegment dataSegmentHot = new DataSegment(
dataSource,
Intervals.of("2010-02-01/P1D"),
"v1",
null,
null,
null,
null,
0x9,
0
);
dataSources[0].addSegment(dataSegment).addSegment(dataSegmentHot);
setupSegmentsMetadataMock(dataSources[0]);
ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(immutableDruidDataSource.getSegments())
.andReturn(ImmutableSet.of(dataSegment, dataSegmentHot)).atLeastOnce();
EasyMock.replay(immutableDruidDataSource);
// Setup ServerInventoryView
druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, hotTier, 0);
DruidServer druidServer2 = new DruidServer("server2", "localhost", null, 5L, ServerType.HISTORICAL, coldTier, 0);
setupPeons(ImmutableMap.of("server1", loadQueuePeon, "server2", loadQueuePeon));
EasyMock.expect(serverInventoryView.getInventory()).andReturn(
ImmutableList.of(druidServer, druidServer2)
).atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(serverInventoryView, loadQueueTaskMaster);
coordinator.start();
// Wait for this coordinator to become leader
leaderAnnouncerLatch.await();
// This coordinator should be leader by now
Assert.assertTrue(coordinator.isLeader());
Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
pathChildrenCache.start();
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
coordinator.getDatasourceToUnavailableSegmentCount();
Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
// The segment with zero cold-tier replicas is not counted as unavailable; the hot-tier segment is unavailable
Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.getTierToDatasourceToUnderReplicatedCount(false);
Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
Object2LongMap<String> underReplicationCountsPerDataSourceHotTier = underReplicationCountsPerDataSourcePerTier.get(hotTier);
Assert.assertNotNull(underReplicationCountsPerDataSourceHotTier);
Assert.assertEquals(1L, underReplicationCountsPerDataSourceHotTier.getLong(dataSource));
Object2LongMap<String> underReplicationCountsPerDataSourceColdTier = underReplicationCountsPerDataSourcePerTier.get(coldTier);
Assert.assertNotNull(underReplicationCountsPerDataSourceColdTier);
Assert.assertEquals(0L, underReplicationCountsPerDataSourceColdTier.getLong(dataSource));
Object2IntMap<String> numsDeepStorageOnlySegmentsPerDataSource =
coordinator.getDatasourceToDeepStorageQueryOnlySegmentCount();
Assert.assertEquals(1, numsDeepStorageOnlySegmentsPerDataSource.getInt(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
Assert.assertFalse(coordinator.isLeader());
Assert.assertNull(coordinator.getCurrentLeader());
EasyMock.verify(serverInventoryView);
EasyMock.verify(metadataRuleManager);
}
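
/**
 * Registers a listener on the given cache that simulates the server loading each segment assigned to it: when a
 * load request appears under the server's load queue path, the matching segment is added to the server, the
 * request znode is deleted, and the returned latch is counted down.
 */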
private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,
DruidServer server
)
{
final CountDownLatch countDownLatch = new CountDownLatch(latchCount);
pathChildrenCache.getListenable().addListener(
(CuratorFramework client, PathChildrenCacheEvent event) -> {
if (CuratorUtils.isChildAdded(event)) {
DataSegment segment = findSegmentRelatedToCuratorEvent(segments, event);
if (segment != null && server.getSegment(segment.getId()) == null) {
if (countDownLatch.getCount() > 0) {
server.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
countDownLatch.countDown();
} else {
Assert.fail("The segment path " + event.getData().getPath() + " is not expected");
}
}
}
}
);
return countDownLatch;
}
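
/**
 * Mocks {@link SegmentsMetadataManager} and {@link DataSourcesSnapshot} so that the coordinator sees the given
 * datasource and its segments as the full set of used segments.
 */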
private void setupSegmentsMetadataMock(DruidDataSource dataSource)
{
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
EasyMock
.expect(segmentsMetadataManager.iterateAllUsedSegments())
.andReturn(dataSource.getSegments())
.anyTimes();
EasyMock
.expect(segmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
.andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
.anyTimes();
DataSourcesSnapshot dataSourcesSnapshot =
new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource()));
EasyMock
.expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments())
.andReturn(dataSourcesSnapshot)
.anyTimes();
EasyMock
.expect(segmentsMetadataManager.retrieveAllDataSourceNames())
.andReturn(Collections.singleton(dataSource.getName()))
.anyTimes();
EasyMock.replay(segmentsMetadataManager);
EasyMock
.expect(this.dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
.andReturn(dataSource.getSegments())
.anyTimes();
EasyMock
.expect(this.dataSourcesSnapshot.getDataSourcesWithAllUsedSegments())
.andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
.anyTimes();
EasyMock.replay(this.dataSourcesSnapshot);
}
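
/**
 * Returns the segment whose interval key appears in the path of the given Curator event, or null if none matches.
 */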
@Nullable
private static DataSegment findSegmentRelatedToCuratorEvent(
Map<String, DataSegment> dataSegments,
PathChildrenCacheEvent event
)
{
return dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);
}
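
/**
 * Stubs the {@link LoadQueueTaskMaster} lifecycle calls and makes it hand out the given peons keyed by server name.
 */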
private void setupPeons(Map<String, LoadQueuePeon> peonMap)
{
loadQueueTaskMaster.resetPeonsForNewServers(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
loadQueueTaskMaster.onLeaderStart();
EasyMock.expectLastCall().anyTimes();
loadQueueTaskMaster.onLeaderStop();
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(loadQueueTaskMaster.getAllPeons()).andReturn(peonMap).anyTimes();
EasyMock.expect(loadQueueTaskMaster.getPeonForServer(EasyMock.anyObject())).andAnswer(
() -> peonMap.get(((ImmutableDruidServer) EasyMock.getCurrentArgument(0)).getName())
).anyTimes();
}
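
/**
 * A {@link DruidLeaderSelector} that makes the registering listener the leader immediately.
 */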
private static class TestDruidLeaderSelector implements DruidLeaderSelector
{
private volatile Listener listener;
private volatile String leader;
@Override
public String getCurrentLeader()
{
return leader;
}
@Override
public boolean isLeader()
{
return leader != null;
}
@Override
public int localTerm()
{
return 0;
}
@Override
public void registerListener(Listener listener)
{
this.listener = listener;
leader = "what:1234";
listener.becomeLeader();
}
@Override
public void unregisterListener()
{
leader = null;
listener.stopBeingLeader();
}
}
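
/**
 * A {@link ServiceEmitter} that counts down the configured latch each time a "segment/count" metric is emitted,
 * signaling the completion of a coordinator run.
 */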
private static class LatchableServiceEmitter extends ServiceEmitter
{
private CountDownLatch latch;
private LatchableServiceEmitter()
{
super("", "", null);
}
@Override
public void emit(Event event)
{
if (latch != null && "segment/count".equals(event.toMap().get("metric"))) {
latch.countDown();
}
}
}
}