Revert "remove ServerDiscoverySelector from DruidLeaderClient (#9481)" (#9702) (#9703)
* Revert "remove ServerDiscoverySelector from DruidLeaderClient (#9481)"
This reverts commit 072bbe210f162228e85391b464f114da448df24a.
* fix build
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index ba693e7..95b4fa6 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -22,6 +22,8 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.selector.DiscoverySelector;
+import org.apache.druid.client.selector.Server;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
@@ -70,6 +72,9 @@
private final String leaderRequestPath;
+ //Note: This is kept for back compatibility with pre 0.11.0 releases and should be removed in future.
+ private final DiscoverySelector<Server> serverDiscoverySelector;
+
private LifecycleLock lifecycleLock = new LifecycleLock();
private DruidNodeDiscovery druidNodeDiscovery;
private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
@@ -78,13 +83,15 @@
HttpClient httpClient,
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
NodeRole nodeRoleToWatch,
- String leaderRequestPath
+ String leaderRequestPath,
+ DiscoverySelector<Server> serverDiscoverySelector
)
{
this.httpClient = httpClient;
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.nodeRoleToWatch = nodeRoleToWatch;
this.leaderRequestPath = leaderRequestPath;
+ this.serverDiscoverySelector = serverDiscoverySelector;
}
@LifecycleStart
@@ -296,6 +303,16 @@
@Nullable
private String pickOneHost()
{
+ Server server = serverDiscoverySelector.pick();
+ if (server != null) {
+ return StringUtils.format(
+ "%s://%s:%s",
+ server.getScheme(),
+ server.getAddress(),
+ server.getPort()
+ );
+ }
+
Iterator<DiscoveryDruidNode> iter = druidNodeDiscovery.getAllNodes().iterator();
if (iter.hasNext()) {
DiscoveryDruidNode node = iter.next();
diff --git a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java
index d9f0837..b90a9b5 100644
--- a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java
+++ b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java
@@ -24,6 +24,8 @@
import com.google.inject.Provides;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.coordinator.CoordinatorSelectorConfig;
+import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
+import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
@@ -43,16 +45,29 @@
@Provides
@Coordinator
@ManageLifecycle
+ public ServerDiscoverySelector getServiceProvider(
+ CoordinatorSelectorConfig config,
+ ServerDiscoveryFactory serverDiscoveryFactory
+ )
+ {
+ return serverDiscoveryFactory.createSelector(config.getServiceName());
+ }
+
+ @Provides
+ @Coordinator
+ @ManageLifecycle
public DruidLeaderClient getLeaderHttpClient(
@EscalatedGlobal HttpClient httpClient,
- DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
+ DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
+ @Coordinator ServerDiscoverySelector serverDiscoverySelector
)
{
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
NodeRole.COORDINATOR,
- "/druid/coordinator/v1/leader"
+ "/druid/coordinator/v1/leader",
+ serverDiscoverySelector
);
}
}
diff --git a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java
index 3e48078..3c4f63c 100644
--- a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java
+++ b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java
@@ -24,6 +24,8 @@
import com.google.inject.Provides;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.client.indexing.IndexingServiceSelectorConfig;
+import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
+import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
@@ -43,16 +45,29 @@
@Provides
@IndexingService
@ManageLifecycle
+ public ServerDiscoverySelector getServiceProvider(
+ IndexingServiceSelectorConfig config,
+ ServerDiscoveryFactory serverDiscoveryFactory
+ )
+ {
+ return serverDiscoveryFactory.createSelector(config.getServiceName());
+ }
+
+ @Provides
+ @IndexingService
+ @ManageLifecycle
public DruidLeaderClient getLeaderHttpClient(
@EscalatedGlobal HttpClient httpClient,
- DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
+ DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
+ @IndexingService ServerDiscoverySelector serverDiscoverySelector
)
{
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
NodeRole.OVERLORD,
- "/druid/indexer/v1/leader"
+ "/druid/indexer/v1/leader",
+ serverDiscoverySelector
);
}
}
diff --git a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
index 570720d..7c376ea 100644
--- a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
@@ -29,6 +29,7 @@
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.google.inject.servlet.GuiceFilter;
+import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
@@ -122,7 +123,8 @@
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
- "/simple/leader"
+ "/simple/leader",
+ EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();
@@ -146,7 +148,8 @@
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
- "/simple/leader"
+ "/simple/leader",
+ EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();
@@ -172,7 +175,8 @@
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
- "/simple/leader"
+ "/simple/leader",
+ EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();
@@ -184,6 +188,9 @@
@Test
public void testServerFailureAndRedirect() throws Exception
{
+ ServerDiscoverySelector serverDiscoverySelector = EasyMock.createMock(ServerDiscoverySelector.class);
+ EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes();
+
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
DiscoveryDruidNode dummyNode = new DiscoveryDruidNode(
new DruidNode("test", "dummyhost", false, 64231, null, true, false),
@@ -196,13 +203,14 @@
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery).anyTimes();
- EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
+ EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
- "/simple/leader"
+ "/simple/leader",
+ serverDiscoverySelector
);
druidLeaderClient.start();
@@ -228,7 +236,8 @@
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
- "/simple/leader"
+ "/simple/leader",
+ EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 683e776..43c3855 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -360,7 +360,7 @@
private TestDruidLeaderClient(ObjectMapper jsonMapper)
{
- super(null, new TestNodeDiscoveryProvider(), null, null);
+ super(null, new TestNodeDiscoveryProvider(), null, null, null);
this.jsonMapper = jsonMapper;
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 2792b2d..0874281 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -773,7 +773,10 @@
new FakeHttpClient(),
provider,
NodeRole.COORDINATOR,
- "/simple/leader"
+ "/simple/leader",
+ () -> {
+ throw new UnsupportedOperationException();
+ }
);
return new SystemSchema(