KAFKA-15466: Add KIP-919 support for some admin APIs (#14399)
Add support for --bootstrap-controller in the following command-line tools:
- kafka-cluster.sh
- kafka-configs.sh
- kafka-features.sh
- kafka-metadata-quorum.sh
To implement this, the following AdminClient APIs now support the new bootstrap.controllers
configuration:
- Admin.alterConfigs
- Admin.describeCluster
- Admin.describeConfigs
- Admin.describeFeatures
- Admin.describeMetadataQuorum
- Admin.incrementalAlterConfigs
- Admin.updateFeatures
Command-line tool changes:
- Add CommandLineUtils.initializeBootstrapProperties to handle parsing --bootstrap-controller
in addition to --bootstrap-server.
- Add --bootstrap-controller to ConfigCommand.scala, ClusterTool.java, FeatureCommand.java, and
MetadataQuorumCommand.java.
KafkaAdminClient changes:
- Add the AdminBootstrapAddresses class to handle extracting bootstrap.servers or
bootstrap.controllers from the config map for KafkaAdminClient.
- In AdminMetadataManager, store the new usingBootstrapControllers boolean. Generalize
authException to encompass the concept of fatal exceptions in general. (For example, the
fatal exception where we talked to the wrong node type.) Treat
MismatchedEndpointTypeException and UnsupportedEndpointTypeException as fatal exceptions.
- Extend NodeProvider to include information about whether bootstrap.controllers is supported.
- Modify the APIs described above to support bootstrap.controllers.
Server-side changes:
- Support DescribeConfigsRequest on kcontrollers.
- Add KRaftMetadataCache to the kcontroller to simplify implemeting describeConfigs (and
probably more APIs in the future). It's mainly a wrapper around MetadataImage, so there is
essentially no extra resource consumption.
- Split RuntimeLoggerManager out of ConfigAdminManager to handle the incrementalAlterConfigs
support for BROKER_LOGGER. This is now supported on kcontrollers as well as brokers.
- Fix bug in AuthHelper.computeDescribeClusterResponse that resulted in us always sending back
BROKER as the endpoint type, even on the kcontroller.
Miscellaneous:
- Fix a few places in exceptions and log messages where we wrote "broker" instead of "node".
For example, an exception in NodeApiVersions.java, and a log message in NetworkClient.java.
- Fix the slf4j log prefix used by KafkaRequestHandler logging so that request handlers on a
controller don't look like they're on a broker.
- Make the FinalizedVersionRange constructor public for the sake of a junit test.
- Add unit and integration tests for the above.
Reviewers: David Arthur <mumrah@gmail.com>, Doguscan Namal <namal.doguscan@gmail.com>
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 6136ff5..849c45e 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -95,6 +95,8 @@
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.server.authorizer"/>
<allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.test" />
+ <allow pkg="kafka.testkit"/>
<allow pkg="kafka.test.annotation"/>
<allow pkg="kafka.test.junit"/>
<allow pkg="kafka.network"/>
@@ -108,7 +110,6 @@
</subpackage>
<subpackage name="junit">
<allow pkg="kafka.test"/>
- <allow pkg="kafka.testkit"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.metadata" />
</subpackage>
diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index a8d032c..7987370 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -85,10 +85,12 @@
<subpackage name="util">
<!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
<!-- API but are still relatively common -->
+ <allow class="org.apache.kafka.clients.admin.AdminClientConfig" />
<allow class="org.apache.kafka.clients.ClientRequest" />
<allow class="org.apache.kafka.clients.ClientResponse" />
<allow class="org.apache.kafka.clients.KafkaClient" />
<allow class="org.apache.kafka.clients.RequestCompletionHandler" />
+ <allow class="org.apache.kafka.clients.CommonClientConfigs" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.server.util.json" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d40c563..ad81d1f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -811,7 +811,7 @@
nodeId, disconnectState.remoteAddress());
break;
case NOT_CONNECTED:
- log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
+ log.warn("Connection to node {} ({}) could not be established. Node may not be available.", nodeId, disconnectState.remoteAddress());
break;
default:
break; // Disconnections in other states are logged at debug level in Selector
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 83722f8..e1a4b87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -129,7 +129,7 @@
*/
public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
if (!supportedVersions.containsKey(apiKey))
- throw new UnsupportedVersionException("The broker does not support " + apiKey);
+ throw new UnsupportedVersionException("The node does not support " + apiKey);
ApiVersion supportedVersion = supportedVersions.get(apiKey);
Optional<ApiVersion> intersectVersion = ApiVersionsResponse.intersect(supportedVersion,
new ApiVersion()
@@ -140,7 +140,7 @@
if (intersectVersion.isPresent())
return intersectVersion.get().maxVersion();
else
- throw new UnsupportedVersionException("The broker does not support " + apiKey +
+ throw new UnsupportedVersionException("The node does not support " + apiKey +
" with version in range [" + oldestAllowedVersion + "," + latestAllowedVersion + "]. The supported" +
" range is [" + supportedVersion.minVersion() + "," + supportedVersion.maxVersion() + "].");
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index e6ef0de..bc119c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -48,6 +48,13 @@
private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
/**
+ * <code>bootstrap.controllers</code>
+ */
+ public static final String BOOTSTRAP_CONTROLLERS_CONFIG = "bootstrap.controllers";
+ public static final String BOOTSTRAP_CONTROLLERS_DOC = "A list of host/port pairs to use for establishing the initial " +
+ "connection to the KRaft controller quorum. This list should be in the form <code>host1:port1,host2:port2,...</code>.";
+
+ /**
* <code>client.dns.lookup</code>
*/
public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
@@ -135,8 +142,14 @@
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
+ "",
Importance.HIGH,
- BOOTSTRAP_SERVERS_DOC)
+ BOOTSTRAP_SERVERS_DOC).
+ define(BOOTSTRAP_CONTROLLERS_CONFIG,
+ Type.LIST,
+ "",
+ Importance.HIGH,
+ BOOTSTRAP_CONTROLLERS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
.define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, SEND_BUFFER_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java b/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
index 1442de5..22ecb8d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
@@ -35,7 +35,7 @@
*
* @throws IllegalArgumentException Raised when the condition described above is not met.
*/
- FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {
+ public FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {
if (minVersionLevel < 0 || maxVersionLevel < 0 || maxVersionLevel < minVersionLevel) {
throw new IllegalArgumentException(
String.format(
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1221dd9..4db6b27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -38,6 +38,7 @@
import org.apache.kafka.clients.admin.internals.AdminApiHandler;
import org.apache.kafka.clients.admin.internals.AdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminBootstrapAddresses;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.AllBrokersStrategy;
import org.apache.kafka.clients.admin.internals.AlterConsumerGroupOffsetsHandler;
@@ -83,12 +84,14 @@
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnacceptableCredentialException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -126,6 +129,7 @@
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DescribeClusterRequestData;
+import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
@@ -232,7 +236,6 @@
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
-import java.net.InetSocketAddress;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
@@ -477,8 +480,11 @@
return createInternal(config, timeoutProcessorFactory, null);
}
- static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory,
- HostResolver hostResolver) {
+ static KafkaAdminClient createInternal(
+ AdminClientConfig config,
+ TimeoutProcessorFactory timeoutProcessorFactory,
+ HostResolver hostResolver
+ ) {
Metrics metrics = null;
NetworkClient networkClient = null;
Time time = Time.SYSTEM;
@@ -489,11 +495,12 @@
try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
+ AdminBootstrapAddresses adminAddresses = AdminBootstrapAddresses.fromConfig(config);
AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
- config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
- metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
+ config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
+ adminAddresses.usingBootstrapControllers());
+ metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), time.milliseconds());
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
@@ -655,6 +662,7 @@
*/
private interface NodeProvider {
Node provide();
+ boolean supportsUseControllers();
}
private class MetadataUpdateNodeIdProvider implements NodeProvider {
@@ -662,13 +670,25 @@
public Node provide() {
return client.leastLoadedNode(time.milliseconds());
}
+
+ @Override
+ public boolean supportsUseControllers() {
+ return true;
+ }
}
private class ConstantNodeIdProvider implements NodeProvider {
private final int nodeId;
+ private final boolean supportsUseControllers;
+
+ ConstantNodeIdProvider(int nodeId, boolean supportsUseControllers) {
+ this.nodeId = nodeId;
+ this.supportsUseControllers = supportsUseControllers;
+ }
ConstantNodeIdProvider(int nodeId) {
this.nodeId = nodeId;
+ this.supportsUseControllers = false;
}
@Override
@@ -684,12 +704,27 @@
metadataManager.requestUpdate();
return null;
}
+
+ @Override
+ public boolean supportsUseControllers() {
+ return supportsUseControllers;
+ }
}
/**
* Provides the controller node.
*/
private class ControllerNodeProvider implements NodeProvider {
+ private final boolean supportsUseControllers;
+
+ ControllerNodeProvider(boolean supportsUseControllers) {
+ this.supportsUseControllers = supportsUseControllers;
+ }
+
+ ControllerNodeProvider() {
+ this.supportsUseControllers = false;
+ }
+
@Override
public Node provide() {
if (metadataManager.isReady() &&
@@ -699,6 +734,11 @@
metadataManager.requestUpdate();
return null;
}
+
+ @Override
+ public boolean supportsUseControllers() {
+ return supportsUseControllers;
+ }
}
/**
@@ -715,6 +755,67 @@
metadataManager.requestUpdate();
return null;
}
+
+ @Override
+ public boolean supportsUseControllers() {
+ return false;
+ }
+ }
+
+ /**
+ * Provides the least loaded broker, or the active kcontroller if we're using
+ * bootstrap.controllers.
+ */
+ private class ConstantBrokerOrActiveKController implements NodeProvider {
+ private final int nodeId;
+
+ ConstantBrokerOrActiveKController(int nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public Node provide() {
+ if (metadataManager.isReady()) {
+ if (metadataManager.usingBootstrapControllers()) {
+ return metadataManager.controller();
+ } else if (metadataManager.nodeById(nodeId) != null) {
+ return metadataManager.nodeById(nodeId);
+ }
+ }
+ metadataManager.requestUpdate();
+ return null;
+ }
+
+ @Override
+ public boolean supportsUseControllers() {
+ return true;
+ }
+ }
+
+ /**
+ * Provides the least loaded broker, or the active kcontroller if we're using
+ * bootstrap.controllers.
+ */
+ private class LeastLoadedBrokerOrActiveKController implements NodeProvider {
+ @Override
+ public Node provide() {
+ if (metadataManager.isReady()) {
+ if (metadataManager.usingBootstrapControllers()) {
+ return metadataManager.controller();
+ } else {
+ // This may return null if all nodes are busy.
+ // In that case, we will postpone node assignment.
+ return client.leastLoadedNode(time.milliseconds());
+ }
+ }
+ metadataManager.requestUpdate();
+ return null;
+ }
+
+ @Override
+ public boolean supportsUseControllers() {
+ return true;
+ }
}
abstract class Call {
@@ -1448,6 +1549,10 @@
log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
call.handleTimeoutFailure(time.milliseconds(),
new TimeoutException("The AdminClient thread is not accepting new calls."));
+ } else if (metadataManager.usingBootstrapControllers() &&
+ (!call.nodeProvider.supportsUseControllers())) {
+ call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " +
+ "yet supported when communicating directly with the controller quorum."));
} else {
enqueue(call, now);
}
@@ -1457,6 +1562,58 @@
* Create a new metadata call.
*/
private Call makeMetadataCall(long now) {
+ if (metadataManager.usingBootstrapControllers()) {
+ return makeControllerMetadataCall(now);
+ } else {
+ return makeBrokerMetadataCall(now);
+ }
+ }
+
+ private Call makeControllerMetadataCall(long now) {
+ // Use DescribeCluster here, as specified by KIP-919.
+ return new Call(true, "describeCluster", calcDeadlineMs(now, requestTimeoutMs),
+ new MetadataUpdateNodeIdProvider()) {
+ @Override
+ public DescribeClusterRequest.Builder createRequest(int timeoutMs) {
+ return new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
+ .setIncludeClusterAuthorizedOperations(false)
+ .setEndpointType(EndpointType.CONTROLLER.id()));
+ }
+
+ @Override
+ public void handleResponse(AbstractResponse abstractResponse) {
+ DescribeClusterResponse response = (DescribeClusterResponse) abstractResponse;
+ Cluster cluster;
+ try {
+ cluster = parseDescribeClusterResponse(response.data());
+ } catch (ApiException e) {
+ handleFailure(e);
+ return;
+ }
+ long now = time.milliseconds();
+ metadataManager.update(cluster, now);
+
+ // Unassign all unsent requests after a metadata refresh to allow for a new
+ // destination to be selected from the new metadata
+ unassignUnsentCalls(node -> true);
+ }
+
+ @Override
+ boolean handleUnsupportedVersionException(final UnsupportedVersionException e) {
+ metadataManager.updateFailed(e);
+ return false;
+ }
+
+ @Override
+ public void handleFailure(Throwable e) {
+ metadataManager.updateFailed(e);
+ }
+ };
+ }
+
+ private Call makeBrokerMetadataCall(long now) {
+ // We use MetadataRequest here so that we can continue to support brokers that are too
+ // old to handle DescribeCluster.
return new Call(true, "fetchMetadata", calcDeadlineMs(now, requestTimeoutMs),
new MetadataUpdateNodeIdProvider()) {
@Override
@@ -1481,6 +1638,12 @@
}
@Override
+ boolean handleUnsupportedVersionException(final UnsupportedVersionException e) {
+ metadataManager.updateFailed(e);
+ return false;
+ }
+
+ @Override
public void handleFailure(Throwable e) {
metadataManager.updateFailed(e);
}
@@ -1488,6 +1651,32 @@
}
}
+ static Cluster parseDescribeClusterResponse(DescribeClusterResponseData response) {
+ ApiError apiError = new ApiError(response.errorCode(), response.errorMessage());
+ if (apiError.isFailure()) {
+ throw apiError.exception();
+ }
+ if (response.endpointType() != EndpointType.CONTROLLER.id()) {
+ throw new MismatchedEndpointTypeException("Expected response from CONTROLLER " +
+ "endpoint, but got response from endpoint type " + (int) response.endpointType());
+ }
+ List<Node> nodes = new ArrayList<>();
+ Node controllerNode = null;
+ for (DescribeClusterResponseData.DescribeClusterBroker node : response.brokers()) {
+ Node newNode = new Node(node.brokerId(), node.host(), node.port(), node.rack());
+ nodes.add(newNode);
+ if (node.brokerId() == response.controllerId()) {
+ controllerNode = newNode;
+ }
+ }
+ return new Cluster(response.clusterId(),
+ nodes,
+ Collections.emptyList(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ controllerNode);
+ }
+
/**
* Returns true if a topic name cannot be represented in an RPC. This function does NOT check
* whether the name is too long, contains invalid characters, etc. It is better to enforce
@@ -2082,7 +2271,7 @@
final long now = time.milliseconds();
runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
- new LeastLoadedNodeProvider()) {
+ new LeastLoadedBrokerOrActiveKController()) {
private boolean useMetadataRequest = false;
@@ -2090,8 +2279,9 @@
AbstractRequest.Builder createRequest(int timeoutMs) {
if (!useMetadataRequest) {
return new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
- .setIncludeClusterAuthorizedOperations(
- options.includeAuthorizedOperations()));
+ .setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations())
+ .setEndpointType(metadataManager.usingBootstrapControllers() ?
+ EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()));
} else {
// Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
@@ -2148,6 +2338,9 @@
@Override
boolean handleUnsupportedVersionException(final UnsupportedVersionException exception) {
+ if (metadataManager.usingBootstrapControllers()) {
+ return false;
+ }
if (useMetadataRequest) {
return false;
}
@@ -2319,11 +2512,11 @@
public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
// Partition the requested config resources based on which broker they must be sent to with the
// null broker being used for config resources which can be obtained from any broker
- final Map<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> brokerFutures = new HashMap<>(configResources.size());
+ final Map<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> nodeFutures = new HashMap<>(configResources.size());
for (ConfigResource resource : configResources) {
Integer broker = nodeFor(resource);
- brokerFutures.compute(broker, (key, value) -> {
+ nodeFutures.compute(broker, (key, value) -> {
if (value == null) {
value = new HashMap<>();
}
@@ -2333,12 +2526,12 @@
}
final long now = time.milliseconds();
- for (Map.Entry<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> entry : brokerFutures.entrySet()) {
- Integer broker = entry.getKey();
+ for (Map.Entry<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> entry : nodeFutures.entrySet()) {
+ final Integer node = entry.getKey();
Map<ConfigResource, KafkaFutureImpl<Config>> unified = entry.getValue();
runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
- broker != null ? new ConstantNodeIdProvider(broker) : new LeastLoadedNodeProvider()) {
+ node != null ? new ConstantNodeIdProvider(node, true) : new LeastLoadedBrokerOrActiveKController()) {
@Override
DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
@@ -2362,9 +2555,9 @@
DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult = entry.getValue();
KafkaFutureImpl<Config> future = unified.get(configResource);
if (future == null) {
- if (broker != null) {
- log.warn("The config {} in the response from broker {} is not in the request",
- configResource, broker);
+ if (node != null) {
+ log.warn("The config {} in the response from node {} is not in the request",
+ configResource, node);
} else {
log.warn("The config {} in the response from the least loaded broker is not in the request",
configResource);
@@ -2380,7 +2573,7 @@
}
completeUnrealizedFutures(
unified.entrySet().stream(),
- configResource -> "The broker response did not contain a result for config resource " + configResource);
+ configResource -> "The node response did not contain a result for config resource " + configResource);
}
@Override
@@ -2390,7 +2583,7 @@
}, now);
}
- return new DescribeConfigsResult(new HashMap<>(brokerFutures.entrySet().stream()
+ return new DescribeConfigsResult(new HashMap<>(nodeFutures.entrySet().stream()
.flatMap(x -> x.getValue().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
}
@@ -2441,20 +2634,20 @@
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
// We must make a separate AlterConfigs request for every BROKER resource we want to alter
- // and send the request to that specific broker. Other resources are grouped together into
- // a single request that may be sent to any broker.
+ // and send the request to that specific node. Other resources are grouped together into
+ // a single request that may be sent to any node.
final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
for (ConfigResource resource : configs.keySet()) {
Integer node = nodeFor(resource);
if (node != null) {
- NodeProvider nodeProvider = new ConstantNodeIdProvider(node);
+ NodeProvider nodeProvider = new ConstantBrokerOrActiveKController(node);
allFutures.putAll(alterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
} else
unifiedRequestResources.add(resource);
}
if (!unifiedRequestResources.isEmpty())
- allFutures.putAll(alterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
+ allFutures.putAll(alterConfigs(configs, options, unifiedRequestResources, new LeastLoadedBrokerOrActiveKController()));
return new AlterConfigsResult(new HashMap<>(allFutures));
}
@@ -2506,21 +2699,31 @@
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
final AlterConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
- // We must make a separate AlterConfigs request for every BROKER resource we want to alter
- // and send the request to that specific broker. Other resources are grouped together into
- // a single request that may be sent to any broker.
+ // BROKER_LOGGER requests always go to a specific, constant broker or controller node.
+ //
+ // BROKER resource changes for a specific (non-default) resource go to either that specific
+ // node (if using bootstrap.servers), or directly to the active controller (if using
+ // bootstrap.controllers)
+ //
+ // All other requests go to the least loaded broker (if using bootstrap.servers) or the
+ // active controller (if using bootstrap.controllers)
final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
for (ConfigResource resource : configs.keySet()) {
Integer node = nodeFor(resource);
+ if (metadataManager.usingBootstrapControllers()) {
+ if (!resource.type().equals(ConfigResource.Type.BROKER_LOGGER)) {
+ node = null;
+ }
+ }
if (node != null) {
- NodeProvider nodeProvider = new ConstantNodeIdProvider(node);
+ NodeProvider nodeProvider = new ConstantNodeIdProvider(node, true);
allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
} else
unifiedRequestResources.add(resource);
}
if (!unifiedRequestResources.isEmpty())
- allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
+ allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedBrokerOrActiveKController()));
return new AlterConfigsResult(new HashMap<>(allFutures));
}
@@ -3872,7 +4075,7 @@
final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call(
- "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
+ "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedBrokerOrActiveKController()) {
private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) {
final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();
@@ -3938,7 +4141,7 @@
final long now = time.milliseconds();
final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()),
- new ControllerNodeProvider()) {
+ new ControllerNodeProvider(true)) {
@Override
UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
@@ -4009,7 +4212,7 @@
@Override
public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
- NodeProvider provider = new LeastLoadedNodeProvider();
+ NodeProvider provider = new LeastLoadedBrokerOrActiveKController();
final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddresses.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddresses.java
new file mode 100644
index 0000000..ee39f4a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddresses.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+final public class AdminBootstrapAddresses {
+ private final boolean usingBootstrapControllers;
+ private final List<InetSocketAddress> addresses;
+
+ AdminBootstrapAddresses(
+ boolean usingBootstrapControllers,
+ List<InetSocketAddress> addresses
+ ) {
+ this.usingBootstrapControllers = usingBootstrapControllers;
+ this.addresses = addresses;
+ }
+
+ public boolean usingBootstrapControllers() {
+ return usingBootstrapControllers;
+ }
+
+ public List<InetSocketAddress> addresses() {
+ return addresses;
+ }
+
+ public static AdminBootstrapAddresses fromConfig(AbstractConfig config) {
+ List<String> bootstrapServers = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ if (bootstrapServers == null) {
+ bootstrapServers = Collections.emptyList();
+ }
+ List<String> controllerServers = config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
+ if (controllerServers == null) {
+ controllerServers = Collections.emptyList();
+ }
+ String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG);
+ if (bootstrapServers.isEmpty()) {
+ if (controllerServers.isEmpty()) {
+ throw new ConfigException("You must set either " +
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " or " +
+ AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
+ } else {
+ return new AdminBootstrapAddresses(true,
+ ClientUtils.parseAndValidateAddresses(controllerServers, clientDnsLookupConfig));
+ }
+ } else {
+ if (controllerServers.isEmpty()) {
+ return new AdminBootstrapAddresses(false,
+ ClientUtils.parseAndValidateAddresses(bootstrapServers, clientDnsLookupConfig));
+ } else {
+ throw new ConfigException("You cannot set both " +
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " and " +
+ AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
+ }
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(usingBootstrapControllers, addresses);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || (!o.getClass().equals(AdminBootstrapAddresses.class))) return false;
+ AdminBootstrapAddresses other = (AdminBootstrapAddresses) o;
+ return usingBootstrapControllers == other.usingBootstrapControllers &&
+ addresses.equals(other.addresses);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("AdminBootstrapAddresses");
+ bld.append("(usingBoostrapControllers=").append(usingBootstrapControllers);
+ bld.append(", addresses=[");
+ String prefix = "";
+ for (InetSocketAddress address : addresses) {
+ bld.append(prefix).append(address);
+ prefix = ", ";
+ }
+ bld.append("])");
+ return bld.toString();
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index f1ccd9a..f8123c4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -21,7 +21,11 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
+import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
@@ -53,6 +57,11 @@
private final long metadataExpireMs;
/**
+ * True if we are communicating directly with the controller quorum as specified by KIP-919.
+ */
+ private final boolean usingBootstrapControllers;
+
+ /**
* Used to update the NetworkClient metadata.
*/
private final AdminMetadataUpdater updater;
@@ -79,10 +88,9 @@
private Cluster cluster = Cluster.empty();
/**
- * If we got an authorization exception when we last attempted to fetch
- * metadata, this is it; null, otherwise.
+ * If this is non-null, it is a fatal exception that will terminate all attempts at communication.
*/
- private AuthenticationException authException = null;
+ private ApiException fatalException = null;
public class AdminMetadataUpdater implements MetadataUpdater {
@Override
@@ -130,21 +138,31 @@
UPDATE_PENDING
}
- public AdminMetadataManager(LogContext logContext, long refreshBackoffMs, long metadataExpireMs) {
+ public AdminMetadataManager(
+ LogContext logContext,
+ long refreshBackoffMs,
+ long metadataExpireMs,
+ boolean usingBootstrapControllers
+ ) {
this.log = logContext.logger(AdminMetadataManager.class);
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
+ this.usingBootstrapControllers = usingBootstrapControllers;
this.updater = new AdminMetadataUpdater();
}
+ public boolean usingBootstrapControllers() {
+ return usingBootstrapControllers;
+ }
+
public AdminMetadataUpdater updater() {
return updater;
}
public boolean isReady() {
- if (authException != null) {
- log.debug("Metadata is not usable: failed to get metadata.", authException);
- throw authException;
+ if (fatalException != null) {
+ log.debug("Metadata is not usable: failed to get metadata.", fatalException);
+ throw fatalException;
}
if (cluster.nodes().isEmpty()) {
log.trace("Metadata is not ready: bootstrap nodes have not been " +
@@ -230,7 +248,21 @@
if (exception instanceof AuthenticationException) {
log.warn("Metadata update failed due to authentication error", exception);
- this.authException = (AuthenticationException) exception;
+ this.fatalException = (ApiException) exception;
+ } else if (exception instanceof MismatchedEndpointTypeException) {
+ log.warn("Metadata update failed due to mismatched endpoint type error", exception);
+ this.fatalException = (ApiException) exception;
+ } else if (exception instanceof UnsupportedEndpointTypeException) {
+ log.warn("Metadata update failed due to unsupported endpoint type error", exception);
+ this.fatalException = (ApiException) exception;
+ } else if (exception instanceof UnsupportedVersionException) {
+ if (usingBootstrapControllers) {
+ log.warn("The remote node is not a CONTROLLER that supports the KIP-919 " +
+ "DESCRIBE_CLUSTER api.", exception);
+ } else {
+ log.warn("The remote node is not a BROKER that supports the METADATA api.", exception);
+ }
+ this.fatalException = (ApiException) exception;
} else {
log.info("Metadata update failed", exception);
}
@@ -249,7 +281,7 @@
}
this.state = State.QUIESCENT;
- this.authException = null;
+ this.fatalException = null;
if (!cluster.nodes().isEmpty()) {
this.cluster = cluster;
diff --git a/clients/src/main/resources/common/message/DescribeConfigsRequest.json b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
index f48b168..23be19c 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 32,
"type": "request",
- "listeners": ["zkBroker", "broker"],
+ "listeners": ["zkBroker", "broker", "controller"],
"name": "DescribeConfigsRequest",
// Version 1 adds IncludeSynonyms.
// Version 2 is the same as version 1.
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 744ec12..99c823e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -73,7 +73,7 @@
AdminMetadataManager metadataManager = new AdminMetadataManager(new LogContext(),
adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
- adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+ adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
this.mockClient = new MockClient(time, new MockClient.MockMetadataUpdater() {
@Override
public List<Node> fetchNodes() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index f942fc6..229d311 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -55,6 +55,7 @@
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
+import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -284,6 +285,55 @@
}
@Test
+ public void testParseDescribeClusterResponseWithError() {
+ assertThrows(MismatchedEndpointTypeException.class,
+ () -> KafkaAdminClient.parseDescribeClusterResponse(new DescribeClusterResponseData().
+ setErrorCode(Errors.MISMATCHED_ENDPOINT_TYPE.code()).
+ setErrorMessage("The request was sent to an endpoint of type BROKER, " +
+ "but we wanted an endpoint of type CONTROLLER")));
+ }
+
+ @Test
+ public void testParseDescribeClusterResponseWithUnexpectedEndpointType() {
+ assertThrows(MismatchedEndpointTypeException.class,
+ () -> KafkaAdminClient.parseDescribeClusterResponse(new DescribeClusterResponseData().
+ setEndpointType(EndpointType.BROKER.id())));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testParseSuccessfulDescribeClusterResponse(boolean includeController) {
+ Cluster cluster = KafkaAdminClient.parseDescribeClusterResponse(new DescribeClusterResponseData().
+ setControllerId(includeController ? 0 : -1).
+ setEndpointType(EndpointType.CONTROLLER.id()).
+ setClusterId("Ek8tjqq1QBWfnaoyHFZqDg").
+ setBrokers(new DescribeClusterResponseData.DescribeClusterBrokerCollection(Arrays.asList(
+ new DescribeClusterBroker().
+ setBrokerId(0).
+ setHost("controller0.com").
+ setPort(9092),
+ new DescribeClusterBroker().
+ setBrokerId(1).
+ setHost("controller1.com").
+ setPort(9092),
+ new DescribeClusterBroker().
+ setBrokerId(2).
+ setHost("controller2.com").
+ setPort(9092)).iterator())));
+ if (includeController) {
+ assertNotNull(cluster.controller());
+ assertEquals(0, cluster.controller().id());
+ } else {
+ assertNull(cluster.controller());
+ }
+ assertEquals("Ek8tjqq1QBWfnaoyHFZqDg", cluster.clusterResource().clusterId());
+ assertEquals(new HashSet<>(Arrays.asList(
+ new Node(0, "controller0.com", 9092),
+ new Node(1, "controller1.com", 9092),
+ new Node(2, "controller2.com", 9092))), new HashSet<>(cluster.nodes()));
+ }
+
+ @Test
public void testGetOrCreateListValue() {
Map<String, List<String>> map = new HashMap<>();
List<String> fooList = KafkaAdminClient.getOrCreateListValue(map, "foo");
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddressesTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddressesTest.java
new file mode 100644
index 0000000..2b6923e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddressesTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class AdminBootstrapAddressesTest {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testNoBootstrapSet(boolean nullValue) {
+ Map<String, Object> map = new HashMap<>();
+ if (nullValue) {
+ map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, null);
+ map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, null);
+ } else {
+ map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+ map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "");
+ }
+ AdminClientConfig config = new AdminClientConfig(map);
+ assertEquals("You must set either bootstrap.servers or bootstrap.controllers",
+ assertThrows(ConfigException.class, () -> AdminBootstrapAddresses.fromConfig(config)).
+ getMessage());
+ }
+
+ @Test
+ public void testTwoBootstrapsSet() {
+ Map<String, Object> map = new HashMap<>();
+ map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "localhost:9092");
+ map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ AdminClientConfig config = new AdminClientConfig(map);
+ assertEquals("You cannot set both bootstrap.servers and bootstrap.controllers",
+ assertThrows(ConfigException.class, () -> AdminBootstrapAddresses.fromConfig(config)).
+ getMessage());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testFromConfig(boolean usingBootstrapControllers) {
+ Map<String, Object> map = new HashMap<>();
+ String connectString = "localhost:9092,localhost:9093,localhost:9094";
+ if (usingBootstrapControllers) {
+ map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, connectString);
+ } else {
+ map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, connectString);
+ }
+ AdminClientConfig config = new AdminClientConfig(map);
+ AdminBootstrapAddresses addresses = AdminBootstrapAddresses.fromConfig(config);
+ assertEquals(usingBootstrapControllers, addresses.usingBootstrapControllers());
+ assertEquals(Arrays.asList(
+ new InetSocketAddress("localhost", 9092),
+ new InetSocketAddress("localhost", 9093),
+ new InetSocketAddress("localhost", 9094)),
+ addresses.addresses());
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
index 05587aa..d5711df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
@@ -24,6 +24,8 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.net.InetSocketAddress;
import java.util.Collections;
@@ -40,7 +42,15 @@
private final long refreshBackoffMs = 100;
private final long metadataExpireMs = 60000;
private final AdminMetadataManager mgr = new AdminMetadataManager(
- logContext, refreshBackoffMs, metadataExpireMs);
+ logContext, refreshBackoffMs, metadataExpireMs, false);
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testSetUsingBootstrapControllers(boolean usingBootstrapControllers) {
+ AdminMetadataManager manager = new AdminMetadataManager(
+ logContext, refreshBackoffMs, metadataExpireMs, usingBootstrapControllers);
+ assertEquals(usingBootstrapControllers, manager.usingBootstrapControllers());
+ }
@Test
public void testMetadataReady() {
diff --git a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java b/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java
new file mode 100644
index 0000000..f66454e
--- /dev/null
+++ b/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.logger;
+
+import kafka.utils.Log4jController;
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.LogLevelConfig;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
+import org.apache.kafka.common.protocol.Errors;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER_LOGGER;
+
+/**
+ * Manages runtimes changes to slf4j settings.
+ */
+public class RuntimeLoggerManager {
+ static final String VALID_LOG_LEVELS_STRING;
+
+ static {
+ ArrayList<String> logLevels = new ArrayList<>(LogLevelConfig.VALID_LOG_LEVELS);
+ logLevels.sort(String::compareTo);
+ VALID_LOG_LEVELS_STRING = String.join(", ", logLevels);
+ }
+
+ private final int nodeId;
+ private final Logger log;
+
+ public RuntimeLoggerManager(int nodeId, Logger log) {
+ this.nodeId = nodeId;
+ this.log = log;
+ }
+
+ public void applyChangesForResource(
+ boolean authorizedForClusterResource,
+ boolean validateOnly,
+ AlterConfigsResource resource
+ ) {
+ if (!authorizedForClusterResource) {
+ throw new ClusterAuthorizationException(Errors.CLUSTER_AUTHORIZATION_FAILED.message());
+ }
+ validateResourceNameIsNodeId(resource.resourceName());
+ validateLogLevelConfigs(resource.configs());
+ if (!validateOnly) {
+ alterLogLevelConfigs(resource.configs());
+ }
+ }
+
+ void alterLogLevelConfigs(Collection<AlterableConfig> ops) {
+ ops.forEach(op -> {
+ String loggerName = op.name();
+ String logLevel = op.value();
+ switch (OpType.forId(op.configOperation())) {
+ case SET:
+ if (Log4jController.logLevel(loggerName, logLevel)) {
+ log.warn("Updated the log level of {} to {}", loggerName, logLevel);
+ } else {
+ log.error("Failed to update the log level of {} to {}", loggerName, logLevel);
+ }
+ break;
+ case DELETE:
+ if (Log4jController.unsetLogLevel(loggerName)) {
+ log.warn("Unset the log level of {}", loggerName);
+ } else {
+ log.error("Failed to unset the log level of {}", loggerName);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Invalid log4j configOperation: " + op.configOperation());
+ }
+ });
+ }
+
+ void validateResourceNameIsNodeId(String resourceName) {
+ int requestId;
+ try {
+ requestId = Integer.parseInt(resourceName);
+ } catch (NumberFormatException e) {
+ throw new InvalidRequestException("Node id must be an integer, but it is: " +
+ resourceName);
+ }
+ if (requestId != nodeId) {
+ throw new InvalidRequestException("Unexpected node id. Expected " + nodeId +
+ ", but received " + nodeId);
+ }
+ }
+
+ void validateLoggerNameExists(String loggerName) {
+ if (!Log4jController.loggerExists(loggerName)) {
+ throw new InvalidConfigurationException("Logger " + loggerName + " does not exist!");
+ }
+ }
+
+ void validateLogLevelConfigs(Collection<AlterableConfig> ops) {
+ ops.forEach(op -> {
+ String loggerName = op.name();
+ switch (OpType.forId(op.configOperation())) {
+ case SET:
+ validateLoggerNameExists(loggerName);
+ String logLevel = op.value();
+ if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
+ throw new InvalidConfigurationException("Cannot set the log level of " +
+ loggerName + " to " + logLevel + " as it is not a supported log level. " +
+ "Valid log levels are " + VALID_LOG_LEVELS_STRING);
+ }
+ break;
+ case DELETE:
+ validateLoggerNameExists(loggerName);
+ if (loggerName.equals(Log4jController.ROOT_LOGGER())) {
+ throw new InvalidRequestException("Removing the log level of the " +
+ Log4jController.ROOT_LOGGER() + " logger is not allowed");
+ }
+ break;
+ case APPEND:
+ throw new InvalidRequestException(OpType.APPEND +
+ " operation is not allowed for the " + BROKER_LOGGER + " resource");
+ case SUBTRACT:
+ throw new InvalidRequestException(OpType.SUBTRACT +
+ " operation is not allowed for the " + BROKER_LOGGER + " resource");
+ default:
+ throw new InvalidRequestException("Unknown operation type " +
+ (int) op.configOperation() + " is not alowed for the " +
+ BROKER_LOGGER + " resource");
+ }
+ });
+ }
+}
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 0bd4e30..309d95a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -27,7 +27,6 @@
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
-import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
@@ -317,7 +316,11 @@
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+ CommandLineUtils.initializeBootstrapProperties(opts.parser,
+ opts.options,
+ props,
+ opts.bootstrapServerOpt,
+ opts.bootstrapControllerOpt)
val adminClient = Admin.create(props)
if (opts.options.has(opts.alterOpt) && opts.entityTypes.size != opts.entityNames.size)
@@ -762,11 +765,14 @@
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
- val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka server to connect to. " +
- "This is required for describing and altering broker configs.")
+ val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
+ val bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The Kafka controllers to connect to.")
+ .withRequiredArg
+ .describedAs("controller to connect to")
+ .ofType(classOf[String])
val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " +
"This is used only with --bootstrap-server option for describing and altering broker configs.")
.withRequiredArg
@@ -873,14 +879,14 @@
if (entityTypeVals.size != entityTypeVals.distinct.size)
throw new IllegalArgumentException(s"Duplicate entity type(s) specified: ${entityTypeVals.diff(entityTypeVals.distinct).mkString(",")}")
- val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt))
- (BrokerSupportedConfigTypes, "--bootstrap-server")
+ val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt))
+ (BrokerSupportedConfigTypes, "--bootstrap-server or --bootstrap-controller")
else
(ZkSupportedConfigTypes, "--zookeeper")
entityTypeVals.foreach(entityTypeVal =>
if (!allowedEntityTypes.contains(entityTypeVal))
- throw new IllegalArgumentException(s"Invalid entity type $entityTypeVal, the entity type must be one of ${allowedEntityTypes.mkString(", ")} with the $connectOptString argument")
+ throw new IllegalArgumentException(s"Invalid entity type $entityTypeVal, the entity type must be one of ${allowedEntityTypes.mkString(", ")} with a $connectOptString argument")
)
if (entityTypeVals.isEmpty)
throw new IllegalArgumentException("At least one entity type must be specified")
@@ -894,21 +900,20 @@
val hasEntityName = entityNames.exists(_.nonEmpty)
val hasEntityDefault = entityNames.exists(_.isEmpty)
- if (!options.has(bootstrapServerOpt) && !options.has(zkConnectOpt))
- throw new IllegalArgumentException("One of the required --bootstrap-server or --zookeeper arguments must be specified")
- else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt))
- throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified")
+ val numConnectOptions = (if (options.has(bootstrapServerOpt)) 1 else 0) +
+ (if (options.has(bootstrapControllerOpt)) 1 else 0) +
+ (if (options.has(zkConnectOpt)) 1 else 0)
+ if (numConnectOptions == 0)
+ throw new IllegalArgumentException("One of the required --bootstrap-server, --boostrap-controller, or --zookeeper arguments must be specified")
+ else if (numConnectOptions > 1)
+ throw new IllegalArgumentException("Only one of --bootstrap-server, --boostrap-controller, and --zookeeper can be specified")
if (options.has(allOpt) && options.has(zkConnectOpt)) {
throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all")
}
-
- if (options.has(zkTlsConfigFile) && options.has(bootstrapServerOpt)) {
- throw new IllegalArgumentException("--bootstrap-server doesn't support --zk-tls-config-file option. " +
- "If you intend the command to communicate directly with ZooKeeper, please use the option --zookeeper instead of --bootstrap-server. " +
- "Otherwise, remove the --zk-tls-config-file option.")
+ if (options.has(zkTlsConfigFile) && !options.has(zkConnectOpt)) {
+ throw new IllegalArgumentException("Only the --zookeeper option can be used with the --zk-tls-config-file option.")
}
-
if (hasEntityName && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) {
Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId =>
try brokerId.toInt catch {
diff --git a/core/src/main/scala/kafka/server/AuthHelper.scala b/core/src/main/scala/kafka/server/AuthHelper.scala
index 5e6a86a..f3be70e 100644
--- a/core/src/main/scala/kafka/server/AuthHelper.scala
+++ b/core/src/main/scala/kafka/server/AuthHelper.scala
@@ -185,6 +185,7 @@
setClusterId(clusterId).
setControllerId(effectiveControllerId).
setClusterAuthorizedOperations(clusterAuthorizedOperations).
- setBrokers(nodes)
+ setBrokers(nodes).
+ setEndpointType(expectedEndpointType.id())
}
}
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index c6ea4e1..61b8d3a 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -16,24 +16,23 @@
*/
package kafka.server
+import kafka.server.logger.RuntimeLoggerManager
+
import java.util
import java.util.Properties
-
import kafka.server.metadata.ConfigRepository
-import kafka.utils.Log4jController
import kafka.utils._
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC}
-import org.apache.kafka.common.config.{ConfigDef, ConfigResource, LogLevelConfig}
-import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidConfigurationException, InvalidRequestException}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
+import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.message.{AlterConfigsRequestData, AlterConfigsResponseData, IncrementalAlterConfigsRequestData, IncrementalAlterConfigsResponseData}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterableConfig => IAlterableConfig}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
-import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, UNKNOWN_SERVER_ERROR}
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.resource.{Resource, ResourceType}
@@ -87,6 +86,8 @@
this.logIdent = "[ConfigAdminManager[nodeId=" + nodeId + "]: "
+ val runtimeLoggerManager = new RuntimeLoggerManager(nodeId, logger.underlying)
+
/**
* Preprocess an incremental configuration operation on the broker. This step handles
* setting log4j levels, as well as filtering out some invalid resource requests that
@@ -135,14 +136,10 @@
}
resourceType match {
case BROKER_LOGGER =>
- if (!authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME)) {
- throw new ClusterAuthorizationException(Errors.CLUSTER_AUTHORIZATION_FAILED.message())
- }
- validateResourceNameIsCurrentNodeId(resource.resourceName())
- validateLogLevelConfigs(resource.configs())
- if (!request.validateOnly()) {
- alterLogLevelConfigs(resource.configs())
- }
+ runtimeLoggerManager.applyChangesForResource(
+ authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME),
+ request.validateOnly(),
+ resource)
results.put(resource, ApiError.NONE)
case BROKER =>
// The resource name must be either blank (if setting a cluster config) or
@@ -290,56 +287,6 @@
throw new InvalidRequestException(s"Unexpected broker id, expected ${nodeId}, but received ${name}")
}
}
-
- def validateLogLevelConfigs(ops: util.Collection[IAlterableConfig]): Unit = {
- def validateLoggerNameExists(loggerName: String): Unit = {
- if (!Log4jController.loggerExists(loggerName)) {
- throw new InvalidConfigurationException(s"Logger $loggerName does not exist!")
- }
- }
- ops.forEach { op =>
- val loggerName = op.name
- OpType.forId(op.configOperation()) match {
- case OpType.SET =>
- validateLoggerNameExists(loggerName)
- val logLevel = op.value()
- if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
- val validLevelsStr = LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ")
- throw new InvalidConfigurationException(
- s"Cannot set the log level of $loggerName to $logLevel as it is not a supported log level. " +
- s"Valid log levels are $validLevelsStr"
- )
- }
- case OpType.DELETE =>
- validateLoggerNameExists(loggerName)
- if (loggerName == Log4jController.ROOT_LOGGER)
- throw new InvalidRequestException(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed")
- case OpType.APPEND => throw new InvalidRequestException(s"${OpType.APPEND} " +
- s"operation is not allowed for the ${BROKER_LOGGER} resource")
- case OpType.SUBTRACT => throw new InvalidRequestException(s"${OpType.SUBTRACT} " +
- s"operation is not allowed for the ${BROKER_LOGGER} resource")
- case _ => throw new InvalidRequestException(s"Unknown operation type ${op.configOperation()} " +
- s"is not allowed for the ${BROKER_LOGGER} resource")
- }
- }
- }
-
- def alterLogLevelConfigs(ops: util.Collection[IAlterableConfig]): Unit = {
- ops.forEach { op =>
- val loggerName = op.name()
- val logLevel = op.value()
- OpType.forId(op.configOperation()) match {
- case OpType.SET =>
- info(s"Updating the log level of $loggerName to $logLevel")
- Log4jController.logLevel(loggerName, logLevel)
- case OpType.DELETE =>
- info(s"Unset the log level of $loggerName")
- Log4jController.unsetLogLevel(loggerName)
- case _ => throw new IllegalArgumentException(
- s"Invalid log4j configOperation: ${op.configOperation()}")
- }
- }
- }
}
object ConfigAdminManager {
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 5a9616f..bf9e106 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -17,17 +17,22 @@
package kafka.server
+import kafka.network.RequestChannel
+
import java.util.{Collections, Properties}
import kafka.server.metadata.ConfigRepository
import kafka.utils.{Log4jController, Logging}
+import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
import org.apache.kafka.common.message.DescribeConfigsResponseData
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
+import org.apache.kafka.common.requests.{ApiError, DescribeConfigsRequest, DescribeConfigsResponse}
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.LogConfig
@@ -41,6 +46,36 @@
config.originals.asScala.filter(_._2 != null) ++ config.nonInternalValues.asScala
}
+ def handleDescribeConfigsRequest(
+ request: RequestChannel.Request,
+ authHelper: AuthHelper
+ ): DescribeConfigsResponseData = {
+ val describeConfigsRequest = request.body[DescribeConfigsRequest]
+ val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.partition { resource =>
+ ConfigResource.Type.forId(resource.resourceType) match {
+ case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
+ authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
+ case ConfigResource.Type.TOPIC =>
+ authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName)
+ case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
+ }
+ }
+ val authorizedConfigs = describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation)
+ val unauthorizedConfigs = unauthorizedResources.map { resource =>
+ val error = ConfigResource.Type.forId(resource.resourceType) match {
+ case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
+ case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
+ case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
+ }
+ new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code)
+ .setErrorMessage(error.message)
+ .setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
+ .setResourceName(resource.resourceName)
+ .setResourceType(resource.resourceType)
+ }
+ new DescribeConfigsResponseData().setResults((authorizedConfigs ++ unauthorizedConfigs).asJava)
+ }
+
def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource],
includeSynonyms: Boolean,
includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 82c7b2b..856d936 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -26,12 +26,14 @@
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.logger.RuntimeLoggerManager
+import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType}
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, CREATE_TOKENS, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
+import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException, UnsupportedVersionException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
@@ -73,12 +75,15 @@
val config: KafkaConfig,
val metaProperties: MetaProperties,
val registrationsPublisher: ControllerRegistrationsPublisher,
- val apiVersionManager: ApiVersionManager
+ val apiVersionManager: ApiVersionManager,
+ val metadataCache: KRaftMetadataCache
) extends ApiRequestHandler with Logging {
this.logIdent = s"[ControllerApis nodeId=${config.nodeId}] "
val authHelper = new AuthHelper(authorizer)
+ val configHelper = new ConfigHelper(metadataCache, config, metadataCache)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
+ val runtimeLoggerManager = new RuntimeLoggerManager(config.nodeId, logger.underlying)
private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config)
def isClosed: Boolean = aclApis.isClosed
@@ -115,6 +120,7 @@
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+ case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
@@ -709,10 +715,26 @@
val duplicateResources = new util.HashSet[ConfigResource]
val configChanges = new util.HashMap[ConfigResource,
util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
+ val brokerLoggerResponses = new util.ArrayList[AlterConfigsResourceResponse](1)
alterConfigsRequest.data.resources.forEach { resource =>
val configResource = new ConfigResource(
ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
- if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
+ if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
+ val apiError = try {
+ runtimeLoggerManager.applyChangesForResource(
+ authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
+ alterConfigsRequest.data().validateOnly(),
+ resource)
+ ApiError.NONE
+ } catch {
+ case t: Throwable => ApiError.fromThrowable(t)
+ }
+ brokerLoggerResponses.add(new AlterConfigsResourceResponse().
+ setResourceName(resource.resourceName()).
+ setResourceType(resource.resourceType()).
+ setErrorCode(apiError.error().code()).
+ setErrorMessage(if (apiError.isFailure()) apiError.messageWithFallback() else null))
+ } else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
@@ -759,6 +781,7 @@
setErrorMessage(entry.getValue.message()).
setResourceName(entry.getKey.name()).
setResourceType(entry.getKey.`type`().id())))
+ brokerLoggerResponses.forEach(r => response.responses().add(r))
requestHelper.sendResponseMaybeThrottle(request, throttleMs =>
new IncrementalAlterConfigsResponse(response.setThrottleTimeMs(throttleMs)))
}
@@ -791,6 +814,13 @@
}
}
+ def handleDescribeConfigsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+ val responseData = configHelper.handleDescribeConfigsRequest(request, authHelper)
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new DescribeConfigsResponse(responseData.setThrottleTimeMs(requestThrottleMs)))
+ CompletableFuture.completedFuture[Unit](())
+ }
+
def createPartitions(
context: ControllerRequestContext,
request: CreatePartitionsRequestData,
@@ -1028,6 +1058,10 @@
def handleDescribeCluster(request: RequestChannel.Request): CompletableFuture[Unit] = {
// Unlike on the broker, DESCRIBE_CLUSTER on the controller requires a high level of
// permissions (ALTER on CLUSTER).
+ if (!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported()) {
+ throw new UnsupportedVersionException("Direct-to-controller communication is not " +
+ "supported with the current MetadataVersion.")
+ }
authHelper.authorizeClusterOperation(request, ALTER)
val response = authHelper.computeDescribeClusterResponse(
request,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 755da0e..42b275a 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -26,7 +26,7 @@
import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
-import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
+import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -117,6 +117,8 @@
var migrationSupport: Option[ControllerMigrationSupport] = None
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]()
+ @volatile var metadataCache : KRaftMetadataCache = _
+ @volatile var metadataCachePublisher: KRaftMetadataCachePublisher = _
@volatile var featuresPublisher: FeaturesPublisher = _
@volatile var registrationsPublisher: ControllerRegistrationsPublisher = _
@volatile var incarnationId: Uuid = _
@@ -159,6 +161,10 @@
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
+ metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+
+ metadataCachePublisher = new KRaftMetadataCachePublisher(metadataCache)
+
featuresPublisher = new FeaturesPublisher(logContext)
registrationsPublisher = new ControllerRegistrationsPublisher()
@@ -313,14 +319,19 @@
config,
sharedServer.metaProps,
registrationsPublisher,
- apiVersionManager)
+ apiVersionManager,
+ metadataCache)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel,
controllerApis,
time,
config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
- DataPlaneAcceptor.ThreadPrefix)
+ DataPlaneAcceptor.ThreadPrefix,
+ "controller")
+
+ // Set up the metadata cache publisher.
+ metadataPublishers.add(metadataCachePublisher)
// Set up the metadata features publisher.
metadataPublishers.add(featuresPublisher)
@@ -453,14 +464,6 @@
// Ensure that we're not the Raft leader prior to shutting down our socket server, for a
// smoother transition.
sharedServer.ensureNotRaftLeader()
- if (featuresPublisher != null) {
- featuresPublisher.close()
- featuresPublisher = null
- }
- if (registrationsPublisher != null) {
- registrationsPublisher.close()
- registrationsPublisher = null
- }
incarnationId = null
if (registrationManager != null) {
CoreUtils.swallow(registrationManager.close(), this)
@@ -472,6 +475,21 @@
}
metadataPublishers.forEach(p => sharedServer.loader.removeAndClosePublisher(p).get())
metadataPublishers.clear()
+ if (metadataCache != null) {
+ metadataCache = null
+ }
+ if (metadataCachePublisher != null) {
+ metadataCachePublisher.close()
+ metadataCachePublisher = null
+ }
+ if (featuresPublisher != null) {
+ featuresPublisher.close()
+ featuresPublisher = null
+ }
+ if (registrationsPublisher != null) {
+ registrationsPublisher.close()
+ registrationsPublisher = null
+ }
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
migrationSupport.foreach(_.shutdown(this))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8bef9da..fade1d1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2926,33 +2926,9 @@
}
def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
- val describeConfigsRequest = request.body[DescribeConfigsRequest]
- val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.partition { resource =>
- ConfigResource.Type.forId(resource.resourceType) match {
- case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
- authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
- case ConfigResource.Type.TOPIC =>
- authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName)
- case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
- }
- }
- val authorizedConfigs = configHelper.describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation)
- val unauthorizedConfigs = unauthorizedResources.map { resource =>
- val error = ConfigResource.Type.forId(resource.resourceType) match {
- case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
- case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
- case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
- }
- new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code)
- .setErrorMessage(error.message)
- .setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
- .setResourceName(resource.resourceName)
- .setResourceType(resource.resourceType)
- }
-
+ val responseData = configHelper.handleDescribeConfigsRequest(request, authHelper)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
- new DescribeConfigsResponse(new DescribeConfigsResponseData().setThrottleTimeMs(requestThrottleMs)
- .setResults((authorizedConfigs ++ unauthorizedConfigs).asJava)))
+ new DescribeConfigsResponse(responseData.setThrottleTimeMs(requestThrottleMs)))
}
def handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 5bfe500..51e3aff 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -81,14 +81,17 @@
/**
* A thread that answers kafka requests.
*/
-class KafkaRequestHandler(id: Int,
- brokerId: Int,
- val aggregateIdleMeter: Meter,
- val totalHandlerThreads: AtomicInteger,
- val requestChannel: RequestChannel,
- apis: ApiRequestHandler,
- time: Time) extends Runnable with Logging {
- this.logIdent = s"[Kafka Request Handler $id on Broker $brokerId], "
+class KafkaRequestHandler(
+ id: Int,
+ brokerId: Int,
+ val aggregateIdleMeter: Meter,
+ val totalHandlerThreads: AtomicInteger,
+ val requestChannel: RequestChannel,
+ apis: ApiRequestHandler,
+ time: Time,
+ nodeName: String = "broker"
+) extends Runnable with Logging {
+ this.logIdent = s"[Kafka Request Handler $id on ${nodeName.capitalize} $brokerId], "
private val shutdownComplete = new CountDownLatch(1)
private val requestLocal = RequestLocal.withThreadConfinedCaching
@volatile private var stopped = false
@@ -184,13 +187,16 @@
}
-class KafkaRequestHandlerPool(val brokerId: Int,
- val requestChannel: RequestChannel,
- val apis: ApiRequestHandler,
- time: Time,
- numThreads: Int,
- requestHandlerAvgIdleMetricName: String,
- logAndThreadNamePrefix : String) extends Logging {
+class KafkaRequestHandlerPool(
+ val brokerId: Int,
+ val requestChannel: RequestChannel,
+ val apis: ApiRequestHandler,
+ time: Time,
+ numThreads: Int,
+ requestHandlerAvgIdleMetricName: String,
+ logAndThreadNamePrefix : String,
+ nodeName: String = "broker"
+) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
@@ -204,7 +210,7 @@
}
def createHandler(id: Int): Unit = synchronized {
- runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
+ runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName)
KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
new file mode 100644
index 0000000..f6cda44
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.publisher.MetadataPublisher
+
+class KRaftMetadataCachePublisher(
+ val metadataCache: KRaftMetadataCache
+) extends MetadataPublisher {
+ override def name(): String = "KRaftMetadataCachePublisher"
+
+ override def onMetadataUpdate(
+ delta: MetadataDelta,
+ newImage: MetadataImage,
+ manifest: LoaderManifest
+ ): Unit = {
+ metadataCache.setImage(newImage)
+ }
+}
+
diff --git a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java
new file mode 100644
index 0000000..8f17095
--- /dev/null
+++ b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.logger;
+
+import kafka.utils.Log4jController;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RuntimeLoggerManagerTest {
+ private final static Logger LOG = LoggerFactory.getLogger(RuntimeLoggerManagerTest.class);
+
+ private final static RuntimeLoggerManager MANAGER = new RuntimeLoggerManager(5, LOG);
+
+ @Test
+ public void testValidateSetLogLevelConfig() {
+ MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
+ setName(LOG.getName()).
+ setConfigOperation(OpType.SET.id()).
+ setValue("TRACE")));
+ }
+
+ @Test
+ public void testValidateDeleteLogLevelConfig() {
+ MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
+ setName(LOG.getName()).
+ setConfigOperation(OpType.DELETE.id()).
+ setValue("")));
+ }
+
+ @ParameterizedTest
+ @ValueSource(bytes = {(byte) 2, (byte) 3})
+ public void testOperationNotAllowed(byte id) {
+ OpType opType = AlterConfigOp.OpType.forId(id);
+ assertEquals(opType + " operation is not allowed for the BROKER_LOGGER resource",
+ Assertions.assertThrows(InvalidRequestException.class,
+ () -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
+ setName(LOG.getName()).
+ setConfigOperation(id).
+ setValue("TRACE")))).getMessage());
+ }
+
+ @Test
+ public void testValidateBogusLogLevelNameNotAllowed() {
+ assertEquals("Cannot set the log level of " + LOG.getName() + " to BOGUS as it is not " +
+ "a supported log level. Valid log levels are DEBUG, ERROR, FATAL, INFO, TRACE, WARN",
+ Assertions.assertThrows(InvalidConfigurationException.class,
+ () -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
+ setName(LOG.getName()).
+ setConfigOperation(OpType.SET.id()).
+ setValue("BOGUS")))).getMessage());
+ }
+
+ @Test
+ public void testValidateSetRootLogLevelConfig() {
+ MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
+ setName(Log4jController.ROOT_LOGGER()).
+ setConfigOperation(OpType.SET.id()).
+ setValue("TRACE")));
+ }
+
+ @Test
+ public void testValidateRemoveRootLogLevelConfigNotAllowed() {
+ assertEquals("Removing the log level of the " + Log4jController.ROOT_LOGGER() +
+ " logger is not allowed",
+ Assertions.assertThrows(InvalidRequestException.class,
+ () -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
+ setName(Log4jController.ROOT_LOGGER()).
+ setConfigOperation(OpType.DELETE.id()).
+ setValue("")))).getMessage());
+ }
+}
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index f149c82..96ce658 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -89,6 +89,11 @@
String bootstrapServers();
/**
+ * The broker connect string which can be used by clients for bootstrapping to the controller quorum.
+ */
+ String bootstrapControllers();
+
+ /**
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
* acting as the controller (since ZK controllers serve both broker and controller roles).
*/
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 82ec566..98f8ab5 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -26,7 +26,6 @@
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.zk.EmbeddedZookeeper;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
@@ -143,7 +142,12 @@
@Override
public String bootstrapServers() {
- return clusterReference.get().clientProperties().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ return clusterReference.get().bootstrapServers();
+ }
+
+ @Override
+ public String bootstrapControllers() {
+ return clusterReference.get().bootstrapControllers();
}
@Override
@@ -237,7 +241,8 @@
@Override
public Admin createAdminClient(Properties configOverrides) {
- Admin admin = Admin.create(clusterReference.get().clientProperties(configOverrides));
+ Admin admin = Admin.create(clusterReference.get().
+ newClientPropertiesBuilder(configOverrides).build());
admins.add(admin);
return admin;
}
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index c196ccc..5eb342d 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -199,6 +199,11 @@
}
@Override
+ public String bootstrapControllers() {
+ throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters.");
+ }
+
+ @Override
public Collection<SocketServer> brokerSocketServers() {
return servers()
.map(KafkaServer::socketServer)
diff --git a/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java b/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java
new file mode 100644
index 0000000..2eef6d3
--- /dev/null
+++ b/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.server;
+
+import kafka.server.ControllerServer;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.DescribeFeaturesResult;
+import org.apache.kafka.clients.admin.DescribeMetadataQuorumResult;
+import org.apache.kafka.clients.admin.FeatureUpdate;
+import org.apache.kafka.clients.admin.FinalizedVersionRange;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
+import org.apache.kafka.clients.admin.UpdateFeaturesResult;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidUpdateVersionException;
+import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
+import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
+import org.apache.kafka.controller.QuorumController;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
+import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(120)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class BootstrapControllersIntegrationTest {
+ private KafkaClusterTestKit cluster;
+
+ private String bootstrapControllerString;
+
+ @BeforeAll
+ public void createCluster() throws Exception {
+ this.cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(3).
+ setNumControllerNodes(3).build()).build();
+ this.cluster.format();
+ this.cluster.startup();
+ this.cluster.waitForActiveController();
+ this.cluster.waitForReadyBrokers();
+ StringBuilder bootstrapControllerStringBuilder = new StringBuilder();
+ String prefix = "";
+ for (ControllerServer controller : cluster.controllers().values()) {
+ bootstrapControllerStringBuilder.append(prefix);
+ prefix = ",";
+ int port = controller.socketServerFirstBoundPortFuture().get(1, TimeUnit.MINUTES);
+ bootstrapControllerStringBuilder.append("localhost:").append(port);
+ }
+ bootstrapControllerString = bootstrapControllerStringBuilder.toString();
+ }
+
+ @AfterAll
+ public void destroyCluster() throws Exception {
+ cluster.close();
+ }
+
+ private Properties adminProperties(boolean usingBootstrapControllers) {
+ Properties properties = cluster.clientProperties();
+ if (usingBootstrapControllers) {
+ properties.remove(BOOTSTRAP_SERVERS_CONFIG);
+ properties.setProperty(BOOTSTRAP_CONTROLLERS_CONFIG, bootstrapControllerString);
+ }
+ return properties;
+ }
+
+ @Test
+ public void testPutBrokersInBootstrapControllersConfig() throws Exception {
+ Properties properties = cluster.clientProperties();
+ properties.put(BOOTSTRAP_CONTROLLERS_CONFIG, properties.getProperty(BOOTSTRAP_SERVERS_CONFIG));
+ properties.remove(BOOTSTRAP_SERVERS_CONFIG);
+ try (Admin admin = Admin.create(properties)) {
+ ExecutionException exception = assertThrows(ExecutionException.class,
+ () -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
+ assertNotNull(exception.getCause());
+ assertEquals(MismatchedEndpointTypeException.class, exception.getCause().getClass());
+ assertEquals("The request was sent to an endpoint of type BROKER, but we wanted " +
+ "an endpoint of type CONTROLLER", exception.getCause().getMessage());
+ }
+ }
+
+ @Disabled
+ @Test
+ public void testPutControllersInBootstrapBrokersConfig() throws Exception {
+ Properties properties = cluster.clientProperties();
+ properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapControllerString);
+ try (Admin admin = Admin.create(properties)) {
+ ExecutionException exception = assertThrows(ExecutionException.class,
+ () -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
+ assertNotNull(exception.getCause());
+ assertEquals(MismatchedEndpointTypeException.class, exception.getCause().getClass());
+ assertEquals("This endpoint does not appear to be a BROKER.",
+ exception.getCause().getMessage());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testDescribeCluster(boolean usingBootstrapControllers) throws Exception {
+ try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
+ DescribeClusterResult result = admin.describeCluster();
+ assertEquals(cluster.controllers().values().iterator().next().clusterId(),
+ result.clusterId().get(1, TimeUnit.MINUTES));
+ if (usingBootstrapControllers) {
+ assertEquals(((QuorumController) cluster.waitForActiveController()).nodeId(),
+ result.controller().get().id());
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testDescribeFeatures(boolean usingBootstrapControllers) throws Exception {
+ try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
+ DescribeFeaturesResult result = admin.describeFeatures();
+ short metadataVersion = cluster.controllers().values().iterator().next().
+ featuresPublisher().features().metadataVersion().featureLevel();
+ assertEquals(new FinalizedVersionRange(metadataVersion, metadataVersion),
+ result.featureMetadata().get(1, TimeUnit.MINUTES).finalizedFeatures().
+ get(MetadataVersion.FEATURE_NAME));
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testUpdateFeatures(boolean usingBootstrapControllers) throws Exception {
+ try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
+ UpdateFeaturesResult result = admin.updateFeatures(Collections.singletonMap("foo.bar.feature",
+ new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)),
+ new UpdateFeaturesOptions());
+ ExecutionException exception =
+ assertThrows(ExecutionException.class,
+ () -> result.all().get(1, TimeUnit.MINUTES));
+ assertNotNull(exception.getCause());
+ assertEquals(InvalidUpdateVersionException.class, exception.getCause().getClass());
+ assertTrue(exception.getCause().getMessage().endsWith("does not support this feature."),
+ "expected message to end with 'does not support this feature', but it was: " +
+ exception.getCause().getMessage());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testDescribeMetadataQuorum(boolean usingBootstrapControllers) throws Exception {
+ try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
+ DescribeMetadataQuorumResult result = admin.describeMetadataQuorum();
+ assertEquals(((QuorumController) cluster.waitForActiveController()).nodeId(),
+ result.quorumInfo().get(1, TimeUnit.MINUTES).leaderId());
+ }
+ }
+
+ @Test
+ public void testUsingBootstrapControllersOnUnsupportedAdminApi() throws Exception {
+ try (Admin admin = Admin.create(adminProperties(true))) {
+ ListOffsetsResult result = admin.listOffsets(Collections.singletonMap(
+ new TopicPartition("foo", 0), OffsetSpec.earliest()));
+ ExecutionException exception =
+ assertThrows(ExecutionException.class,
+ () -> result.all().get(1, TimeUnit.MINUTES));
+ assertNotNull(exception.getCause());
+ assertEquals(UnsupportedEndpointTypeException.class, exception.getCause().getClass());
+ assertEquals("This Admin API is not yet supported when communicating directly with " +
+ "the controller quorum.", exception.getCause().getMessage());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testIncrementalAlterConfigs(boolean usingBootstrapControllers) throws Exception {
+ try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
+ int nodeId = usingBootstrapControllers ?
+ cluster.controllers().values().iterator().next().config().nodeId() :
+ cluster.brokers().values().iterator().next().config().nodeId();
+ ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
+ ConfigResource defaultResource = new ConfigResource(BROKER, "");
+ Map<ConfigResource, Collection<AlterConfigOp>> alterations = new HashMap<>();
+ alterations.put(nodeResource, Arrays.asList(
+ new AlterConfigOp(new ConfigEntry("my.custom.config", "foo"),
+ AlterConfigOp.OpType.SET)));
+ alterations.put(defaultResource, Arrays.asList(
+ new AlterConfigOp(new ConfigEntry("my.custom.config", "bar"),
+ AlterConfigOp.OpType.SET)));
+ admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ Config config = admin.describeConfigs(Arrays.asList(nodeResource)).
+ all().get(1, TimeUnit.MINUTES).get(nodeResource);
+ ConfigEntry entry = config.entries().stream().
+ filter(e -> e.name().equals("my.custom.config")).
+ findFirst().get();
+ assertEquals(DYNAMIC_BROKER_CONFIG, entry.source(),
+ "Expected entry for my.custom.config to come from DYNAMIC_BROKER_CONFIG. " +
+ "Instead, the entry was: " + entry);
+ });
+ }
+ }
+}
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 1adcb70..14b711c 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -29,6 +29,7 @@
import kafka.tools.StorageTool;
import kafka.utils.Logging;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
@@ -446,51 +447,96 @@
"Failed to wait for publisher to publish the metadata update to each broker.");
}
- public Properties controllerClientProperties() throws ExecutionException, InterruptedException {
- Properties properties = new Properties();
- if (!controllers.isEmpty()) {
- Collection<Node> controllerNodes = RaftConfig.voterConnectionsToNodes(
- controllerQuorumVotersFutureManager.future.get());
-
- StringBuilder bld = new StringBuilder();
- String prefix = "";
- for (Node node : controllerNodes) {
- bld.append(prefix).append(node.id()).append('@');
- bld.append(node.host()).append(":").append(node.port());
- prefix = ",";
- }
- properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, bld.toString());
- properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
- controllerNodes.stream().map(n -> n.host() + ":" + n.port()).
- collect(Collectors.joining(",")));
+ public String quorumVotersConfig() throws ExecutionException, InterruptedException {
+ Collection<Node> controllerNodes = RaftConfig.voterConnectionsToNodes(
+ controllerQuorumVotersFutureManager.future.get());
+ StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ for (Node node : controllerNodes) {
+ bld.append(prefix).append(node.id()).append('@');
+ bld.append(node.host()).append(":").append(node.port());
+ prefix = ",";
}
- return properties;
+ return bld.toString();
+ }
+
+ public class ClientPropertiesBuilder {
+ private Properties properties;
+ private boolean usingBootstrapControllers = false;
+
+ public ClientPropertiesBuilder() {
+ this.properties = new Properties();
+ }
+
+ public ClientPropertiesBuilder(Properties properties) {
+ this.properties = properties;
+ }
+
+ public ClientPropertiesBuilder setUsingBootstrapControllers(boolean usingBootstrapControllers) {
+ this.usingBootstrapControllers = usingBootstrapControllers;
+ return this;
+ }
+
+ public Properties build() {
+ if (usingBootstrapControllers) {
+ properties.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, bootstrapControllers());
+ properties.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ } else {
+ properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+ properties.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
+ }
+ return properties;
+ }
+ }
+
+ public ClientPropertiesBuilder newClientPropertiesBuilder(Properties properties) {
+ return new ClientPropertiesBuilder(properties);
+ }
+
+ public ClientPropertiesBuilder newClientPropertiesBuilder() {
+ return new ClientPropertiesBuilder();
}
public Properties clientProperties() {
- return clientProperties(new Properties());
+ return new ClientPropertiesBuilder().build();
}
- public Properties clientProperties(Properties configOverrides) {
- if (!brokers.isEmpty()) {
- StringBuilder bld = new StringBuilder();
- String prefix = "";
- for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
- int brokerId = entry.getKey();
- BrokerServer broker = entry.getValue();
- ListenerName listenerName = nodes.externalListenerName();
- int port = broker.boundPort(listenerName);
- if (port <= 0) {
- throw new RuntimeException("Broker " + brokerId + " does not yet " +
+ public String bootstrapServers() {
+ StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
+ int brokerId = entry.getKey();
+ BrokerServer broker = entry.getValue();
+ ListenerName listenerName = nodes.externalListenerName();
+ int port = broker.boundPort(listenerName);
+ if (port <= 0) {
+ throw new RuntimeException("Broker " + brokerId + " does not yet " +
+ "have a bound port for " + listenerName + ". Did you start " +
+ "the cluster yet?");
+ }
+ bld.append(prefix).append("localhost:").append(port);
+ prefix = ",";
+ }
+ return bld.toString();
+ }
+
+ public String bootstrapControllers() {
+ StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
+ int id = entry.getKey();
+ ControllerServer controller = entry.getValue();
+ ListenerName listenerName = nodes.controllerListenerName();
+ int port = controller.socketServer().boundPort(listenerName);
+ if (port <= 0) {
+ throw new RuntimeException("Controller " + id + " does not yet " +
"have a bound port for " + listenerName + ". Did you start " +
"the cluster yet?");
- }
- bld.append(prefix).append("localhost:").append(port);
- prefix = ",";
}
- configOverrides.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bld.toString());
+ bld.append(prefix).append("localhost:").append(port);
+ prefix = ",";
}
- return configOverrides;
+ return bld.toString();
}
public Map<Integer, ControllerServer> controllers() {
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index 5bc3685..a27dfc0 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -185,6 +185,10 @@
return new ListenerName("EXTERNAL");
}
+ public ListenerName controllerListenerName() {
+ return new ListenerName("CONTROLLER");
+ }
+
public TestKitNodes copyWithAbsolutePaths(String baseDirectory) {
NavigableMap<Integer, ControllerNode> newControllerNodes = new TreeMap<>();
NavigableMap<Integer, BrokerNode> newBrokerNodes = new TreeMap<>();
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 32036ca..836c1b1 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -26,7 +26,7 @@
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource.Type
-import org.apache.kafka.common.errors.PolicyViolationException
+import org.apache.kafka.common.errors.{PolicyViolationException, UnsupportedVersionException}
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@@ -52,12 +52,11 @@
import java.io.File
import java.nio.file.{FileSystems, Path}
import java.{lang, util}
-import java.util.concurrent.{CompletableFuture, CompletionStage}
+import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Arrays, Collections, Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable
-import scala.concurrent.ExecutionException
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@@ -1188,6 +1187,33 @@
cluster.close()
}
}
+
+ @Test
+ def testDirectToControllerCommunicationFailsOnOlderMetadataVersion(): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2).
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).build()).
+ build()
+ try {
+ cluster.format()
+ cluster.startup()
+ val admin = Admin.create(cluster.newClientPropertiesBuilder().
+ setUsingBootstrapControllers(true).
+ build())
+ try {
+ val exception = assertThrows(classOf[ExecutionException],
+ () => admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES))
+ assertNotNull(exception.getCause)
+ assertEquals(classOf[UnsupportedVersionException], exception.getCause.getClass)
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
}
class BadAuthorizer() extends Authorizer {
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index dd9b578..d4a1e8f 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -69,10 +69,8 @@
val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
// Enable migration configs and restart brokers
- val props = kraftCluster.controllerClientProperties()
- val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
- zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
@@ -108,10 +106,8 @@
kraftCluster.startup()
// Enable migration configs and restart brokers
- val props = kraftCluster.controllerClientProperties()
- val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
- zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
assertThrows(classOf[IllegalArgumentException], () => zkCluster.rollingBrokerRestart())
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 9e1026a..040fa52 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -172,10 +172,8 @@
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
- val clientProps = kraftCluster.controllerClientProperties()
- val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
- zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig());
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
@@ -297,10 +295,8 @@
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
- val clientProps = kraftCluster.controllerClientProperties()
- val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
- zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
@@ -367,10 +363,8 @@
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
- val clientProps = kraftCluster.controllerClientProperties()
- val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
- zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
@@ -434,10 +428,8 @@
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
- val clientProps = kraftCluster.controllerClientProperties()
- val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
- zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
@@ -496,10 +488,8 @@
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
- val clientProps = kraftCluster.controllerClientProperties()
- val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
- zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 36c8c95..124227d 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -44,6 +44,10 @@
private val zkConnect = "localhost:2181"
private val dummyAdminZkClient = new DummyAdminZkClient(null)
+ private val zookeeperBootstrap = Array("--zookeeper", zkConnect)
+ private val brokerBootstrap = Array("--bootstrap-server", "localhost:9092")
+ private val controllerBootstrap = Array("--bootstrap-controller", "localhost:9093")
+
@Test
def shouldExitWithNonZeroStatusOnArgError(): Unit = {
assertNonZeroStatusExit(Array("--blah"))
@@ -51,32 +55,28 @@
@Test
def shouldExitWithNonZeroStatusOnZkCommandWithTopicsEntity(): Unit = {
- assertNonZeroStatusExit(Array(
- "--zookeeper", zkConnect,
+ assertNonZeroStatusExit(zookeeperBootstrap ++ Array(
"--entity-type", "topics",
"--describe"))
}
@Test
def shouldExitWithNonZeroStatusOnZkCommandWithClientsEntity(): Unit = {
- assertNonZeroStatusExit(Array(
- "--zookeeper", zkConnect,
+ assertNonZeroStatusExit(zookeeperBootstrap ++ Array(
"--entity-type", "clients",
"--describe"))
}
@Test
def shouldExitWithNonZeroStatusOnZkCommandWithIpsEntity(): Unit = {
- assertNonZeroStatusExit(Array(
- "--zookeeper", zkConnect,
+ assertNonZeroStatusExit(zookeeperBootstrap ++ Array(
"--entity-type", "ips",
"--describe"))
}
@Test
def shouldExitWithNonZeroStatusAlterUserQuotaWithoutEntityName(): Unit = {
- assertNonZeroStatusExit(Array(
- "--bootstrap-server", "localhost:9092",
+ assertNonZeroStatusExit(brokerBootstrap ++ Array(
"--entity-type", "users",
"--alter", "--add-config", "consumer_byte_rate=20000"))
}
@@ -91,6 +91,12 @@
}
@Test
+ def shouldExitWithNonZeroStatusIfBothBootstrapServerAndBootstrapControllerGiven(): Unit = {
+ assertNonZeroStatusExit(brokerBootstrap ++ controllerBootstrap ++ Array(
+ "--describe", "--broker-defaults" ))
+ }
+
+ @Test
def shouldExitWithNonZeroStatusOnBrokerCommandWithZkTlsConfigFile(): Unit = {
assertNonZeroStatusExit(Array(
"--bootstrap-server", "invalid host",
@@ -119,66 +125,96 @@
@Test
def shouldFailParseArgumentsForClientsEntityTypeUsingZookeeper(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => testArgumentParse("clients", zkConfig = true))
+ assertThrows(classOf[IllegalArgumentException], () => testArgumentParse(zookeeperBootstrap, "clients"))
}
@Test
- def shouldParseArgumentsForClientsEntityType(): Unit = {
- testArgumentParse("clients", zkConfig = false)
+ def shouldParseArgumentsForClientsEntityTypeWithBrokerBootstrap(): Unit = {
+ testArgumentParse(brokerBootstrap, "clients")
+ }
+
+ @Test
+ def shouldParseArgumentsForClientsEntityTypeWithControllerBootstrap(): Unit = {
+ testArgumentParse(controllerBootstrap, "clients")
}
@Test
def shouldParseArgumentsForUsersEntityTypeUsingZookeeper(): Unit = {
- testArgumentParse("users", zkConfig = true)
+ testArgumentParse(zookeeperBootstrap, "users")
}
@Test
- def shouldParseArgumentsForUsersEntityType(): Unit = {
- testArgumentParse("users", zkConfig = false)
+ def shouldParseArgumentsForUsersEntityTypeWithBrokerBootstrap(): Unit = {
+ testArgumentParse(brokerBootstrap, "users")
+ }
+
+ @Test
+ def shouldParseArgumentsForUsersEntityTypeWithControllerBootstrap(): Unit = {
+ testArgumentParse(controllerBootstrap, "users")
}
@Test
def shouldFailParseArgumentsForTopicsEntityTypeUsingZookeeper(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => testArgumentParse("topics", zkConfig = true))
+ assertThrows(classOf[IllegalArgumentException], () => testArgumentParse(zookeeperBootstrap, "topics"))
}
@Test
- def shouldParseArgumentsForTopicsEntityType(): Unit = {
- testArgumentParse("topics", zkConfig = false)
+ def shouldParseArgumentsForTopicsEntityTypeWithBrokerBootstrap(): Unit = {
+ testArgumentParse(brokerBootstrap, "topics")
+ }
+
+ @Test
+ def shouldParseArgumentsForTopicsEntityTypeWithControllerBootstrap(): Unit = {
+ testArgumentParse(controllerBootstrap, "topics")
}
@Test
def shouldParseArgumentsForBrokersEntityTypeUsingZookeeper(): Unit = {
- testArgumentParse("brokers", zkConfig = true)
+ testArgumentParse(zookeeperBootstrap, "brokers")
}
@Test
- def shouldParseArgumentsForBrokersEntityType(): Unit = {
- testArgumentParse("brokers", zkConfig = false)
+ def shouldParseArgumentsForBrokersEntityTypeWithBrokerBootstrap(): Unit = {
+ testArgumentParse(brokerBootstrap, "brokers")
}
@Test
- def shouldParseArgumentsForBrokerLoggersEntityType(): Unit = {
- testArgumentParse("broker-loggers", zkConfig = false)
+ def shouldParseArgumentsForBrokersEntityTypeWithControllerBootstrap(): Unit = {
+ testArgumentParse(controllerBootstrap, "brokers")
+ }
+
+ @Test
+ def shouldParseArgumentsForBrokerLoggersEntityTypeWithBrokerBootstrap(): Unit = {
+ testArgumentParse(brokerBootstrap, "broker-loggers")
+ }
+
+ @Test
+ def shouldParseArgumentsForBrokerLoggersEntityTypeWithControllerBootstrap(): Unit = {
+ testArgumentParse(controllerBootstrap, "broker-loggers")
}
@Test
def shouldFailParseArgumentsForIpEntityTypeUsingZookeeper(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => testArgumentParse("ips", zkConfig = true))
+ assertThrows(classOf[IllegalArgumentException], () => testArgumentParse(zookeeperBootstrap, "ips"))
}
@Test
- def shouldParseArgumentsForIpEntityType(): Unit = {
- testArgumentParse("ips", zkConfig = false)
+ def shouldParseArgumentsForIpEntityTypeWithBrokerBootstrap(): Unit = {
+ testArgumentParse(brokerBootstrap, "ips")
}
- def testArgumentParse(entityType: String, zkConfig: Boolean): Unit = {
+ @Test
+ def shouldParseArgumentsForIpEntityTypeWithControllerBootstrap(): Unit = {
+ testArgumentParse(controllerBootstrap, "ips")
+ }
+
+ def testArgumentParse(
+ bootstrapArguments: Array[String],
+ entityType: String
+ ): Unit = {
val shortFlag: String = s"--${entityType.dropRight(1)}"
- val connectOpts = if (zkConfig)
- ("--zookeeper", zkConnect)
- else
- ("--bootstrap-server", "localhost:9092")
+ val connectOpts = (bootstrapArguments(0), bootstrapArguments(1))
// Should parse correctly
var createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
diff --git a/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
index 80884da..d91e146 100644
--- a/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
@@ -220,7 +220,8 @@
assertEquals(new DescribeClusterResponseData().
setClusterId("ltCWoi9wRhmHSQCIgAznEg").
setControllerId(-1).
- setClusterAuthorizedOperations(Int.MinValue), responseData)
+ setClusterAuthorizedOperations(Int.MinValue).
+ setEndpointType(2.toByte), responseData)
}
@Test
@@ -240,6 +241,7 @@
setClusterId("ltCWoi9wRhmHSQCIgAznEg").
setControllerId(1).
setClusterAuthorizedOperations(Int.MinValue).
- setBrokers(nodes), responseData)
+ setBrokers(nodes).
+ setEndpointType(2.toByte), responseData)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
index 19390c3..c3cd079 100644
--- a/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
@@ -21,11 +21,10 @@
import java.util.Collections
import kafka.server.metadata.MockConfigRepository
-import kafka.utils.{Log4jController, TestUtils}
+import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC, UNKNOWN}
-import org.apache.kafka.common.config.LogLevelConfig.VALID_LOG_LEVELS
-import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
+import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.message.{AlterConfigsRequestData, AlterConfigsResponseData, IncrementalAlterConfigsRequestData, IncrementalAlterConfigsResponseData}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => LAlterConfigsResourceCollection}
@@ -44,8 +43,6 @@
import org.junit.jupiter.api.{Assertions, Test}
import org.slf4j.LoggerFactory
-import scala.jdk.CollectionConverters._
-
class ConfigAdminManagerTest {
val logger = LoggerFactory.getLogger(classOf[ConfigAdminManagerTest])
@@ -259,47 +256,6 @@
() => manager.validateResourceNameIsCurrentNodeId("e")).getMessage())
}
- @Test
- def testValidateLogLevelConfigs(): Unit = {
- val manager = newConfigAdminManager(5)
- manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
- setName(logger.getName).
- setConfigOperation(OpType.SET.id()).
- setValue("TRACE")))
- manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
- setName(logger.getName).
- setConfigOperation(OpType.DELETE.id()).
- setValue("")))
- assertEquals("APPEND operation is not allowed for the BROKER_LOGGER resource",
- Assertions.assertThrows(classOf[InvalidRequestException],
- () => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
- setName(logger.getName).
- setConfigOperation(OpType.APPEND.id()).
- setValue("TRACE")))).getMessage())
- assertEquals(s"Cannot set the log level of ${logger.getName} to BOGUS as it is not " +
- s"a supported log level. Valid log levels are ${VALID_LOG_LEVELS.asScala.mkString(", ")}",
- Assertions.assertThrows(classOf[InvalidConfigurationException],
- () => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
- setName(logger.getName).
- setConfigOperation(OpType.SET.id()).
- setValue("BOGUS")))).getMessage())
- }
-
- @Test
- def testValidateRootLogLevelConfigs(): Unit = {
- val manager = newConfigAdminManager(5)
- manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
- setName(Log4jController.ROOT_LOGGER).
- setConfigOperation(OpType.SET.id()).
- setValue("TRACE")))
- assertEquals(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed",
- Assertions.assertThrows(classOf[InvalidRequestException],
- () => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
- setName(Log4jController.ROOT_LOGGER).
- setConfigOperation(OpType.DELETE.id()).
- setValue("")))).getMessage())
- }
-
def brokerLogger1Incremental(): IAlterConfigsResource = new IAlterConfigsResource().
setResourceName("1").
setResourceType(BROKER_LOGGER.id).
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 58eba87..200a898 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -20,6 +20,7 @@
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.metadata.KRaftMetadataCache
import kafka.test.MockController
import kafka.utils.NotNothing
import org.apache.kafka.clients.admin.AlterConfigOp
@@ -60,6 +61,7 @@
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.slf4j.LoggerFactory
import java.net.InetAddress
import java.util
@@ -72,6 +74,8 @@
import scala.reflect.ClassTag
class ControllerApisTest {
+ val logger = LoggerFactory.getLogger(classOf[ControllerApisTest])
+
object MockControllerMutationQuota {
val errorMessage = "quota exceeded in test"
var throttleTimeMs = 1000
@@ -117,6 +121,7 @@
)
private val replicaQuotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
private val raftManager: RaftManager[ApiMessageAndVersion] = mock(classOf[RaftManager[ApiMessageAndVersion]])
+ private val metadataCache: KRaftMetadataCache = MetadataCache.kRaftMetadataCache(0)
private val quotasNeverThrottleControllerMutations = QuotaManagers(
clientQuotaManager,
@@ -160,7 +165,8 @@
ListenerType.CONTROLLER,
true,
false,
- () => Features.fromKRaftVersion(MetadataVersion.latest()))
+ () => Features.fromKRaftVersion(MetadataVersion.latest())),
+ metadataCache
)
}
@@ -487,16 +493,17 @@
response.data().responses().asScala.toSet)
}
- @Test
- def testInvalidIncrementalAlterConfigsResources(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def testInvalidIncrementalAlterConfigsResources(denyAllAuthorizer: Boolean): Unit = {
val requestData = new IncrementalAlterConfigsRequestData().setResources(
new AlterConfigsResourceCollection(util.Arrays.asList(
new AlterConfigsResource().
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER_LOGGER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
- setName("kafka.server.KafkaConfig").
- setValue("TRACE").
+ setName("kafka.server.ControllerApisTest").
+ setValue("DEBUG").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
setResourceName("3").
@@ -521,7 +528,12 @@
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
).iterator()))
val request = buildRequest(new IncrementalAlterConfigsRequest.Builder(requestData).build(0))
- createControllerApis(Some(createDenyAllAuthorizer()),
+ val authorizer = if (denyAllAuthorizer) {
+ Some(createDenyAllAuthorizer())
+ } else {
+ None
+ }
+ createControllerApis(authorizer,
new MockController.Builder().build()).handleIncrementalAlterConfigs(request)
val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
@@ -533,8 +545,8 @@
val response = capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
assertEquals(Set(
new AlterConfigsResourceResponse().
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("Unexpected resource type BROKER_LOGGER.").
+ setErrorCode(if (denyAllAuthorizer) CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
+ setErrorMessage(if (denyAllAuthorizer) CLUSTER_AUTHORIZATION_FAILED.message() else null).
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER_LOGGER.id()),
new AlterConfigsResourceResponse().
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
index 0f90412..419f0e4 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
@@ -19,6 +19,8 @@
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
@@ -194,4 +196,53 @@
}
}
}
+
+ static class InitializeBootstrapException extends RuntimeException {
+ private final static long serialVersionUID = 1L;
+
+ InitializeBootstrapException(String message) {
+ super(message);
+ }
+ }
+
+ public static void initializeBootstrapProperties(
+ Properties properties,
+ Optional<String> bootstrapServer,
+ Optional<String> bootstrapControllers
+ ) {
+ if (bootstrapServer.isPresent()) {
+ if (bootstrapControllers.isPresent()) {
+ throw new InitializeBootstrapException("You cannot specify both " +
+ "--bootstrap-controller and --bootstrap-server.");
+ }
+ properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapServer.get());
+ properties.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
+ } else if (bootstrapControllers.isPresent()) {
+ properties.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ properties.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
+ bootstrapControllers.get());
+ } else {
+ throw new InitializeBootstrapException("You must specify either --bootstrap-controller " +
+ "or --bootstrap-server.");
+ }
+ }
+
+ public static void initializeBootstrapProperties(
+ OptionParser parser,
+ OptionSet options,
+ Properties properties,
+ OptionSpec<String> bootstrapServer,
+ OptionSpec<String> bootstrapControllers
+ ) {
+ try {
+ initializeBootstrapProperties(properties,
+ options.has(bootstrapServer) ?
+ Optional.of(options.valueOf(bootstrapServer).toString()) : Optional.empty(),
+ options.has(bootstrapControllers) ?
+ Optional.of(options.valueOf(bootstrapControllers).toString()) : Optional.empty());
+ } catch (InitializeBootstrapException e) {
+ printUsageAndExit(parser, e.getMessage());
+ }
+ }
}
diff --git a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
index 4d12122..e52f39c 100644
--- a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
@@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -224,4 +225,45 @@
assertEquals("existing-string-3", props.get("sondkey"));
assertEquals("500", props.get("iondkey"));
}
+
+ static private Properties createTestProps() {
+ Properties props = new Properties();
+ props.setProperty("bootstrap.servers", "this");
+ props.setProperty("bootstrap.controllers", "that");
+ return props;
+ }
+
+ @Test
+ public void testInitializeBootstrapPropertiesWithNoBootstraps() {
+ assertEquals("You must specify either --bootstrap-controller or --bootstrap-server.",
+ assertThrows(CommandLineUtils.InitializeBootstrapException.class,
+ () -> CommandLineUtils.initializeBootstrapProperties(createTestProps(),
+ Optional.empty(), Optional.empty())).getMessage());
+ }
+
+ @Test
+ public void testInitializeBootstrapPropertiesWithBrokerBootstrap() {
+ Properties props = createTestProps();
+ CommandLineUtils.initializeBootstrapProperties(props,
+ Optional.of("127.0.0.2:9094"), Optional.empty());
+ assertEquals("127.0.0.2:9094", props.getProperty("bootstrap.servers"));
+ assertNull(props.getProperty("bootstrap.controllers"));
+ }
+
+ @Test
+ public void testInitializeBootstrapPropertiesWithControllerBootstrap() {
+ Properties props = createTestProps();
+ CommandLineUtils.initializeBootstrapProperties(props,
+ Optional.empty(), Optional.of("127.0.0.2:9094"));
+ assertNull(props.getProperty("bootstrap.servers"));
+ assertEquals("127.0.0.2:9094", props.getProperty("bootstrap.controllers"));
+ }
+
+ @Test
+ public void testInitializeBootstrapPropertiesWithBothBootstraps() {
+ assertEquals("You cannot specify both --bootstrap-controller and --bootstrap-server.",
+ assertThrows(CommandLineUtils.InitializeBootstrapException.class,
+ () -> CommandLineUtils.initializeBootstrapProperties(createTestProps(),
+ Optional.of("127.0.0.2:9094"), Optional.of("127.0.0.3:9095"))).getMessage());
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index ef5cb1c..28ec83c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -18,6 +18,7 @@
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
@@ -25,9 +26,11 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
import java.io.PrintStream;
import java.util.Arrays;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@@ -65,9 +68,13 @@
Subparser unregisterParser = subparsers.addParser("unregister")
.help("Unregister a broker.");
for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) {
- subpparser.addArgument("--bootstrap-server", "-b")
+ MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true);
+ connectionOptions.addArgument("--bootstrap-server", "-b")
.action(store())
.help("A list of host/port pairs to use for establishing the connection to the Kafka cluster.");
+ connectionOptions.addArgument("--bootstrap-controller", "-C")
+ .action(store())
+ .help("A list of host/port pairs to use for establishing the connection to the KRaft controllers.");
subpparser.addArgument("--config", "-c")
.action(store())
.help("A property file containing configurations for the Admin client.");
@@ -83,13 +90,9 @@
String configPath = namespace.getString("config");
Properties properties = (configPath == null) ? new Properties() : Utils.loadProps(configPath);
- String bootstrapServer = namespace.getString("bootstrap_server");
- if (bootstrapServer != null) {
- properties.setProperty("bootstrap.servers", bootstrapServer);
- }
- if (properties.getProperty("bootstrap.servers") == null) {
- throw new TerseException("Please specify --bootstrap-server.");
- }
+ CommandLineUtils.initializeBootstrapProperties(properties,
+ Optional.ofNullable(namespace.getString("bootstrap_server")),
+ Optional.ofNullable(namespace.getString("bootstrap_controller")));
switch (command) {
case "cluster-id": {
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index ab6e813..67e774a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -30,6 +30,7 @@
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
@@ -43,6 +44,7 @@
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.util.CommandLineUtils;
import static net.sourceforge.argparse4j.impl.Arguments.append;
import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -73,13 +75,12 @@
.newArgumentParser("kafka-features")
.defaultHelp(true)
.description("This tool manages feature flags in Kafka.");
- parser
- .addArgument("--bootstrap-server")
- .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
- .required(true);
-
- parser
- .addArgument("--command-config")
+ MutuallyExclusiveGroup bootstrapGroup = parser.addMutuallyExclusiveGroup().required(true);
+ bootstrapGroup.addArgument("--bootstrap-server")
+ .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.");
+ bootstrapGroup.addArgument("--bootstrap-controller")
+ .help("A comma-separated list of host:port pairs to use for establishing the connection to the KRaft quorum.");
+ parser.addArgument("--command-config")
.type(Arguments.fileType())
.help("Property file containing configs to be passed to Admin Client.");
Subparsers subparsers = parser.addSubparsers().dest("command");
@@ -93,13 +94,9 @@
String configPath = namespace.getString("command_config");
Properties properties = (configPath == null) ? new Properties() : Utils.loadProps(configPath);
- String bootstrapServer = namespace.getString("bootstrap_server");
- if (bootstrapServer != null) {
- properties.setProperty("bootstrap.servers", bootstrapServer);
- }
- if (properties.getProperty("bootstrap.servers") == null) {
- throw new TerseException("Please specify --bootstrap-server.");
- }
+ CommandLineUtils.initializeBootstrapProperties(properties,
+ Optional.ofNullable(namespace.getString("bootstrap_server")),
+ Optional.ofNullable(namespace.getString("bootstrap_controller")));
try (Admin adminClient = Admin.create(properties)) {
switch (command) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index 4f47ead..618d69e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -20,15 +20,16 @@
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
import java.io.File;
import java.io.IOException;
@@ -75,12 +76,12 @@
.newArgumentParser("kafka-metadata-quorum")
.defaultHelp(true)
.description("This tool describes kraft metadata quorum status.");
- parser
- .addArgument("--bootstrap-server")
- .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
- .required(true);
- parser
- .addArgument("--command-config")
+ MutuallyExclusiveGroup connectionOptions = parser.addMutuallyExclusiveGroup().required(true);
+ connectionOptions.addArgument("--bootstrap-server")
+ .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.");
+ connectionOptions.addArgument("--bootstrap-controller")
+ .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka controllers.");
+ parser.addArgument("--command-config")
.type(Arguments.fileType())
.help("Property file containing configs to be passed to Admin Client.");
addDescribeSubParser(parser);
@@ -92,7 +93,9 @@
File optionalCommandConfig = namespace.get("command_config");
final Properties props = getProperties(optionalCommandConfig);
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"));
+ CommandLineUtils.initializeBootstrapProperties(props,
+ Optional.ofNullable(namespace.getString("bootstrap_server")),
+ Optional.ofNullable(namespace.getString("bootstrap_controller")));
admin = Admin.create(props);
if (command.equals("describe")) {
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 2391aeb..5193a05 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -71,6 +71,16 @@
"SupportedMaxVersion: 3.7-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
}
+ @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV0)
+ public void testDescribeWithKRaftAndBootstrapControllers() {
+ String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+ assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe"))
+ );
+ // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
+ assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
+ "SupportedMaxVersion: 3.7-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput));
+ }
+
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testUpgradeMetadataVersionWithZk() {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->