| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.server |
| |
| import java.net.InetAddress |
| import java.nio.charset.StandardCharsets |
| import java.util |
| import java.util.Arrays.asList |
| import java.util.concurrent.{CompletableFuture, TimeUnit} |
| import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties} |
| import kafka.api.LeaderAndIsr |
| import kafka.cluster.Broker |
| import kafka.controller.{ControllerContext, KafkaController} |
| import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} |
| import kafka.network.RequestChannel |
| import kafka.server.QuotaFactory.QuotaManagers |
| import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache} |
| import kafka.utils.{Log4jController, TestUtils} |
| import kafka.zk.KafkaZkClient |
| import org.apache.kafka.clients.admin.AlterConfigOp.OpType |
| import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} |
| import org.apache.kafka.common.acl.AclOperation |
| import org.apache.kafka.common.config.ConfigResource |
| import org.apache.kafka.common.errors.UnsupportedVersionException |
| import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic} |
| import org.apache.kafka.common.memory.MemoryPool |
| import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER} |
| import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} |
| import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult |
| import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => LAlterConfigsResourceCollection} |
| import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource} |
| import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => LAlterableConfigCollection} |
| import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => LAlterableConfig} |
| import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse} |
| import org.apache.kafka.common.message.ApiMessageType.ListenerType |
| import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} |
| import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult |
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource} |
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection => IAlterConfigsResourceCollection} |
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig => IAlterableConfig} |
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection => IAlterableConfigCollection} |
| import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse} |
| import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity |
| import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} |
| import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic |
| import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection} |
| import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} |
| import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} |
| import org.apache.kafka.common.message._ |
| import org.apache.kafka.common.metrics.Metrics |
| import org.apache.kafka.common.network.{ClientInformation, ListenerName} |
| import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil} |
| import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} |
| import org.apache.kafka.common.record.FileRecords.TimestampAndOffset |
| import org.apache.kafka.common.record._ |
| import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType |
| import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata |
| import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse |
| import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry |
| import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} |
| import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} |
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} |
| import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource |
| import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils} |
| import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} |
| import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} |
| import org.junit.jupiter.api.Assertions._ |
| import org.junit.jupiter.api.{AfterEach, Test} |
| import org.junit.jupiter.params.ParameterizedTest |
| import org.junit.jupiter.params.provider.{CsvSource, ValueSource} |
| import org.mockito.ArgumentMatchers.{any, anyBoolean, anyDouble, anyInt, anyLong, anyShort, anyString, argThat, isNotNull} |
| import org.mockito.Mockito.{mock, reset, times, verify, when} |
| import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} |
| |
| import scala.collection.{Map, Seq, mutable} |
| import scala.jdk.CollectionConverters._ |
| import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic |
| import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult |
| import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection} |
| import org.apache.kafka.coordinator.group.GroupCoordinator |
| import org.apache.kafka.server.common.MetadataVersion |
| import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} |
| import org.apache.kafka.server.util.MockTime |
| import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData} |
| |
| class KafkaApisTest { |
// Collaborators wired into every KafkaApis instance built by createKafkaApis.
// Anything created through mock(...) is a Mockito mock; the remaining values are real objects.
private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
private val groupCoordinator: GroupCoordinator = mock(classOf[GroupCoordinator])
private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
private val txnCoordinator: TransactionCoordinator = mock(classOf[TransactionCoordinator])
private val controller: KafkaController = mock(classOf[KafkaController])
private val forwardingManager: ForwardingManager = mock(classOf[ForwardingManager])
private val autoTopicCreationManager: AutoTopicCreationManager = mock(classOf[AutoTopicCreationManager])

// Principal serde passed to TestUtils.buildEnvelopeRequest: the principal travels as the
// UTF-8 bytes of its toString form and is parsed back with SecurityUtils.parseKafkaPrincipal.
private val kafkaPrincipalSerde = new KafkaPrincipalSerde {
  override def serialize(principal: KafkaPrincipal): Array[Byte] =
    Utils.utf8(principal.toString)

  override def deserialize(bytes: Array[Byte]): KafkaPrincipal =
    SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
}
private val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
private val metrics = new Metrics()
private val brokerId = 1

// Starts out as a ZK metadata cache. Tests calling createKafkaApis(raftSupport = true) have to
// assign a KRaftMetadataCache first, otherwise createKafkaApis throws IllegalStateException.
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latest())

// Built once from the initial metadataCache value; later reassignments of the var above do not
// change the cache instance handed to this manager.
private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)

// Quota managers. The same client-quota mock and the same replication-quota mock are reused
// for several positional slots of QuotaManagers.
private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
private val replicaQuotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
private val quotas = QuotaManagers(
  clientQuotaManager,
  clientQuotaManager,
  clientRequestQuotaManager,
  clientControllerQuotaManager,
  replicaQuotaManager,
  replicaQuotaManager,
  replicaQuotaManager,
  None)

private val fetchManager: FetchManager = mock(classOf[FetchManager])
private val brokerTopicStats = new BrokerTopicStats
private val clusterId = "clusterId"
private val time = new MockTime
private val clientId = ""
| |
/**
 * Per-test cleanup: shuts down the quota managers, clears the Yammer metrics through
 * `TestUtils.clearYammerMetrics()` and closes the `Metrics` instance, in that order.
 */
@AfterEach
def tearDown(): Unit = {
  quotas.shutdown()
  TestUtils.clearYammerMetrics()
  metrics.close()
}
| |
/**
 * Builds the `KafkaApis` instance under test out of the fixtures held by this class.
 *
 * @param interBrokerProtocolVersion version handed to `TestUtils.setIbpAndMessageFormatVersions`
 * @param authorizer                 optional authorizer passed straight to `KafkaApis`
 * @param enableForwarding           when true, the forwarding manager is given to `ZkSupport` and
 *                                   `ApiKeys.ENVELOPE` is added to the enabled APIs
 * @param configRepository           config repository passed straight to `KafkaApis`
 * @param raftSupport                when true, the broker config gets the node id, process role,
 *                                   quorum voter and controller listener properties and a
 *                                   `RaftSupport` is created; otherwise a `ZkSupport` is created
 * @param overrideProperties         extra broker properties applied on top of the generated config
 *                                   (before the inter-broker protocol settings are written)
 * @throws IllegalStateException if the current `metadataCache` is not a `KRaftMetadataCache`
 *                               (with `raftSupport`) or not a `ZkMetadataCache` (without it)
 */
def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
                    authorizer: Option[Authorizer] = None,
                    enableForwarding: Boolean = false,
                    configRepository: ConfigRepository = new MockConfigRepository(),
                    raftSupport: Boolean = false,
                    overrideProperties: Map[String, String] = Map.empty): KafkaApis = {
  val brokerProps =
    if (!raftSupport) {
      TestUtils.createBrokerConfig(brokerId, "zk")
    } else {
      val kraftProps = TestUtils.createBrokerConfig(brokerId, "")
      kraftProps.put(KafkaConfig.NodeIdProp, brokerId.toString)
      kraftProps.put(KafkaConfig.ProcessRolesProp, "broker")
      // The single quorum voter gets an id different from this broker's id.
      kraftProps.put(KafkaConfig.QuorumVotersProp, s"${brokerId + 1}@localhost:9093")
      kraftProps.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
      kraftProps
    }
  overrideProperties.foreach { case (key, value) => brokerProps.put(key, value) }
  TestUtils.setIbpAndMessageFormatVersions(brokerProps, interBrokerProtocolVersion)
  val config = new KafkaConfig(brokerProps)

  val metadataSupport = (raftSupport, metadataCache) match {
    // It is up to the test to swap the default ZkMetadataCache for a KRaftMetadataCache.
    case (true, kraftCache: KRaftMetadataCache) =>
      RaftSupport(forwardingManager, kraftCache)
    case (true, _) =>
      throw new IllegalStateException("Test must set an instance of KRaftMetadataCache")
    case (false, zkCache: ZkMetadataCache) =>
      val forwardingManagerOpt = if (enableForwarding) Some(this.forwardingManager) else None
      ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkCache, brokerEpochManager)
    case (false, _) =>
      throw new IllegalStateException("Test must set an instance of ZkMetadataCache")
  }

  val listenerType = if (raftSupport) ListenerType.BROKER else ListenerType.ZK_BROKER
  val listenerApis = ApiKeys.apisForListener(listenerType).asScala
  val enabledApis =
    if (enableForwarding) listenerApis ++ Set(ApiKeys.ENVELOPE)
    else listenerApis.toSet
  val apiVersionManager = new SimpleApiVersionManager(
    listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)

  new KafkaApis(
    requestChannel = requestChannel,
    metadataSupport = metadataSupport,
    replicaManager = replicaManager,
    groupCoordinator = groupCoordinator,
    txnCoordinator = txnCoordinator,
    autoTopicCreationManager = autoTopicCreationManager,
    brokerId = brokerId,
    config = config,
    configRepository = configRepository,
    metadataCache = metadataCache,
    metrics = metrics,
    authorizer = authorizer,
    quotas = quotas,
    fetchManager = fetchManager,
    brokerTopicStats = brokerTopicStats,
    clusterId = clusterId,
    time = time,
    tokenManager = null,
    apiVersionManager = apiVersionManager)
}
| |
/**
 * Sends a DESCRIBE_CONFIGS request for one topic through `handleDescribeConfigsRequest` with an
 * authorizer installed. Checks that the authorizer is consulted exactly once with the expected
 * action and that the response carries the topic property served by the mocked config repository.
 */
@Test
def testDescribeConfigsWithAuthorizer(): Unit = {
  val topic = "topic-1"
  val configName = "min.insync.replicas"
  val configValue = "3"
  val header = new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, ApiKeys.DESCRIBE_CONFIGS.latestVersion,
    clientId, 0)

  // The authorizer allows the single DESCRIBE_CONFIGS action on the topic.
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val topicPattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
  val expectedActions = Seq(
    new Action(AclOperation.DESCRIBE_CONFIGS, topicPattern, 1, true, true)
  ).asJava
  when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions)))
    .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)

  // The config repository serves one topic-level property.
  val topicConfigs = new Properties()
  topicConfigs.put(configName, configValue)
  val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
  when(configRepository.topicConfig(topic)).thenReturn(topicConfigs)

  // The metadata cache reports the topic as present.
  metadataCache = mock(classOf[ZkMetadataCache])
  when(metadataCache.contains(topic)).thenReturn(true)

  val describedResource = new DescribeConfigsRequestData.DescribeConfigsResource()
    .setResourceName(topic)
    .setResourceType(ConfigResource.Type.TOPIC.id)
  val requestData = new DescribeConfigsRequestData()
    .setIncludeSynonyms(true)
    .setResources(List(describedResource).asJava)
  val describeConfigsRequest = new DescribeConfigsRequest.Builder(requestData).build(header.apiVersion)
  val request = buildRequest(describeConfigsRequest, requestHeader = Option(header))
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  val kafkaApis = createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository)
  kafkaApis.handleDescribeConfigsRequest(request)

  // verify(...) without a mode means "exactly once".
  verify(authorizer).authorize(any(), ArgumentMatchers.eq(expectedActions))

  val response = verifyNoThrottling[DescribeConfigsResponse](request)
  val results = response.data.results
  assertEquals(1, results.size)

  val topicResult: DescribeConfigsResult = results.get(0)
  assertEquals(ConfigResource.Type.TOPIC.id, topicResult.resourceType)
  assertEquals(topic, topicResult.resourceName)

  val matchingEntries = topicResult.configs.asScala.filter(entry => entry.name == configName)
  assertEquals(1, matchingEntries.length)
  val entry = matchingEntries.head
  assertEquals(configName, entry.name)
  assertEquals(configValue, entry.value)
}
| |
/**
 * Envelope-wrapped ALTER_CONFIGS where the stubbed admin manager reports `ApiError.NONE`;
 * the inner response is expected to carry `Errors.NONE`.
 */
@Test
def testEnvelopeRequestHandlingAsController(): Unit = {
  val succeed: () => ApiError = () => ApiError.NONE
  testEnvelopeRequestWithAlterConfig(alterConfigHandler = succeed, expectedError = Errors.NONE)
}
| |
/**
 * Envelope-wrapped ALTER_CONFIGS where the stubbed admin manager throws an
 * `IllegalStateException`; the inner response is expected to carry `UNKNOWN_SERVER_ERROR`.
 */
@Test
def testEnvelopeRequestWithAlterConfigUnhandledError(): Unit = {
  val fail: () => ApiError = () => throw new IllegalStateException()
  testEnvelopeRequestWithAlterConfig(alterConfigHandler = fail, expectedError = Errors.UNKNOWN_SERVER_ERROR)
}
| |
/**
 * Shared driver for the envelope + ALTER_CONFIGS tests. Wraps an ALTER_CONFIGS request for
 * "topic-1" in an envelope, handles it with forwarding enabled and an active controller, and
 * checks the response sent to the request channel.
 *
 * @param alterConfigHandler invoked by the stubbed `adminManager.alterConfigs`; its result (or
 *                           thrown exception) is the outcome for the topic resource
 * @param expectedError      error expected for "topic-1" in the inner ALTER_CONFIGS response
 */
private def testEnvelopeRequestWithAlterConfig(
  alterConfigHandler: () => ApiError,
  expectedError: Errors
): Unit = {
  val topic = "topic-1"
  val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
  val header = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
    clientId, 0)

  // Allow both the CLUSTER_ACTION on the cluster and ALTER_CONFIGS on the topic.
  val authorizer: Authorizer = mock(classOf[Authorizer])
  authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
  authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.TOPIC, topic, AuthorizationResult.ALLOWED)

  when(controller.isActive).thenReturn(true)
  when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
    .thenAnswer(_ => Map(topicResource -> alterConfigHandler()))

  val requestedConfig = new AlterConfigsRequest.Config(
    Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava)
  val alterConfigsRequest = new AlterConfigsRequest.Builder(Map(topicResource -> requestedConfig).asJava, false)
    .build(header.apiVersion)

  val startTimeNanos = time.nanoseconds()
  val queueDurationNanos = 5 * 1000 * 1000
  val request = TestUtils.buildEnvelopeRequest(
    alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, startTimeNanos, startTimeNanos + queueDurationNanos)

  val kafkaApis = createKafkaApis(authorizer = Some(authorizer), enableForwarding = true)
  kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)

  val requestCaptor: ArgumentCaptor[RequestChannel.Request] = ArgumentCaptor.forClass(classOf[RequestChannel.Request])
  val responseCaptor: ArgumentCaptor[AlterConfigsResponse] = ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
  verify(requestChannel).sendResponse(requestCaptor.capture(), responseCaptor.capture(), any())

  val answeredRequest = requestCaptor.getValue
  assertEquals(Some(request), answeredRequest.envelope)
  // The forwarded request must report the same dequeue time as the envelope request.
  assertEquals(request.requestDequeueTimeNanos, answeredRequest.requestDequeueTimeNanos)

  val errorsByResource = (
    for (resourceResponse <- responseCaptor.getValue.data.responses.asScala)
      yield resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode)
  ).toMap
  assertEquals(Map(topic -> expectedError), errorsByResource)

  verify(controller).isActive
  verify(adminManager).alterConfigs(any(), ArgumentMatchers.eq(false))
}
| |
/**
 * Wraps a LEAVE_GROUP request in an envelope and handles it with forwarding enabled;
 * the envelope response is expected to carry `INVALID_REQUEST`.
 */
@Test
def testInvalidEnvelopeRequestWithNonForwardableAPI(): Unit = {
  val header = new RequestHeader(ApiKeys.LEAVE_GROUP, ApiKeys.LEAVE_GROUP.latestVersion,
    clientId, 0)
  val members = Collections.singletonList(new MemberIdentity())
  val leaveGroupRequest = new LeaveGroupRequest.Builder("group", members).build(header.apiVersion)

  when(controller.isActive).thenReturn(true)

  val envelope = TestUtils.buildEnvelopeRequest(
    leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  val kafkaApis = createKafkaApis(enableForwarding = true)
  kafkaApis.handle(envelope, RequestLocal.withThreadConfinedCaching)

  val envelopeResponse = verifyNoThrottling[EnvelopeResponse](envelope)
  assertEquals(Errors.INVALID_REQUEST, envelopeResponse.error())
}
| |
/**
 * An envelope request built with `fromPrivilegedListener = false` is expected to make the
 * handler close the connection instead of sending a response.
 */
@Test
def testEnvelopeRequestWithNotFromPrivilegedListener(): Unit =
  testInvalidEnvelopeRequest(
    expectedError = Errors.NONE,
    fromPrivilegedListener = false,
    shouldCloseConnection = true)
| |
/**
 * With the CLUSTER_ACTION authorization stubbed to DENIED, the envelope response is expected
 * to carry `CLUSTER_AUTHORIZATION_FAILED`.
 */
@Test
def testEnvelopeRequestNotAuthorized(): Unit =
  testInvalidEnvelopeRequest(
    expectedError = Errors.CLUSTER_AUTHORIZATION_FAILED,
    performAuthorize = true,
    authorizeResult = AuthorizationResult.DENIED)
| |
/**
 * With `controller.isActive` stubbed to false (and authorization allowed), the envelope
 * response is expected to carry `NOT_CONTROLLER`.
 */
@Test
def testEnvelopeRequestNotControllerHandling(): Unit =
  testInvalidEnvelopeRequest(
    expectedError = Errors.NOT_CONTROLLER,
    performAuthorize = true,
    isActiveController = false)
| |
/**
 * Shared driver for the rejected-envelope tests. Wraps an ALTER_CONFIGS request for "topic-1"
 * in an envelope, handles it with forwarding enabled, and checks how it was rejected.
 *
 * @param expectedError          error expected on the `EnvelopeResponse` (only checked when the
 *                               connection is not expected to be closed)
 * @param fromPrivilegedListener forwarded to `TestUtils.buildEnvelopeRequest`
 * @param shouldCloseConnection  when true, verify `closeConnection` instead of `sendResponse`
 * @param performAuthorize       when true, stub the CLUSTER_ACTION authorization with
 *                               `authorizeResult` and verify the authorizer was called once
 * @param authorizeResult        result returned by the stubbed authorization
 * @param isActiveController     value returned by the stubbed `controller.isActive`
 */
private def testInvalidEnvelopeRequest(expectedError: Errors,
                                       fromPrivilegedListener: Boolean = true,
                                       shouldCloseConnection: Boolean = false,
                                       performAuthorize: Boolean = false,
                                       authorizeResult: AuthorizationResult = AuthorizationResult.ALLOWED,
                                       isActiveController: Boolean = true): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  if (performAuthorize)
    authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, authorizeResult)

  when(controller.isActive).thenReturn(isActiveController)

  val header = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
    clientId, 0)
  val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-1")
  val requestedConfig = new AlterConfigsRequest.Config(
    Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava)
  val alterConfigsRequest = new AlterConfigsRequest.Builder(Map(topicResource -> requestedConfig).asJava, false)
    .build(header.apiVersion)

  val request = TestUtils.buildEnvelopeRequest(
    alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener = fromPrivilegedListener)

  val kafkaApis = createKafkaApis(authorizer = Some(authorizer), enableForwarding = true)
  kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)

  if (!shouldCloseConnection) {
    val responseCaptor: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
    verify(requestChannel).sendResponse(
      ArgumentMatchers.eq(request),
      responseCaptor.capture(),
      ArgumentMatchers.eq(None))
    val envelopeResponse = responseCaptor.getValue.asInstanceOf[EnvelopeResponse]
    assertEquals(expectedError, envelopeResponse.error)
  } else {
    verify(requestChannel).closeConnection(
      ArgumentMatchers.eq(request),
      ArgumentMatchers.eq(java.util.Collections.emptyMap())
    )
  }

  if (performAuthorize)
    verify(authorizer).authorize(any(), any())
}
| |
/**
 * ALTER_CONFIGS for one authorized and one unauthorized topic. The authorized topic is expected
 * to report `NONE`, the unauthorized one `TOPIC_AUTHORIZATION_FAILED`; the authorizer is expected
 * to be called twice and `adminManager.alterConfigs` once.
 */
@Test
def testAlterConfigsWithAuthorizer(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val allowedTopic = "authorized-topic"
  val deniedTopic = "unauthorized-topic"
  val (allowedResource, deniedResource) =
    createConfigsWithAuthorization(authorizer, allowedTopic, deniedTopic)

  // Builds a Config holding exactly one entry.
  def singleEntryConfig(name: String, value: String): AlterConfigsRequest.Config =
    new AlterConfigsRequest.Config(Seq(new AlterConfigsRequest.ConfigEntry(name, value)).asJava)

  val requestedConfigs = Map(
    allowedResource -> singleEntryConfig("foo", "bar"),
    deniedResource -> singleEntryConfig("foo-1", "bar-1")
  )
  val header = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
    clientId, 0)
  val alterConfigsRequest = new AlterConfigsRequest.Builder(requestedConfigs.asJava, false)
    .build(header.apiVersion)
  val request = buildRequest(alterConfigsRequest)

  when(controller.isActive).thenReturn(false)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)
  when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
    .thenReturn(Map(allowedResource -> ApiError.NONE))

  val kafkaApis = createKafkaApis(authorizer = Some(authorizer))
  kafkaApis.handleAlterConfigsRequest(request)

  val response = verifyNoThrottling[AlterConfigsResponse](request)
  val expectedErrors = Map(
    allowedTopic -> Errors.NONE,
    deniedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED)
  verifyAlterConfigResult(response, expectedErrors)

  verify(authorizer, times(2)).authorize(any(), any())
  verify(adminManager).alterConfigs(any(), anyBoolean())
}
| |
/** Runs the KRaft forwarding check for a preferred-election ELECT_LEADERS request. */
@Test
def testElectLeadersForwarding(): Unit = {
  val electPreferred = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000)
  testKraftForwarding(ApiKeys.ELECT_LEADERS, electPreferred)
}
| |
/**
 * Handles a DESCRIBE_QUORUM request with the default (ZK) setup and forwarding enabled;
 * the response is expected to carry `UNKNOWN_SERVER_ERROR`.
 */
@Test
def testDescribeQuorumNotAllowedForZkClusters(): Unit = {
  val quorumRequest = new DescribeQuorumRequest.Builder(
    DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
  ).build()
  val request = buildRequest(quorumRequest)

  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  val kafkaApis = createKafkaApis(enableForwarding = true)
  kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)

  val response = verifyNoThrottling[DescribeQuorumResponse](request)
  val actualError = Errors.forCode(response.data.errorCode)
  assertEquals(Errors.UNKNOWN_SERVER_ERROR, actualError)
}
| |
/**
 * Switches the fixture to a KRaft metadata cache and runs the forwarding check for
 * DESCRIBE_QUORUM against a `KafkaApis` built with `raftSupport = true`.
 */
@Test
def testDescribeQuorumForwardedForKRaftClusters(): Unit = {
  val requestBuilder = new DescribeQuorumRequest.Builder(
    DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition))

  // createKafkaApis(raftSupport = true) requires a KRaftMetadataCache to be in place.
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val kraftApis = createKafkaApis(raftSupport = true)

  testForwardableApi(kraftApis, ApiKeys.DESCRIBE_QUORUM, requestBuilder)
}
| |
/**
 * Runs the forwarding check against a `KafkaApis` built with forwarding and `raftSupport`
 * enabled, after replacing the fixture's metadata cache with a KRaft one.
 *
 * @param apiKey         API whose latest version is exercised
 * @param requestBuilder builder for the request to send
 */
private def testKraftForwarding(
  apiKey: ApiKeys,
  requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val kraftApis = createKafkaApis(enableForwarding = true, raftSupport = true)
  testForwardableApi(kraftApis, apiKey, requestBuilder)
}
| |
/**
 * Convenience overload: runs the forwarding check against a `KafkaApis` built with the default
 * setup plus `enableForwarding = true`.
 *
 * @param apiKey         API whose latest version is exercised
 * @param requestBuilder builder for the request to send
 */
private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
  val forwardingApis = createKafkaApis(enableForwarding = true)
  testForwardableApi(forwardingApis, apiKey, requestBuilder)
}
| |
/**
 * Core forwarding check. Handles a request of the given API, verifies it was handed to the
 * forwarding manager, completes the captured callback with a NOT_CONTROLLER error response and
 * verifies that this response is what reaches the client.
 *
 * @param kafkaApis      instance under test
 * @param apiKey         API whose latest version is exercised
 * @param requestBuilder builder for the request to send
 */
private def testForwardableApi(
  kafkaApis: KafkaApis,
  apiKey: ApiKeys,
  requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
): Unit = {
  val usesZk = kafkaApis.metadataSupport match {
    case _: ZkSupport => true
    case _ => false
  }

  val header = new RequestHeader(apiKey, apiKey.latestVersion, clientId, 0)
  val apiRequest = requestBuilder.build(header.apiVersion)
  val request = buildRequest(apiRequest)

  // The controller check only makes sense for ZK clusters. For KRaft,
  // controller requests are handled on a separate listener, so there
  // is no choice but to forward them.
  if (usesZk)
    when(controller.isActive).thenReturn(false)

  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)

  val callbackCaptor: ArgumentCaptor[Option[AbstractResponse] => Unit] =
    ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit])
  verify(forwardingManager).forwardRequest(ArgumentMatchers.eq(request), callbackCaptor.capture())
  assertNotNull(request.buffer, "The buffer was unexpectedly deallocated after " +
    s"`handle` returned (is $apiKey marked as forwardable in `ApiKeys`?)")

  // Play the part of the controller: answer the forwarded request through the captured callback.
  val expectedResponse = apiRequest.getErrorResponse(Errors.NOT_CONTROLLER.exception)
  val forwardCallback = callbackCaptor.getValue
  forwardCallback(Some(expectedResponse))

  val actualResponse = verifyNoThrottling[AbstractResponse](request)
  assertEquals(expectedResponse.data, actualResponse.data)

  if (usesZk)
    verify(controller).isActive
}
| |
/**
 * Stubs `authorizer.authorize` so that the single-action list built from the arguments returns
 * the single-element list containing `result`.
 *
 * For `AclOperation.CLUSTER_ACTION` the resource pattern is always the cluster resource
 * (`ResourceType.CLUSTER` / `Resource.CLUSTER_NAME`); `resourceType` and `resourceName` are
 * ignored in that case.
 *
 * @param authorizer   mock to stub
 * @param operation    operation of the expected action
 * @param resourceType resource type of the expected action (unless CLUSTER_ACTION)
 * @param resourceName resource name of the expected action (unless CLUSTER_ACTION)
 * @param result       authorization result to return
 * @param logIfAllowed `logIfAllowed` flag of the expected action
 * @param logIfDenied  `logIfDenied` flag of the expected action
 */
private def authorizeResource(authorizer: Authorizer,
                              operation: AclOperation,
                              resourceType: ResourceType,
                              resourceName: String,
                              result: AuthorizationResult,
                              logIfAllowed: Boolean = true,
                              logIfDenied: Boolean = true): Unit = {
  val pattern =
    if (operation == AclOperation.CLUSTER_ACTION)
      new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
    else
      new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
  val expectedActions = Seq(new Action(operation, pattern, 1, logIfAllowed, logIfDenied)).asJava

  when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions)))
    .thenReturn(Seq(result).asJava)
}
| |
/**
 * Asserts that the per-resource errors in an ALTER_CONFIGS response equal `expectedResults`.
 *
 * @param response        response to inspect
 * @param expectedResults expected error keyed by resource name
 */
private def verifyAlterConfigResult(response: AlterConfigsResponse,
                                    expectedResults: Map[String, Errors]): Unit = {
  val errorsByResource = (
    for (entry <- response.data.responses.asScala)
      yield entry.resourceName -> Errors.forCode(entry.errorCode)
  ).toMap

  assertEquals(expectedResults, errorsByResource)
}
| |
/**
 * Creates topic config resources for the two topic names and registers the ALTER_CONFIGS
 * authorization stubs for them through `createTopicAuthorization`.
 *
 * @param authorizer        mock passed on to `createTopicAuthorization`
 * @param authorizedTopic   name passed to `createTopicAuthorization` as the authorized topic
 * @param unauthorizedTopic name passed to `createTopicAuthorization` as the unauthorized topic
 * @return the pair (resource for `authorizedTopic`, resource for `unauthorizedTopic`)
 */
private def createConfigsWithAuthorization(authorizer: Authorizer,
                                           authorizedTopic: String,
                                           unauthorizedTopic: String): (ConfigResource, ConfigResource) = {
  def topicResource(topic: String): ConfigResource =
    new ConfigResource(ConfigResource.Type.TOPIC, topic)

  val resources = (topicResource(authorizedTopic), topicResource(unauthorizedTopic))
  createTopicAuthorization(authorizer, AclOperation.ALTER_CONFIGS, authorizedTopic, unauthorizedTopic)
  resources
}
| |
/**
 * INCREMENTAL_ALTER_CONFIGS for one authorized and one unauthorized topic. The authorized topic
 * is expected to report `NONE`, the unauthorized one `TOPIC_AUTHORIZATION_FAILED`; the authorizer
 * is expected to be called twice and `adminManager.incrementalAlterConfigs` once.
 */
@Test
def testIncrementalAlterConfigsWithAuthorizer(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val allowedTopic = "authorized-topic"
  val deniedTopic = "unauthorized-topic"
  val (allowedResource, deniedResource) =
    createConfigsWithAuthorization(authorizer, allowedTopic, deniedTopic)

  val header = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0)
  val requestBuilder = getIncrementalAlterConfigRequestBuilder(Seq(allowedResource, deniedResource))
  val request = buildRequest(requestBuilder.build(header.apiVersion),
    fromPrivilegedListener = true, requestHeader = Option(header))

  when(controller.isActive).thenReturn(true)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)
  when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false)))
    .thenReturn(Map(allowedResource -> ApiError.NONE))

  val kafkaApis = createKafkaApis(authorizer = Some(authorizer))
  kafkaApis.handleIncrementalAlterConfigsRequest(request)

  val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
  val expectedErrors = Map(
    allowedTopic -> Errors.NONE,
    deniedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED
  )
  verifyIncrementalAlterConfigResult(response, expectedErrors)

  verify(authorizer, times(2)).authorize(any(), any())
  verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
}
| |
/**
 * Builds an INCREMENTAL_ALTER_CONFIGS request (validateOnly = false) that applies a single
 * `SET foo=bar` operation to every given resource.
 *
 * The operation type is stated explicitly. It used to be looked up with
 * `OpType.forId(configResource.type.id)`, i.e. the *resource type* id was used as the
 * *operation type* id, which does not identify a defined operation for every resource type.
 *
 * @param configResources resources to include in the request
 * @return a builder for the request
 */
private def getIncrementalAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {
  val resourceMap = configResources.map { configResource =>
    configResource -> Set(
      new AlterConfigOp(new ConfigEntry("foo", "bar"), OpType.SET)
    ).asJavaCollection
  }.toMap.asJava

  new IncrementalAlterConfigsRequest.Builder(resourceMap, false)
}
| |
/**
 * Asserts that the per-resource errors in an INCREMENTAL_ALTER_CONFIGS response equal
 * `expectedResults`.
 *
 * @param response        response to inspect
 * @param expectedResults expected error keyed by resource name
 */
private def verifyIncrementalAlterConfigResult(response: IncrementalAlterConfigsResponse,
                                               expectedResults: Map[String, Errors]): Unit = {
  val errorsByResource = (
    for (entry <- response.data.responses.asScala)
      yield entry.resourceName -> Errors.forCode(entry.errorCode)
  ).toMap
  assertEquals(expectedResults, errorsByResource)
}
| |
/**
 * ALTER_CLIENT_QUOTAS with the cluster-level ALTER_CONFIGS authorization stubbed to DENIED.
 * The response is checked for `CLUSTER_AUTHORIZATION_FAILED` on the quota entity, and both the
 * authorizer and the request quota manager are expected to have been called once.
 */
@Test
def testAlterClientQuotasWithAuthorizer(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
    Resource.CLUSTER_NAME, AuthorizationResult.DENIED)

  // One alteration for user "user" carrying no quota operations.
  val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
  val alterations = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))

  val header = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0)
  val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(alterations.asJavaCollection, false)
    .build(header.apiVersion)
  val request = buildRequest(alterClientQuotasRequest,
    fromPrivilegedListener = true, requestHeader = Option(header))

  when(controller.isActive).thenReturn(true)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    anyLong)).thenReturn(0)

  val kafkaApis = createKafkaApis(authorizer = Some(authorizer))
  kafkaApis.handleAlterClientQuotasRequest(request)

  val response = verifyNoThrottling[AlterClientQuotasResponse](request)
  verifyAlterClientQuotaResult(response, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED))

  verify(authorizer).authorize(any(), any())
  verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong)
}
| |
| @Test |
| def testAlterClientQuotasWithForwarding(): Unit = |
|   // Runs the shared forwarding check with an AlterClientQuotas request that has no entries. |
|   testForwardableApi( |
|     ApiKeys.ALTER_CLIENT_QUOTAS, |
|     new AlterClientQuotasRequest.Builder(List.empty.asJava, false)) |
| |
| /** |
|  * Completes one future per expected entity from the response and asserts that |
|  * each future finished with the expected error (Errors.NONE meaning success). |
|  * |
|  * @param response the captured AlterClientQuotas response |
|  * @param expected expected error keyed by quota entity |
|  */ |
| private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse, |
|     expected: Map[ClientQuotaEntity, Errors]): Unit = { |
|   val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap |
|   response.complete(futures.asJava) |
|   futures.foreach { case (entity, future) => |
|     assertTrue(future.isDone, s"Future for $entity was not completed by the response") |
|     // Record the outcome in the callback and assert outside of it: an AssertionError |
|     // raised inside whenComplete would only fail the derived future, not the test. |
|     var actualError: Errors = null |
|     future.whenComplete((_, thrown) => |
|       actualError = if (thrown == null) Errors.NONE else Errors.forException(thrown) |
|     ) |
|     assertEquals(expected(entity), actualError, s"Unexpected result for $entity") |
|   } |
| } |
| |
| @Test |
| def testCreateTopicsWithAuthorizer(): Unit = { |
|   val authorizer: Authorizer = mock(classOf[Authorizer]) |
|   val allowedTopicName = "authorized-topic" |
|   val deniedTopicName = "unauthorized-topic" |
| |
|   // Deny cluster-level CREATE; allow CREATE and DESCRIBE_CONFIGS only on the authorized topic. |
|   authorizeResource(authorizer, AclOperation.CREATE, ResourceType.CLUSTER, |
|     Resource.CLUSTER_NAME, AuthorizationResult.DENIED, logIfDenied = false) |
|   createCombinedTopicAuthorization(authorizer, AclOperation.CREATE, |
|     allowedTopicName, deniedTopicName) |
|   createCombinedTopicAuthorization(authorizer, AclOperation.DESCRIBE_CONFIGS, |
|     allowedTopicName, deniedTopicName, logIfDenied = false) |
| |
|   val header = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion, clientId, 0) |
| |
|   when(controller.isActive).thenReturn(true) |
| |
|   val allowedTopic = new CreateTopicsRequestData.CreatableTopic().setName(allowedTopicName) |
|   val deniedTopic = new CreateTopicsRequestData.CreatableTopic().setName(deniedTopicName) |
|   val requestedTopics = new CreateTopicsRequestData.CreatableTopicCollection(2) |
|   requestedTopics.add(allowedTopic) |
|   requestedTopics.add(deniedTopic) |
| |
|   val timeoutMs = 10 |
|   val requestData = new CreateTopicsRequestData() |
|     .setTimeoutMs(timeoutMs) |
|     .setValidateOnly(false) |
|     .setTopics(requestedTopics) |
|   val request = buildRequest( |
|     new CreateTopicsRequest.Builder(requestData).build(header.apiVersion), |
|     fromPrivilegedListener = true, |
|     requestHeader = Option(header)) |
| |
|   when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), |
|     any[Long])).thenReturn(0) |
|   when(clientControllerQuotaManager.newQuotaFor( |
|     ArgumentMatchers.eq(request), ArgumentMatchers.eq(6))).thenReturn(UnboundedControllerMutationQuota) |
| |
|   createKafkaApis(authorizer = Some(authorizer)).handleCreateTopicsRequest(request) |
| |
|   // Only the authorized topic is expected to reach the admin manager. |
|   val callbackCaptor: ArgumentCaptor[Map[String, ApiError] => Unit] = |
|     ArgumentCaptor.forClass(classOf[Map[String, ApiError] => Unit]) |
|   verify(adminManager).createTopics( |
|     ArgumentMatchers.eq(timeoutMs), |
|     ArgumentMatchers.eq(false), |
|     ArgumentMatchers.eq(Map(allowedTopicName -> allowedTopic)), |
|     any(), |
|     ArgumentMatchers.eq(UnboundedControllerMutationQuota), |
|     callbackCaptor.capture()) |
|   callbackCaptor.getValue.apply(Map(allowedTopicName -> ApiError.NONE)) |
| |
|   val response = verifyNoThrottling[CreateTopicsResponse](request) |
|   val expectedErrors = Map( |
|     allowedTopicName -> Errors.NONE, |
|     deniedTopicName -> Errors.TOPIC_AUTHORIZATION_FAILED) |
|   verifyCreateTopicsResult(response, expectedErrors) |
| } |
| |
| @Test |
| def testCreateTopicsWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with a request for one single-partition topic. |
|   val topic = new CreatableTopic() |
|     .setName("topic") |
|     .setNumPartitions(1) |
|     .setReplicationFactor(1.toShort) |
|   val requestData = new CreateTopicsRequestData() |
|     .setTopics(new CreatableTopicCollection(Collections.singleton(topic).iterator())) |
|   testForwardableApi(ApiKeys.CREATE_TOPICS, new CreateTopicsRequest.Builder(requestData)) |
| } |
| |
| @ParameterizedTest |
| @CsvSource(value = Array("0,1500", "1500,0", "3000,1000")) |
| def testKRaftControllerThrottleTimeEnforced( |
|   controllerThrottleTimeMs: Int, |
|   requestThrottleTimeMs: Int |
| ): Unit = { |
|   // The larger of the controller's throttle time and the broker's request |
|   // throttle time is expected to be applied and written to the response. |
|   metadataCache = MetadataCache.kRaftMetadataCache(brokerId) |
| |
|   val createTopicsData = new CreateTopicsRequestData() |
|   createTopicsData.topics().add( |
|     new CreatableTopic() |
|       .setName("topic") |
|       .setNumPartitions(1) |
|       .setReplicationFactor(1.toShort)) |
| |
|   val request = buildRequest(new CreateTopicsRequest.Builder(createTopicsData).build()) |
| |
|   val kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true) |
|   val callbackCaptor: ArgumentCaptor[Option[AbstractResponse] => Unit] = |
|     ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit]) |
| |
|   when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(request, time.milliseconds())) |
|     .thenReturn(requestThrottleTimeMs) |
| |
|   kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching) |
| |
|   verify(forwardingManager).forwardRequest( |
|     ArgumentMatchers.eq(request), |
|     callbackCaptor.capture() |
|   ) |
| |
|   // Simulate the controller's reply carrying its own throttle time. |
|   val controllerResponseData = new CreateTopicsResponseData() |
|     .setThrottleTimeMs(controllerThrottleTimeMs) |
|   controllerResponseData.topics().add( |
|     new CreatableTopicResult().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code)) |
| |
|   callbackCaptor.getValue.apply(Some(new CreateTopicsResponse(controllerResponseData))) |
| |
|   val effectiveThrottleTimeMs = math.max(controllerThrottleTimeMs, requestThrottleTimeMs) |
| |
|   verify(clientRequestQuotaManager).throttle( |
|     ArgumentMatchers.eq(request), |
|     any[ThrottleCallback](), |
|     ArgumentMatchers.eq(effectiveThrottleTimeMs) |
|   ) |
| |
|   assertEquals(effectiveThrottleTimeMs, controllerResponseData.throttleTimeMs) |
| } |
| |
| @Test |
| def testCreatePartitionsAuthorization(): Unit = { |
|   val authorizer: Authorizer = mock(classOf[Authorizer]) |
|   val kafkaApis = createKafkaApis(authorizer = Some(authorizer)) |
| |
|   val timeoutMs = 35000 |
|   val fooPartitions = new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2) |
|   val barPartitions = new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10) |
|   val requestData = new CreatePartitionsRequestData() |
|     .setTimeoutMs(timeoutMs) |
|     .setValidateOnly(false) |
|   requestData.topics().add(fooPartitions) |
|   requestData.topics().add(barPartitions) |
| |
|   val allowedAction = new Action(AclOperation.ALTER, |
|     new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL), 1, true, true) |
|   val deniedAction = new Action(AclOperation.ALTER, |
|     new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.LITERAL), 1, true, true) |
| |
|   // ALTER is allowed on "foo" and denied on "bar"; any other action fails the test. |
|   when(authorizer.authorize( |
|     any[RequestContext](), |
|     any[util.List[Action]]() |
|   )).thenAnswer { invocation => |
|     val requestedActions = invocation.getArgument[util.List[Action]](1).asScala |
|     val decisions = requestedActions.map { |
|       case action if action == allowedAction => AuthorizationResult.ALLOWED |
|       case action if action == deniedAction => AuthorizationResult.DENIED |
|       case action => throw new AssertionError(s"Unexpected action $action") |
|     } |
|     new util.ArrayList[AuthorizationResult](decisions.asJava) |
|   } |
| |
|   val request = buildRequest(new CreatePartitionsRequest.Builder(requestData).build()) |
| |
|   when(controller.isActive).thenReturn(true) |
|   when(controller.isTopicQueuedForDeletion("foo")).thenReturn(false) |
|   when(clientControllerQuotaManager.newQuotaFor( |
|     ArgumentMatchers.eq(request), ArgumentMatchers.anyShort()) |
|   ).thenReturn(UnboundedControllerMutationQuota) |
|   // The admin manager is stubbed for "foo" only and reports success through the callback. |
|   when(adminManager.createPartitions( |
|     timeoutMs = ArgumentMatchers.eq(timeoutMs), |
|     newPartitions = ArgumentMatchers.eq(Seq(fooPartitions)), |
|     validateOnly = ArgumentMatchers.eq(false), |
|     controllerMutationQuota = ArgumentMatchers.eq(UnboundedControllerMutationQuota), |
|     callback = ArgumentMatchers.any[Map[String, ApiError] => Unit]() |
|   )).thenAnswer { invocation => |
|     val completion = invocation.getArgument[Map[String, ApiError] => Unit](4) |
|     completion.apply(Map("foo" -> ApiError.NONE)) |
|   } |
| |
|   kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching) |
| |
|   val response = verifyNoThrottling[CreatePartitionsResponse](request) |
|   val topicResults = response.data.results.asScala |
|   def errorFor(topic: String): Option[Errors] = |
|     topicResults.find(_.name == topic).map(result => Errors.forCode(result.errorCode)) |
| |
|   assertEquals(Some(Errors.NONE), errorFor("foo")) |
|   assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), errorFor("bar")) |
| } |
| |
| /** |
|  * Stubs the authorizer so that `operation` is ALLOWED on `authorizedTopic` |
|  * and DENIED on `unauthorizedTopic`, using separate per-topic stubbings. |
|  */ |
| private def createTopicAuthorization(authorizer: Authorizer, |
|     operation: AclOperation, |
|     authorizedTopic: String, |
|     unauthorizedTopic: String, |
|     logIfAllowed: Boolean = true, |
|     logIfDenied: Boolean = true): Unit = { |
|   val decisions = Seq( |
|     authorizedTopic -> AuthorizationResult.ALLOWED, |
|     unauthorizedTopic -> AuthorizationResult.DENIED) |
|   decisions.foreach { case (topic, decision) => |
|     authorizeResource(authorizer, operation, ResourceType.TOPIC, |
|       topic, decision, logIfAllowed, logIfDenied) |
|   } |
| } |
| |
| /** |
|  * Stubs the authorizer for a single call that checks both topics at once: |
|  * the stub applies when the action list contains `operation` on both topics, |
|  * and answers ALLOWED for `authorizedTopic` and DENIED for every other action. |
|  */ |
| private def createCombinedTopicAuthorization(authorizer: Authorizer, |
|     operation: AclOperation, |
|     authorizedTopic: String, |
|     unauthorizedTopic: String, |
|     logIfAllowed: Boolean = true, |
|     logIfDenied: Boolean = true): Unit = { |
|   val requiredActions = Seq(authorizedTopic, unauthorizedTopic).map { topic => |
|     new Action(operation, |
|       new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), |
|       1, logIfAllowed, logIfDenied) |
|   } |
| |
|   when(authorizer.authorize( |
|     any[RequestContext], argThat((t: java.util.List[Action]) => t != null && t.containsAll(requiredActions.asJava)) |
|   )).thenAnswer { invocation => |
|     val requestedActions = invocation.getArgument[util.List[Action]](1) |
|     requestedActions.asScala.map { action => |
|       val isAuthorized = action.resourcePattern().name() == authorizedTopic |
|       if (isAuthorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED |
|     }.asJava |
|   } |
| } |
| |
| /** |
|  * Asserts that the response carries exactly the expected error per topic name. |
|  * |
|  * @param response        the captured CreateTopics response |
|  * @param expectedResults expected error keyed by topic name |
|  */ |
| private def verifyCreateTopicsResult(response: CreateTopicsResponse, |
|     expectedResults: Map[String, Errors]): Unit = { |
|   val actualResults = response.data.topics.asScala.iterator |
|     .map(topicResult => (topicResult.name, Errors.forCode(topicResult.errorCode))) |
|     .toMap |
| |
|   assertEquals(expectedResults, actualResults) |
| } |
| |
| @Test |
| def testCreateAclWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty CreateAcls request. |
|   val emptyRequestData = new CreateAclsRequestData() |
|   testForwardableApi(ApiKeys.CREATE_ACLS, new CreateAclsRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testDeleteAclWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty DeleteAcls request. |
|   val emptyRequestData = new DeleteAclsRequestData() |
|   testForwardableApi(ApiKeys.DELETE_ACLS, new DeleteAclsRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testCreateDelegationTokenWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty CreateDelegationToken request. |
|   val emptyRequestData = new CreateDelegationTokenRequestData() |
|   testForwardableApi(ApiKeys.CREATE_DELEGATION_TOKEN, |
|     new CreateDelegationTokenRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testRenewDelegationTokenWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty RenewDelegationToken request. |
|   val emptyRequestData = new RenewDelegationTokenRequestData() |
|   testForwardableApi(ApiKeys.RENEW_DELEGATION_TOKEN, |
|     new RenewDelegationTokenRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testExpireDelegationTokenWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty ExpireDelegationToken request. |
|   val emptyRequestData = new ExpireDelegationTokenRequestData() |
|   testForwardableApi(ApiKeys.EXPIRE_DELEGATION_TOKEN, |
|     new ExpireDelegationTokenRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testAlterPartitionReassignmentsWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty AlterPartitionReassignments request. |
|   val emptyRequestData = new AlterPartitionReassignmentsRequestData() |
|   testForwardableApi(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, |
|     new AlterPartitionReassignmentsRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testCreatePartitionsWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty CreatePartitions request. |
|   val emptyRequestData = new CreatePartitionsRequestData() |
|   testForwardableApi(ApiKeys.CREATE_PARTITIONS, |
|     new CreatePartitionsRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testUpdateFeaturesWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty UpdateFeatures request. |
|   val emptyRequestData = new UpdateFeaturesRequestData() |
|   testForwardableApi(ApiKeys.UPDATE_FEATURES, |
|     new UpdateFeaturesRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testDeleteTopicsWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty DeleteTopics request. |
|   val emptyRequestData = new DeleteTopicsRequestData() |
|   testForwardableApi(ApiKeys.DELETE_TOPICS, |
|     new DeleteTopicsRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testAlterScramWithForwarding(): Unit = { |
|   // Runs the shared forwarding check with an empty AlterUserScramCredentials request. |
|   val emptyRequestData = new AlterUserScramCredentialsRequestData() |
|   testForwardableApi(ApiKeys.ALTER_USER_SCRAM_CREDENTIALS, |
|     new AlterUserScramCredentialsRequest.Builder(emptyRequestData)) |
| } |
| |
| @Test |
| def testFindCoordinatorAutoTopicCreationForOffsetTopic(): Unit = |
|   // Group coordinator lookup, latest request version, enough live brokers (helper defaults). |
|   testFindCoordinatorWithTopicCreation(coordinatorType = CoordinatorType.GROUP) |
| |
| @Test |
| def testFindCoordinatorAutoTopicCreationForTxnTopic(): Unit = |
|   // Transaction coordinator lookup, latest request version, enough live brokers (helper defaults). |
|   testFindCoordinatorWithTopicCreation(coordinatorType = CoordinatorType.TRANSACTION) |
| |
| @Test |
| def testFindCoordinatorNotEnoughBrokersForOffsetTopic(): Unit = |
|   // Group coordinator lookup, latest request version, one broker short of the required count. |
|   testFindCoordinatorWithTopicCreation( |
|     coordinatorType = CoordinatorType.GROUP, |
|     hasEnoughLiveBrokers = false) |
| |
| @Test |
| def testFindCoordinatorNotEnoughBrokersForTxnTopic(): Unit = |
|   // Transaction coordinator lookup, latest request version, one broker short of the required count. |
|   testFindCoordinatorWithTopicCreation( |
|     coordinatorType = CoordinatorType.TRANSACTION, |
|     hasEnoughLiveBrokers = false) |
| |
| @Test |
| def testOldFindCoordinatorAutoTopicCreationForOffsetTopic(): Unit = |
|   // Group coordinator lookup using request version 3 (single-key request form). |
|   testFindCoordinatorWithTopicCreation( |
|     coordinatorType = CoordinatorType.GROUP, |
|     version = 3) |
| |
| @Test |
| def testOldFindCoordinatorAutoTopicCreationForTxnTopic(): Unit = |
|   // Transaction coordinator lookup using request version 3 (single-key request form). |
|   testFindCoordinatorWithTopicCreation( |
|     coordinatorType = CoordinatorType.TRANSACTION, |
|     version = 3) |
| |
| @Test |
| def testOldFindCoordinatorNotEnoughBrokersForOffsetTopic(): Unit = |
|   // Group coordinator lookup using request version 3, one broker short of the required count. |
|   testFindCoordinatorWithTopicCreation( |
|     coordinatorType = CoordinatorType.GROUP, |
|     hasEnoughLiveBrokers = false, |
|     version = 3) |
| |
| @Test |
| def testOldFindCoordinatorNotEnoughBrokersForTxnTopic(): Unit = |
|   // Transaction coordinator lookup using request version 3, one broker short of the required count. |
|   testFindCoordinatorWithTopicCreation( |
|     coordinatorType = CoordinatorType.TRANSACTION, |
|     hasEnoughLiveBrokers = false, |
|     version = 3) |
| |
| /** |
|  * Sends a FindCoordinator request for key "group" and asserts that the response |
|  * is COORDINATOR_NOT_AVAILABLE and that the captured auto-creation request |
|  * context is empty. |
|  * |
|  * @param coordinatorType      GROUP or TRANSACTION; any other value throws IllegalStateException |
|  * @param hasEnoughLiveBrokers whether the metadata cache is set up with the needed broker count |
|  * @param version              request version; versions >= 4 use the coordinator-keys list form |
|  */ |
| private def testFindCoordinatorWithTopicCreation(coordinatorType: CoordinatorType, |
|     hasEnoughLiveBrokers: Boolean = true, |
|     version: Short = ApiKeys.FIND_COORDINATOR.latestVersion): Unit = { |
|   val authorizer: Authorizer = mock(classOf[Authorizer]) |
|   val header = new RequestHeader(ApiKeys.FIND_COORDINATOR, version, clientId, 0) |
| |
|   val requiredBrokers = 3 |
|   setupBrokerMetadata(hasEnoughLiveBrokers, requiredBrokers) |
| |
|   val requestTimeout = 10 |
|   val configOverrides = mutable.Map.empty[String, String] |
|   configOverrides.put(KafkaConfig.RequestTimeoutMsProp, requestTimeout.toString) |
| |
|   // Sizes the internal topic so that it needs `requiredBrokers` partitions and replicas. |
|   def overrideTopicSizing(partitionsProp: String, replicationFactorProp: String): Unit = { |
|     configOverrides.put(partitionsProp, requiredBrokers.toString) |
|     configOverrides.put(replicationFactorProp, requiredBrokers.toString) |
|   } |
| |
|   val coordinatorKey = "group" |
|   val internalTopicName = coordinatorType match { |
|     case CoordinatorType.GROUP => |
|       overrideTopicSizing(KafkaConfig.OffsetsTopicPartitionsProp, KafkaConfig.OffsetsTopicReplicationFactorProp) |
|       when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties) |
|       authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.GROUP, |
|         coordinatorKey, AuthorizationResult.ALLOWED) |
|       Topic.GROUP_METADATA_TOPIC_NAME |
|     case CoordinatorType.TRANSACTION => |
|       overrideTopicSizing(KafkaConfig.TransactionsTopicPartitionsProp, KafkaConfig.TransactionsTopicReplicationFactorProp) |
|       when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties) |
|       authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TRANSACTIONAL_ID, |
|         coordinatorKey, AuthorizationResult.ALLOWED) |
|       Topic.TRANSACTION_STATE_TOPIC_NAME |
|     case _ => |
|       throw new IllegalStateException(s"Unknown coordinator type $coordinatorType") |
|   } |
| |
|   // Versions >= 4 carry a list of keys; older versions carry a single key. |
|   val usesKeyList = version >= 4 |
|   val requestData = new FindCoordinatorRequestData().setKeyType(coordinatorType.id()) |
|   if (usesKeyList) |
|     requestData.setCoordinatorKeys(asList(coordinatorKey)) |
|   else |
|     requestData.setKey(coordinatorKey) |
|   val requestBuilder = new FindCoordinatorRequest.Builder(requestData) |
| |
|   val request = buildRequest(requestBuilder.build(header.apiVersion)) |
|   when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), |
|     any[Long])).thenReturn(0) |
| |
|   val contextCaptor = verifyTopicCreation(internalTopicName, true, true, request) |
| |
|   val kafkaApis = createKafkaApis(authorizer = Some(authorizer), overrideProperties = configOverrides) |
|   kafkaApis.handleFindCoordinatorRequest(request) |
| |
|   val response = verifyNoThrottling[FindCoordinatorResponse](request) |
|   if (usesKeyList) { |
|     assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.coordinators.get(0).errorCode) |
|     assertEquals(coordinatorKey, response.data.coordinators.get(0).key) |
|   } else { |
|     assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.errorCode) |
|   } |
|   assertTrue(contextCaptor.getValue.isEmpty) |
| } |
| |
| @Test |
| def testMetadataAutoTopicCreationForOffsetTopic(): Unit = |
|   // Offsets topic, auto-creation requested. |
|   testMetadataAutoTopicCreation( |
|     topicName = Topic.GROUP_METADATA_TOPIC_NAME, |
|     enableAutoTopicCreation = true, |
|     expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION) |
| |
| @Test |
| def testMetadataAutoTopicCreationForTxnTopic(): Unit = |
|   // Transaction state topic, auto-creation requested. |
|   testMetadataAutoTopicCreation( |
|     topicName = Topic.TRANSACTION_STATE_TOPIC_NAME, |
|     enableAutoTopicCreation = true, |
|     expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION) |
| |
| @Test |
| def testMetadataAutoTopicCreationForNonInternalTopic(): Unit = |
|   // Regular (non-internal) topic, auto-creation requested. |
|   testMetadataAutoTopicCreation( |
|     topicName = "topic", |
|     enableAutoTopicCreation = true, |
|     expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION) |
| |
| @Test |
| def testMetadataAutoTopicCreationDisabledForOffsetTopic(): Unit = |
|   // Offsets topic, auto-creation not requested. |
|   testMetadataAutoTopicCreation( |
|     topicName = Topic.GROUP_METADATA_TOPIC_NAME, |
|     enableAutoTopicCreation = false, |
|     expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION) |
| |
| @Test |
| def testMetadataAutoTopicCreationDisabledForTxnTopic(): Unit = |
|   // Transaction state topic, auto-creation not requested. |
|   testMetadataAutoTopicCreation( |
|     topicName = Topic.TRANSACTION_STATE_TOPIC_NAME, |
|     enableAutoTopicCreation = false, |
|     expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION) |
| |
| @Test |
| def testMetadataAutoTopicCreationDisabledForNonInternalTopic(): Unit = |
|   // Regular (non-internal) topic, auto-creation not requested. |
|   testMetadataAutoTopicCreation( |
|     topicName = "topic", |
|     enableAutoTopicCreation = false, |
|     expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION) |
| |
| @Test |
| def testMetadataAutoCreationDisabledForNonInternal(): Unit = { |
|   // The test name describes the disabled case, so the request must not ask for |
|   // auto-creation. NOTE(review): this now exercises the same path as |
|   // testMetadataAutoTopicCreationDisabledForNonInternalTopic; confirm whether a |
|   // distinct scenario (e.g. a broker-side setting) was originally intended. |
|   testMetadataAutoTopicCreation("topic", enableAutoTopicCreation = false, |
|     expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION) |
| } |
| |
| /** |
|  * Sends a Metadata request for `topicName` and asserts that the response holds a |
|  * single topic entry with `expectedError`. When auto-creation is requested, also |
|  * asserts that the request context was handed to the auto topic creation manager. |
|  * |
|  * @param topicName               topic to query; the two internal topic names get internal-topic sizing |
|  * @param enableAutoTopicCreation value of the request's auto-creation flag (also used for enableForwarding) |
|  * @param expectedError           error expected for the topic in the response |
|  */ |
| private def testMetadataAutoTopicCreation(topicName: String, |
|     enableAutoTopicCreation: Boolean, |
|     expectedError: Errors): Unit = { |
|   val authorizer: Authorizer = mock(classOf[Authorizer]) |
| |
|   val header = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion, |
|     clientId, 0) |
| |
|   val requiredBrokers = 3 |
|   addTopicToMetadataCache("some-topic", 1, 3) |
| |
|   authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TOPIC, |
|     topicName, AuthorizationResult.ALLOWED) |
| |
|   if (enableAutoTopicCreation) { |
|     authorizeResource(authorizer, AclOperation.CREATE, ResourceType.CLUSTER, |
|       Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED, logIfDenied = false) |
|   } |
| |
|   val configOverrides = mutable.Map.empty[String, String] |
|   val brokerCount = requiredBrokers.toString |
|   // Pick the partition/replication settings matching the topic kind and record whether it is internal. |
|   val isInternal = topicName match { |
|     case Topic.GROUP_METADATA_TOPIC_NAME => |
|       configOverrides ++= Seq( |
|         KafkaConfig.OffsetsTopicPartitionsProp -> brokerCount, |
|         KafkaConfig.OffsetsTopicReplicationFactorProp -> brokerCount) |
|       when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties) |
|       true |
| |
|     case Topic.TRANSACTION_STATE_TOPIC_NAME => |
|       configOverrides ++= Seq( |
|         KafkaConfig.TransactionsTopicPartitionsProp -> brokerCount, |
|         KafkaConfig.TransactionsTopicReplicationFactorProp -> brokerCount) |
|       when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties) |
|       true |
| |
|     case _ => |
|       configOverrides ++= Seq( |
|         KafkaConfig.NumPartitionsProp -> brokerCount, |
|         KafkaConfig.DefaultReplicationFactorProp -> brokerCount) |
|       false |
|   } |
| |
|   val metadataRequest = new MetadataRequest.Builder(List(topicName).asJava, enableAutoTopicCreation) |
|     .build(header.apiVersion) |
|   val request = buildRequest(metadataRequest) |
| |
|   when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), |
|     any[Long])).thenReturn(0) |
| |
|   val contextCaptor = verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request) |
| |
|   val kafkaApis = createKafkaApis( |
|     authorizer = Some(authorizer), |
|     enableForwarding = enableAutoTopicCreation, |
|     overrideProperties = configOverrides) |
|   kafkaApis.handleTopicMetadataRequest(request) |
| |
|   val response = verifyNoThrottling[MetadataResponse](request) |
|   val expectedTopicMetadata = |
|     new TopicMetadata(expectedError, topicName, isInternal, util.Collections.emptyList()) |
| |
|   assertEquals(util.Collections.singletonList(expectedTopicMetadata), response.topicMetadata()) |
| |
|   if (enableAutoTopicCreation) { |
|     val capturedContext = contextCaptor.getValue |
|     assertTrue(capturedContext.isDefined) |
|     assertEquals(request.context, capturedContext.get) |
|   } |
| } |
| |
| /** |
|  * When auto-creation is enabled, stubs the quota manager and the auto topic |
|  * creation manager so that creating `topicName` returns an |
|  * UNKNOWN_TOPIC_OR_PARTITION topic entry. |
|  * |
|  * @return a captor for the request context passed to the creation manager; |
|  *         nothing is stubbed or captured when auto-creation is disabled |
|  */ |
| private def verifyTopicCreation(topicName: String, |
|     enableAutoTopicCreation: Boolean, |
|     isInternal: Boolean, |
|     request: RequestChannel.Request): ArgumentCaptor[Option[RequestContext]] = { |
|   val contextCaptor: ArgumentCaptor[Option[RequestContext]] = |
|     ArgumentCaptor.forClass(classOf[Option[RequestContext]]) |
| |
|   if (enableAutoTopicCreation) { |
|     when(clientControllerQuotaManager.newPermissiveQuotaFor(ArgumentMatchers.eq(request))) |
|       .thenReturn(UnboundedControllerMutationQuota) |
| |
|     val creationResult = Seq( |
|       new MetadataResponseTopic() |
|         .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) |
|         .setIsInternal(isInternal) |
|         .setName(topicName)) |
| |
|     when(autoTopicCreationManager.createTopics( |
|       ArgumentMatchers.eq(Set(topicName)), |
|       ArgumentMatchers.eq(UnboundedControllerMutationQuota), |
|       contextCaptor.capture()) |
|     ).thenReturn(creationResult) |
|   } |
| |
|   contextCaptor |
| } |
| |
| /** |
|  * Registers "some-topic" in the metadata cache, passing `numBrokersNeeded` as the |
|  * third argument when enough live brokers are wanted and one fewer otherwise. |
|  */ |
| private def setupBrokerMetadata(hasEnoughLiveBrokers: Boolean, numBrokersNeeded: Int): Unit = { |
|   val brokerCount = if (hasEnoughLiveBrokers) numBrokersNeeded else numBrokersNeeded - 1 |
|   addTopicToMetadataCache("some-topic", 1, brokerCount) |
| } |
| |
| @Test |
| def testInvalidMetadataRequestReturnsError(): Unit = { |
|   // Construct invalid MetadataRequestTopics. Each one is sent on its own and must yield an error. |
|   val invalidTopics = List( |
|     new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()), |
|     new MetadataRequestData.MetadataRequestTopic().setName(null), |
|     new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()), |
|     new MetadataRequestData.MetadataRequestTopic().setName("topic1").setTopicId(Uuid.randomUuid())) |
| |
|   // If the version is 10 or 11, the invalid topic metadata should return an error. |
|   for (version <- Set(10, 11); invalidTopic <- invalidTopics) { |
|     val requestData = new MetadataRequestData().setTopics(Collections.singletonList(invalidTopic)) |
|     val request = buildRequest(new MetadataRequest(requestData, version.toShort)) |
|     val kafkaApis = createKafkaApis() |
| |
|     val responseCaptor: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse]) |
|     kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching) |
|     verify(requestChannel).sendResponse( |
|       ArgumentMatchers.eq(request), |
|       responseCaptor.capture(), |
|       any() |
|     ) |
| |
|     val response = responseCaptor.getValue.asInstanceOf[MetadataResponse] |
|     assertEquals(1, response.topicMetadata.size) |
|     assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST)) |
|     response.data.topics.forEach(responseTopic => assertNotEquals(null, responseTopic.name)) |
| |
|     // Clear recorded interactions so the next iteration's verify sees a single sendResponse. |
|     reset(requestChannel) |
|   } |
| } |
| |
| @Test |
| def testHandleOffsetCommitRequest(): Unit = { |
|   addTopicToMetadataCache("foo", numPartitions = 1) |
| |
|   val requestPartition = new OffsetCommitRequestData.OffsetCommitRequestPartition() |
|     .setPartitionIndex(0) |
|     .setCommittedOffset(10) |
|   val requestTopic = new OffsetCommitRequestData.OffsetCommitRequestTopic() |
|     .setName("foo") |
|     .setPartitions(List(requestPartition).asJava) |
|   val offsetCommitRequest = new OffsetCommitRequestData() |
|     .setGroupId("group") |
|     .setMemberId("member") |
|     .setTopics(List(requestTopic).asJava) |
| |
|   val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) |
| |
|   // The group coordinator answers through this future, completed further below. |
|   val coordinatorFuture = new CompletableFuture[OffsetCommitResponseData]() |
|   when(groupCoordinator.commitOffsets( |
|     requestChannelRequest.context, |
|     offsetCommitRequest, |
|     RequestLocal.NoCaching.bufferSupplier |
|   )).thenReturn(coordinatorFuture) |
| |
|   createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching) |
| |
|   // This is the response returned by the group coordinator. |
|   val responsePartition = new OffsetCommitResponseData.OffsetCommitResponsePartition() |
|     .setPartitionIndex(0) |
|     .setErrorCode(Errors.NONE.code) |
|   val responseTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic() |
|     .setName("foo") |
|     .setPartitions(List(responsePartition).asJava) |
|   val offsetCommitResponse = new OffsetCommitResponseData() |
|     .setTopics(List(responseTopic).asJava) |
| |
|   coordinatorFuture.complete(offsetCommitResponse) |
|   val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest) |
|   assertEquals(offsetCommitResponse, response.data) |
| } |
| |
| @Test |
| def testHandleOffsetCommitRequestFutureFailed(): Unit = { |
|   addTopicToMetadataCache("foo", numPartitions = 1) |
| |
|   val requestPartition = new OffsetCommitRequestData.OffsetCommitRequestPartition() |
|     .setPartitionIndex(0) |
|     .setCommittedOffset(10) |
|   val requestTopic = new OffsetCommitRequestData.OffsetCommitRequestTopic() |
|     .setName("foo") |
|     .setPartitions(List(requestPartition).asJava) |
|   val offsetCommitRequest = new OffsetCommitRequestData() |
|     .setGroupId("group") |
|     .setMemberId("member") |
|     .setTopics(List(requestTopic).asJava) |
| |
|   val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) |
| |
|   // The group coordinator answers through this future, failed further below. |
|   val coordinatorFuture = new CompletableFuture[OffsetCommitResponseData]() |
|   when(groupCoordinator.commitOffsets( |
|     requestChannelRequest.context, |
|     offsetCommitRequest, |
|     RequestLocal.NoCaching.bufferSupplier |
|   )).thenReturn(coordinatorFuture) |
| |
|   createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching) |
| |
|   // The coordinator failure is expected to show up as the error of the requested partition. |
|   val failedPartition = new OffsetCommitResponseData.OffsetCommitResponsePartition() |
|     .setPartitionIndex(0) |
|     .setErrorCode(Errors.NOT_COORDINATOR.code) |
|   val failedTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic() |
|     .setName("foo") |
|     .setPartitions(List(failedPartition).asJava) |
|   val expectedOffsetCommitResponse = new OffsetCommitResponseData() |
|     .setTopics(List(failedTopic).asJava) |
| |
|   coordinatorFuture.completeExceptionally(Errors.NOT_COORDINATOR.exception) |
|   val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest) |
|   assertEquals(expectedOffsetCommitResponse, response.data) |
| } |
| |
| @Test |
| def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = { |
| addTopicToMetadataCache("foo", numPartitions = 2) |
| addTopicToMetadataCache("bar", numPartitions = 2) |
| |
| val offsetCommitRequest = new OffsetCommitRequestData() |
| .setGroupId("group") |
| .setMemberId("member") |
| .setTopics(List( |
| // foo exists but only has 2 partitions. |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("foo") |
| .setPartitions(List( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(10), |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(1) |
| .setCommittedOffset(20), |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(2) |
| .setCommittedOffset(30)).asJava), |
| // bar exists. |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(List( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(40), |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(1) |
| .setCommittedOffset(50)).asJava), |
| // zar does not exist. |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("zar") |
| .setPartitions(List( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(60), |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(1) |
| .setCommittedOffset(70)).asJava)).asJava) |
| |
| val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) |
| |
| // This is the request expected by the group coordinator. |
| val expectedOffsetCommitRequest = new OffsetCommitRequestData() |
| .setGroupId("group") |
| .setMemberId("member") |
| .setTopics(List( |
| // foo exists but only has 2 partitions. |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("foo") |
| .setPartitions(List( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(10), |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(1) |
| .setCommittedOffset(20)).asJava), |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(List( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(40), |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(1) |
| .setCommittedOffset(50)).asJava)).asJava) |
| |
| val future = new CompletableFuture[OffsetCommitResponseData]() |
| when(groupCoordinator.commitOffsets( |
| requestChannelRequest.context, |
| expectedOffsetCommitRequest, |
| RequestLocal.NoCaching.bufferSupplier |
| )).thenReturn(future) |
| |
| createKafkaApis().handle( |
| requestChannelRequest, |
| RequestLocal.NoCaching |
| ) |
| |
| // This is the response returned by the group coordinator. |
| val offsetCommitResponse = new OffsetCommitResponseData() |
| .setTopics(List( |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("foo") |
| .setPartitions(List( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code), |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code)).asJava), |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(List( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code), |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code)).asJava)).asJava) |
| |
| val expectedOffsetCommitResponse = new OffsetCommitResponseData() |
| .setTopics(List( |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("foo") |
| .setPartitions(List( |
| // foo-2 is first because partitions failing the validation |
| // are put in the response first. |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(2) |
| .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code), |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code)).asJava), |
| // zar is before bar because topics failing the validation are |
| // put in the response first. |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("zar") |
| .setPartitions(List( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)).asJava), |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(List( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code), |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code)).asJava)).asJava) |
| |
| future.complete(offsetCommitResponse) |
| val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest) |
| assertEquals(expectedOffsetCommitResponse, response.data) |
| } |
| |
| @Test |
def testOffsetCommitWithInvalidPartition(): Unit = {
  // The topic is registered with a single partition, so 0 is the only valid index.
  val topicName = "topic"
  addTopicToMetadataCache(topicName, numPartitions = 1)

  // Sends an OffsetCommit for `invalidPartitionId` and asserts that the first
  // partition of the first topic in the response carries UNKNOWN_TOPIC_OR_PARTITION.
  def checkInvalidPartition(invalidPartitionId: Int): Unit = {
    reset(replicaManager, clientRequestQuotaManager, requestChannel)

    val committedPartition = new OffsetCommitRequestData.OffsetCommitRequestPartition()
      .setPartitionIndex(invalidPartitionId)
      .setCommittedOffset(15)
      .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
      .setCommittedMetadata("")
    val committedTopic = new OffsetCommitRequestData.OffsetCommitRequestTopic()
      .setName(topicName)
      .setPartitions(Collections.singletonList(committedPartition))
    val requestData = new OffsetCommitRequestData()
      .setGroupId("groupId")
      .setTopics(Collections.singletonList(committedTopic))

    val channelRequest = buildRequest(new OffsetCommitRequest.Builder(requestData).build())
    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(
      any[RequestChannel.Request](), any[Long])).thenReturn(0)
    createKafkaApis().handleOffsetCommitRequest(channelRequest, RequestLocal.withThreadConfinedCaching)

    val commitResponse = verifyNoThrottling[OffsetCommitResponse](channelRequest)
    val firstPartition = commitResponse.data.topics().get(0).partitions().get(0)
    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(firstPartition.errorCode))
  }

  // -1 is negative; 1 is out of range because the topic has only one partition.
  Seq(-1, 1).foreach(checkInvalidPartition)
}
| |
| @Test |
def testTxnOffsetCommitWithInvalidPartition(): Unit = {
  // The topic is registered with a single partition, so 0 is the only valid index.
  val topicName = "topic"
  addTopicToMetadataCache(topicName, numPartitions = 1)

  // Sends a TxnOffsetCommit for `invalidPartitionId` and asserts that the response
  // reports UNKNOWN_TOPIC_OR_PARTITION for that topic partition.
  def checkInvalidPartition(invalidPartitionId: Int): Unit = {
    reset(replicaManager, clientRequestQuotaManager, requestChannel)

    val badPartition = new TopicPartition(topicName, invalidPartitionId)
    val committedOffset = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
    val txnCommitRequest = new TxnOffsetCommitRequest.Builder(
      "txnId",
      "groupId",
      15L,
      0.toShort,
      Map(badPartition -> committedOffset).asJava
    ).build()

    val channelRequest = buildRequest(txnCommitRequest)
    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(
      any[RequestChannel.Request](), any[Long])).thenReturn(0)
    createKafkaApis().handleTxnOffsetCommitRequest(channelRequest, RequestLocal.withThreadConfinedCaching)

    val commitResponse = verifyNoThrottling[TxnOffsetCommitResponse](channelRequest)
    val reportedError = commitResponse.errors().get(badPartition)
    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, reportedError)
  }

  // -1 is negative; 1 is out of range because the topic has only one partition.
  Seq(-1, 1).foreach(checkInvalidPartition)
}
| |
| @Test |
def testHandleTxnOffsetCommitRequest(): Unit = {
  // Success case: the response data produced by the (mocked) group coordinator
  // is expected to be what the client receives.
  addTopicToMetadataCache("foo", numPartitions = 1)

  val committedPartition = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
    .setPartitionIndex(0)
    .setCommittedOffset(10)
  val committedTopic = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
    .setName("foo")
    .setPartitions(List(committedPartition).asJava)
  val requestData = new TxnOffsetCommitRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setGenerationId(10)
    .setProducerId(20)
    .setProducerEpoch(30)
    .setGroupInstanceId("instance-id")
    .setTransactionalId("transactional-id")
    .setTopics(List(committedTopic).asJava)

  val channelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(requestData).build())

  // The coordinator stub hands back a future that is completed only after the
  // request has been passed to KafkaApis.
  val coordinatorFuture = new CompletableFuture[TxnOffsetCommitResponseData]()
  when(groupCoordinator.commitTransactionalOffsets(
    channelRequest.context,
    requestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorFuture)

  createKafkaApis().handle(channelRequest, RequestLocal.NoCaching)

  // This is the response returned by the group coordinator.
  val okPartition = new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
    .setPartitionIndex(0)
    .setErrorCode(Errors.NONE.code)
  val okTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
    .setName("foo")
    .setPartitions(List(okPartition).asJava)
  val coordinatorResponse = new TxnOffsetCommitResponseData()
    .setTopics(List(okTopic).asJava)

  coordinatorFuture.complete(coordinatorResponse)
  val sentResponse = verifyNoThrottling[TxnOffsetCommitResponse](channelRequest)
  assertEquals(coordinatorResponse, sentResponse.data)
}
| |
| @Test |
def testHandleTxnOffsetCommitRequestFutureFailed(): Unit = {
  // Failure case: the coordinator future fails with NOT_COORDINATOR and the
  // response is expected to report that error for the requested partition.
  addTopicToMetadataCache("foo", numPartitions = 1)

  val committedPartition = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
    .setPartitionIndex(0)
    .setCommittedOffset(10)
  val committedTopic = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
    .setName("foo")
    .setPartitions(List(committedPartition).asJava)
  val requestData = new TxnOffsetCommitRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setTopics(List(committedTopic).asJava)

  val channelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(requestData).build())

  val coordinatorFuture = new CompletableFuture[TxnOffsetCommitResponseData]()
  when(groupCoordinator.commitTransactionalOffsets(
    channelRequest.context,
    requestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorFuture)

  createKafkaApis().handle(channelRequest, RequestLocal.NoCaching)

  // Expected payload: foo-0 carrying the NOT_COORDINATOR error code.
  val failedPartition = new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
    .setPartitionIndex(0)
    .setErrorCode(Errors.NOT_COORDINATOR.code)
  val failedTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
    .setName("foo")
    .setPartitions(List(failedPartition).asJava)
  val expectedResponseData = new TxnOffsetCommitResponseData()
    .setTopics(List(failedTopic).asJava)

  coordinatorFuture.completeExceptionally(Errors.NOT_COORDINATOR.exception)
  val sentResponse = verifyNoThrottling[TxnOffsetCommitResponse](channelRequest)
  assertEquals(expectedResponseData, sentResponse.data)
}
| |
| @Test |
def testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
  addTopicToMetadataCache("foo", numPartitions = 2)
  addTopicToMetadataCache("bar", numPartitions = 2)

  // Fixture builders. Each pair is (partition index, committed offset) for
  // requests and (partition index, error) for responses; order is preserved.
  def requestTopic(name: String, offsets: (Int, Long)*): TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic = {
    val partitions = offsets.map { case (index, offset) =>
      new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
        .setPartitionIndex(index)
        .setCommittedOffset(offset)
    }
    new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
      .setName(name)
      .setPartitions(partitions.toList.asJava)
  }

  def requestData(topics: TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic*): TxnOffsetCommitRequestData =
    new TxnOffsetCommitRequestData()
      .setGroupId("group")
      .setMemberId("member")
      .setTopics(topics.toList.asJava)

  def responseTopic(name: String, errors: (Int, Errors)*): TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic = {
    val partitions = errors.map { case (index, error) =>
      new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
        .setPartitionIndex(index)
        .setErrorCode(error.code)
    }
    new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
      .setName(name)
      .setPartitions(partitions.toList.asJava)
  }

  def responseData(topics: TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic*): TxnOffsetCommitResponseData =
    new TxnOffsetCommitResponseData().setTopics(topics.toList.asJava)

  val incomingRequestData = requestData(
    // foo exists but only has 2 partitions.
    requestTopic("foo", (0, 10L), (1, 20L), (2, 30L)),
    // bar exists.
    requestTopic("bar", (0, 40L), (1, 50L)),
    // zar does not exist.
    requestTopic("zar", (0, 60L), (1, 70L))
  )

  val channelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(incomingRequestData).build())

  // This is the request expected by the group coordinator: foo-2 and the
  // whole of zar are absent.
  val requestSeenByCoordinator = requestData(
    requestTopic("foo", (0, 10L), (1, 20L)),
    requestTopic("bar", (0, 40L), (1, 50L))
  )

  val coordinatorFuture = new CompletableFuture[TxnOffsetCommitResponseData]()
  when(groupCoordinator.commitTransactionalOffsets(
    channelRequest.context,
    requestSeenByCoordinator,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorFuture)

  createKafkaApis().handle(channelRequest, RequestLocal.NoCaching)

  // This is the response returned by the group coordinator.
  val coordinatorResponse = responseData(
    responseTopic("foo", (0, Errors.NONE), (1, Errors.NONE)),
    responseTopic("bar", (0, Errors.NONE), (1, Errors.NONE))
  )

  val expectedResponseData = responseData(
    // foo-2 is first because partitions failing the validation
    // are put in the response first.
    responseTopic("foo",
      (2, Errors.UNKNOWN_TOPIC_OR_PARTITION),
      (0, Errors.NONE),
      (1, Errors.NONE)),
    // zar is before bar because topics failing the validation are
    // put in the response first.
    responseTopic("zar",
      (0, Errors.UNKNOWN_TOPIC_OR_PARTITION),
      (1, Errors.UNKNOWN_TOPIC_OR_PARTITION)),
    responseTopic("bar", (0, Errors.NONE), (1, Errors.NONE))
  )

  coordinatorFuture.complete(coordinatorResponse)
  val sentResponse = verifyNoThrottling[TxnOffsetCommitResponse](channelRequest)
  assertEquals(expectedResponseData, sentResponse.data)
}
| |
| @ParameterizedTest |
| @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT) |
def shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(version: Short): Unit = {
  // The coordinator stub always answers COORDINATOR_LOAD_IN_PROGRESS; request
  // versions below 2 are expected to see COORDINATOR_NOT_AVAILABLE instead.
  val topicName = "topic"
  addTopicToMetadataCache(topicName, numPartitions = 2)

  val committedPartition = new TopicPartition(topicName, 1)
  val committedOffset = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())

  val txnCommitRequest = new TxnOffsetCommitRequest.Builder(
    "txnId",
    "groupId",
    15L,
    0.toShort,
    Map(committedPartition -> committedOffset).asJava
  ).build(version)
  val channelRequest = buildRequest(txnCommitRequest)

  val requestLocal = RequestLocal.withThreadConfinedCaching
  val coordinatorFuture = new CompletableFuture[TxnOffsetCommitResponseData]()
  when(groupCoordinator.commitTransactionalOffsets(
    channelRequest.context,
    txnCommitRequest.data,
    requestLocal.bufferSupplier
  )).thenReturn(coordinatorFuture)

  // The future is completed before the request is handled.
  val loadingPartition = new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
    .setPartitionIndex(committedPartition.partition)
    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code)
  val loadingTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
    .setName(committedPartition.topic)
    .setPartitions(List(loadingPartition).asJava)
  coordinatorFuture.complete(new TxnOffsetCommitResponseData().setTopics(List(loadingTopic).asJava))

  createKafkaApis().handleTxnOffsetCommitRequest(channelRequest, requestLocal)

  val responseCaptor: ArgumentCaptor[TxnOffsetCommitResponse] =
    ArgumentCaptor.forClass(classOf[TxnOffsetCommitResponse])
  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(channelRequest),
    responseCaptor.capture(),
    ArgumentMatchers.eq(None)
  )

  val expectedError =
    if (version < 2) Errors.COORDINATOR_NOT_AVAILABLE
    else Errors.COORDINATOR_LOAD_IN_PROGRESS
  assertEquals(expectedError, responseCaptor.getValue.errors().get(committedPartition))
}
| |
| @Test |
def shouldReplaceProducerFencedWithInvalidProducerEpochInInitProducerIdWithOlderClient(): Unit = {
  // For every InitProducerId version the transaction coordinator stub answers
  // PRODUCER_FENCED; versions below 4 are expected to see INVALID_PRODUCER_EPOCH.
  addTopicToMetadataCache("topic", numPartitions = 2)

  val transactionalId = "txnId"
  val txnTimeoutMs = TimeUnit.MINUTES.toMillis(15).toInt

  (ApiKeys.INIT_PRODUCER_ID.oldestVersion to ApiKeys.INIT_PRODUCER_ID.latestVersion).foreach { version =>
    reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)

    // Below version 3 the request is built with the NO_PRODUCER_ID /
    // NO_PRODUCER_EPOCH sentinels and the coordinator is expected to be called
    // without a producer id and epoch.
    val sendsProducerIdentity = version >= 3
    val producerId = if (sendsProducerIdentity) 15L else RecordBatch.NO_PRODUCER_ID
    val epoch = if (sendsProducerIdentity) 0.toShort else RecordBatch.NO_PRODUCER_EPOCH
    val expectedProducerIdAndEpoch =
      if (sendsProducerIdentity) Some(new ProducerIdAndEpoch(producerId, epoch))
      else None

    val requestData = new InitProducerIdRequestData()
      .setTransactionalId(transactionalId)
      .setTransactionTimeoutMs(txnTimeoutMs)
      .setProducerId(producerId)
      .setProducerEpoch(epoch)
    val channelRequest = buildRequest(new InitProducerIdRequest.Builder(requestData).build(version.toShort))

    val callbackCaptor: ArgumentCaptor[InitProducerIdResult => Unit] =
      ArgumentCaptor.forClass(classOf[InitProducerIdResult => Unit])
    val requestLocal = RequestLocal.withThreadConfinedCaching
    when(txnCoordinator.handleInitProducerId(
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(txnTimeoutMs),
      ArgumentMatchers.eq(expectedProducerIdAndEpoch),
      callbackCaptor.capture(),
      ArgumentMatchers.eq(requestLocal)
    )).thenAnswer(_ => callbackCaptor.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))

    createKafkaApis().handleInitProducerIdRequest(channelRequest, requestLocal)

    val responseCaptor: ArgumentCaptor[InitProducerIdResponse] =
      ArgumentCaptor.forClass(classOf[InitProducerIdResponse])
    verify(requestChannel).sendResponse(
      ArgumentMatchers.eq(channelRequest),
      responseCaptor.capture(),
      ArgumentMatchers.eq(None)
    )

    val expectedError = if (version < 4) Errors.INVALID_PRODUCER_EPOCH else Errors.PRODUCER_FENCED
    assertEquals(expectedError.code, responseCaptor.getValue.data.errorCode)
  }
}
| |
| @Test |
def shouldReplaceProducerFencedWithInvalidProducerEpochInAddOffsetToTxnWithOlderClient(): Unit = {
  // For every AddOffsetsToTxn version the transaction coordinator stub answers
  // PRODUCER_FENCED; versions below 2 are expected to see INVALID_PRODUCER_EPOCH.
  addTopicToMetadataCache("topic", numPartitions = 2)

  val groupId = "groupId"
  val transactionalId = "txnId"
  val producerId = 15L
  val epoch = 0.toShort
  // Partition that the group coordinator stub reports for `groupId`.
  val offsetsPartition = 1

  (ApiKeys.ADD_OFFSETS_TO_TXN.oldestVersion to ApiKeys.ADD_OFFSETS_TO_TXN.latestVersion).foreach { version =>
    reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator, txnCoordinator)

    val requestData = new AddOffsetsToTxnRequestData()
      .setGroupId(groupId)
      .setTransactionalId(transactionalId)
      .setProducerId(producerId)
      .setProducerEpoch(epoch)
    val channelRequest = buildRequest(new AddOffsetsToTxnRequest.Builder(requestData).build(version.toShort))

    when(groupCoordinator.partitionFor(ArgumentMatchers.eq(groupId))).thenReturn(offsetsPartition)

    val callbackCaptor: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
    val requestLocal = RequestLocal.withThreadConfinedCaching
    val expectedPartitions = Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition))
    when(txnCoordinator.handleAddPartitionsToTransaction(
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(producerId),
      ArgumentMatchers.eq(epoch),
      ArgumentMatchers.eq(expectedPartitions),
      callbackCaptor.capture(),
      ArgumentMatchers.eq(requestLocal)
    )).thenAnswer(_ => callbackCaptor.getValue.apply(Errors.PRODUCER_FENCED))

    createKafkaApis().handleAddOffsetsToTxnRequest(channelRequest, requestLocal)

    val responseCaptor: ArgumentCaptor[AddOffsetsToTxnResponse] =
      ArgumentCaptor.forClass(classOf[AddOffsetsToTxnResponse])
    verify(requestChannel).sendResponse(
      ArgumentMatchers.eq(channelRequest),
      responseCaptor.capture(),
      ArgumentMatchers.eq(None)
    )

    val expectedError = if (version < 2) Errors.INVALID_PRODUCER_EPOCH else Errors.PRODUCER_FENCED
    assertEquals(expectedError.code, responseCaptor.getValue.data.errorCode)
  }
}
| |
| @Test |
def shouldReplaceProducerFencedWithInvalidProducerEpochInAddPartitionToTxnWithOlderClient(): Unit = {
  // For AddPartitionsToTxn versions up to 3 the transaction coordinator stub
  // answers PRODUCER_FENCED; versions below 2 are expected to see
  // INVALID_PRODUCER_EPOCH for the partition.
  val topicName = "topic"
  addTopicToMetadataCache(topicName, numPartitions = 2)

  val transactionalId = "txnId"
  val producerId = 15L
  val epoch = 0.toShort
  val addedPartition = new TopicPartition(topicName, 1)

  (ApiKeys.ADD_PARTITIONS_TO_TXN.oldestVersion to 3).foreach { version =>
    reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)

    val clientRequest = AddPartitionsToTxnRequest.Builder.forClient(
      transactionalId,
      producerId,
      epoch,
      Collections.singletonList(addedPartition)
    ).build(version.toShort)
    val channelRequest = buildRequest(clientRequest)

    val callbackCaptor: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
    val requestLocal = RequestLocal.withThreadConfinedCaching
    when(txnCoordinator.handleAddPartitionsToTransaction(
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(producerId),
      ArgumentMatchers.eq(epoch),
      ArgumentMatchers.eq(Set(addedPartition)),
      callbackCaptor.capture(),
      ArgumentMatchers.eq(requestLocal)
    )).thenAnswer(_ => callbackCaptor.getValue.apply(Errors.PRODUCER_FENCED))

    createKafkaApis().handleAddPartitionsToTxnRequest(channelRequest, requestLocal)

    val responseCaptor: ArgumentCaptor[AddPartitionsToTxnResponse] =
      ArgumentCaptor.forClass(classOf[AddPartitionsToTxnResponse])
    verify(requestChannel).sendResponse(
      ArgumentMatchers.eq(channelRequest),
      responseCaptor.capture(),
      ArgumentMatchers.eq(None)
    )

    val expectedError = if (version < 2) Errors.INVALID_PRODUCER_EPOCH else Errors.PRODUCER_FENCED
    val reportedErrors = responseCaptor.getValue.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID)
    assertEquals(Collections.singletonMap(addedPartition, expectedError), reportedErrors)
  }
}
| |
| @Test |
def testBatchedAddPartitionsToTxnRequest(): Unit = {
  // A version 4 request carrying two transactions: txnId1 adds tp0
  // (verifyOnly = false) and txnId2 verifies tp1 (verifyOnly = true). Each is
  // answered by its own coordinator stub and both results must be in the response.
  val topicName = "topic"
  addTopicToMetadataCache(topicName, numPartitions = 2)

  val transactionalId1 = "txnId1"
  val transactionalId2 = "txnId2"
  val producerId = 15L
  val epoch = 0.toShort
  val tp0 = new TopicPartition(topicName, 0)
  val tp1 = new TopicPartition(topicName, 1)

  // Builds one transaction entry holding exactly one topic partition.
  def transactionFor(transactionalId: String, tp: TopicPartition, verifyOnly: Boolean): AddPartitionsToTxnTransaction = {
    val topics = new AddPartitionsToTxnTopicCollection(
      Collections.singletonList(new AddPartitionsToTxnTopic()
        .setName(tp.topic)
        .setPartitions(Collections.singletonList(tp.partition))
      ).iterator())
    new AddPartitionsToTxnTransaction()
      .setTransactionalId(transactionalId)
      .setProducerId(producerId)
      .setProducerEpoch(epoch)
      .setVerifyOnly(verifyOnly)
      .setTopics(topics)
  }

  val transactions = new AddPartitionsToTxnTransactionCollection(
    List(
      transactionFor(transactionalId1, tp0, verifyOnly = false),
      transactionFor(transactionalId2, tp1, verifyOnly = true)
    ).asJava.iterator()
  )
  val brokerRequest = AddPartitionsToTxnRequest.Builder.forBroker(transactions).build(4.toShort)
  val channelRequest = buildRequest(brokerRequest)

  val addCallbackCaptor: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
  val verifyCallbackCaptor: ArgumentCaptor[AddPartitionsToTxnResult => Unit] =
    ArgumentCaptor.forClass(classOf[AddPartitionsToTxnResult => Unit])

  val requestLocal = RequestLocal.withThreadConfinedCaching
  when(txnCoordinator.handleAddPartitionsToTransaction(
    ArgumentMatchers.eq(transactionalId1),
    ArgumentMatchers.eq(producerId),
    ArgumentMatchers.eq(epoch),
    ArgumentMatchers.eq(Set(tp0)),
    addCallbackCaptor.capture(),
    ArgumentMatchers.eq(requestLocal)
  )).thenAnswer(_ => addCallbackCaptor.getValue.apply(Errors.NONE))

  when(txnCoordinator.handleVerifyPartitionsInTransaction(
    ArgumentMatchers.eq(transactionalId2),
    ArgumentMatchers.eq(producerId),
    ArgumentMatchers.eq(epoch),
    ArgumentMatchers.eq(Set(tp1)),
    verifyCallbackCaptor.capture()
  )).thenAnswer(_ => verifyCallbackCaptor.getValue.apply(AddPartitionsToTxnResponse.resultForTransaction(transactionalId2, Map(tp1 -> Errors.PRODUCER_FENCED).asJava)))

  createKafkaApis().handleAddPartitionsToTxnRequest(channelRequest, requestLocal)

  val sentResponse = verifyNoThrottling[AddPartitionsToTxnResponse](channelRequest)

  val expectedErrors = Map(
    transactionalId1 -> Collections.singletonMap(tp0, Errors.NONE),
    transactionalId2 -> Collections.singletonMap(tp1, Errors.PRODUCER_FENCED)
  ).asJava
  assertEquals(expectedErrors, sentResponse.errors())
}
| |
| @ParameterizedTest |
| @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) |
def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = {
  // The authorizer mock denies everything. Versions below 4 are expected to
  // report TRANSACTIONAL_ID_AUTHORIZATION_FAILED for the partition; version 4
  // and above CLUSTER_AUTHORIZATION_FAILED as the top-level error code.
  val transactionalId = "txnId1"
  val producerId = 15L
  val epoch = 0.toShort
  val tp = new TopicPartition("topic", 0)

  // Versions below 4 are built with the client builder, the rest with the broker builder.
  val usesClientBuilder = version < 4

  val requestBuilder =
    if (usesClientBuilder) {
      AddPartitionsToTxnRequest.Builder.forClient(
        transactionalId,
        producerId,
        epoch,
        Collections.singletonList(tp))
    } else {
      val topics = new AddPartitionsToTxnTopicCollection(
        Collections.singletonList(new AddPartitionsToTxnTopic()
          .setName(tp.topic)
          .setPartitions(Collections.singletonList(tp.partition))
        ).iterator())
      val transaction = new AddPartitionsToTxnTransaction()
        .setTransactionalId(transactionalId)
        .setProducerId(producerId)
        .setProducerEpoch(epoch)
        .setVerifyOnly(true)
        .setTopics(topics)
      AddPartitionsToTxnRequest.Builder.forBroker(
        new AddPartitionsToTxnTransactionCollection(List(transaction).asJava.iterator()))
    }

  val channelRequest = buildRequest(requestBuilder.build(version))

  val denyingAuthorizer: Authorizer = mock(classOf[Authorizer])
  when(denyingAuthorizer.authorize(any[RequestContext], any[util.List[Action]]))
    .thenReturn(Seq(AuthorizationResult.DENIED).asJava)

  createKafkaApis(authorizer = Some(denyingAuthorizer)).handle(
    channelRequest,
    RequestLocal.NoCaching
  )

  val sentResponse = verifyNoThrottling[AddPartitionsToTxnResponse](channelRequest)
  if (usesClientBuilder) {
    val partitionError = sentResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)
    assertEquals(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, partitionError)
  } else {
    val topLevelError = Errors.forCode(sentResponse.data().errorCode)
    assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, topLevelError)
  }
}
| |
| @ParameterizedTest |
| @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) |
def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = {
  // The topic has one partition, so tp1 does not exist. The response is expected
  // to report UNKNOWN_TOPIC_OR_PARTITION for tp1 and OPERATION_NOT_ATTEMPTED for tp0.
  val topicName = "topic"
  addTopicToMetadataCache(topicName, numPartitions = 1)

  val transactionalId = "txnId1"
  val producerId = 15L
  val epoch = 0.toShort
  val tp0 = new TopicPartition(topicName, 0)
  val tp1 = new TopicPartition(topicName, 1)

  // Versions below 4 are built with the client builder, the rest with the broker builder.
  val usesClientBuilder = version < 4

  val requestBuilder =
    if (usesClientBuilder) {
      AddPartitionsToTxnRequest.Builder.forClient(
        transactionalId,
        producerId,
        epoch,
        List(tp0, tp1).asJava)
    } else {
      val topics = new AddPartitionsToTxnTopicCollection(
        Collections.singletonList(new AddPartitionsToTxnTopic()
          .setName(tp0.topic)
          .setPartitions(List[Integer](tp0.partition, tp1.partition()).asJava)
        ).iterator())
      val transaction = new AddPartitionsToTxnTransaction()
        .setTransactionalId(transactionalId)
        .setProducerId(producerId)
        .setProducerEpoch(epoch)
        .setVerifyOnly(true)
        .setTopics(topics)
      AddPartitionsToTxnRequest.Builder.forBroker(
        new AddPartitionsToTxnTransactionCollection(List(transaction).asJava.iterator()))
    }

  val channelRequest = buildRequest(requestBuilder.build(version))

  createKafkaApis().handleAddPartitionsToTxnRequest(
    channelRequest,
    RequestLocal.NoCaching
  )

  val sentResponse = verifyNoThrottling[AddPartitionsToTxnResponse](channelRequest)

  // Errors are looked up under the fixed V3_AND_BELOW_TXN_ID key for versions
  // below 4 and under the transactional id otherwise.
  val errorsKey =
    if (usesClientBuilder) AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID
    else transactionalId

  assertEquals(Errors.OPERATION_NOT_ATTEMPTED, sentResponse.errors().get(errorsKey).get(tp0))
  assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, sentResponse.errors().get(errorsKey).get(tp1))
}
| |
| @Test |
def shouldReplaceProducerFencedWithInvalidProducerEpochInEndTxnWithOlderClient(): Unit = {
  // For every EndTxn version the transaction coordinator stub answers
  // PRODUCER_FENCED; versions below 2 are expected to see INVALID_PRODUCER_EPOCH.
  addTopicToMetadataCache("topic", numPartitions = 2)

  val transactionalId = "txnId"
  val producerId = 15L
  val epoch = 0.toShort

  (ApiKeys.END_TXN.oldestVersion to ApiKeys.END_TXN.latestVersion).foreach { version =>
    reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)

    val requestData = new EndTxnRequestData()
      .setTransactionalId(transactionalId)
      .setProducerId(producerId)
      .setProducerEpoch(epoch)
      .setCommitted(true)
    val channelRequest = buildRequest(new EndTxnRequest.Builder(requestData).build(version.toShort))

    val callbackCaptor: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
    val requestLocal = RequestLocal.withThreadConfinedCaching
    when(txnCoordinator.handleEndTransaction(
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(producerId),
      ArgumentMatchers.eq(epoch),
      ArgumentMatchers.eq(TransactionResult.COMMIT),
      callbackCaptor.capture(),
      ArgumentMatchers.eq(requestLocal)
    )).thenAnswer(_ => callbackCaptor.getValue.apply(Errors.PRODUCER_FENCED))

    createKafkaApis().handleEndTxnRequest(channelRequest, requestLocal)

    val responseCaptor: ArgumentCaptor[EndTxnResponse] = ArgumentCaptor.forClass(classOf[EndTxnResponse])
    verify(requestChannel).sendResponse(
      ArgumentMatchers.eq(channelRequest),
      responseCaptor.capture(),
      ArgumentMatchers.eq(None)
    )

    val expectedError = if (version < 2) Errors.INVALID_PRODUCER_EPOCH else Errors.PRODUCER_FENCED
    assertEquals(expectedError.code, responseCaptor.getValue.data.errorCode)
  }
}
| |
/**
 * Runs the produce handler once per supported Produce version. The stubbed replica manager
 * completes the captured append callback with INVALID_PRODUCER_EPOCH for the produced partition,
 * and the single partition entry in the response must carry that same error.
 */
@Test
def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse(): Unit = {
  val topic = "topic"
  addTopicToMetadataCache(topic, numPartitions = 2)

  (ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion).foreach { version =>
    // Each version starts from freshly reset mocks.
    reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)

    val appendCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])

    val partition = new TopicPartition("topic", 0)

    // Assemble the request bottom-up: records -> partition data -> topic data -> request data.
    val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))
    val partitionData = new ProduceRequestData.PartitionProduceData()
      .setIndex(partition.partition)
      .setRecords(records)
    val topicData = new ProduceRequestData.TopicProduceData()
      .setName(partition.topic)
      .setPartitionData(Collections.singletonList(partitionData))
    val requestData = new ProduceRequestData()
      .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
        Collections.singletonList(topicData).iterator))
      .setAcks(1.toShort)
      .setTimeoutMs(5000)
    val produceRequest = ProduceRequest.forCurrentMagic(requestData).build(version.toShort)
    val request = buildRequest(produceRequest)

    when(replicaManager.appendRecords(
      anyLong,
      anyShort,
      ArgumentMatchers.eq(false),
      ArgumentMatchers.eq(AppendOrigin.CLIENT),
      any(),
      appendCallback.capture(),
      any(),
      any(),
      any(),
      any(),
      any(),
      any()
    )).thenAnswer { _ =>
      // Answer the append by invoking the captured callback with the failing partition result.
      val failure = Map(partition -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))
      appendCallback.getValue.apply(failure)
    }

    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(
      any[RequestChannel.Request](), any[Long])).thenReturn(0)
    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)

    createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)

    val response = verifyNoThrottling[ProduceResponse](request)

    val topicResponses = response.data.responses
    assertEquals(1, topicResponses.size)
    val partitionResponses = topicResponses.asScala.head.partitionResponses
    assertEquals(1, partitionResponses.size)
    val errorCode = partitionResponses.asScala.head.errorCode
    assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(errorCode))
  }
}
| |
/**
 * For Produce versions 3 through the latest, sends a transactional produce request and verifies
 * that appendRecords receives the request's transactional id together with
 * Some(partition returned by txnCoordinator.partitionFor).
 */
@Test
def testTransactionalParametersSetCorrectly(): Unit = {
  val topic = "topic"
  val transactionalId = "txn1"
  val transactionCoordinatorPartition = 35

  addTopicToMetadataCache(topic, numPartitions = 2)

  (3 to ApiKeys.PRODUCE.latestVersion).foreach { version =>
    reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)

    val appendCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])

    val partition = new TopicPartition("topic", 0)

    // Transactional batch: the three zeros are passed through to withTransactionalRecords as-is.
    val records = MemoryRecords.withTransactionalRecords(
      CompressionType.NONE, 0, 0, 0, new SimpleRecord("test".getBytes))
    val partitionData = new ProduceRequestData.PartitionProduceData()
      .setIndex(partition.partition)
      .setRecords(records)
    val topicData = new ProduceRequestData.TopicProduceData()
      .setName(partition.topic)
      .setPartitionData(Collections.singletonList(partitionData))
    val requestData = new ProduceRequestData()
      .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
        Collections.singletonList(topicData).iterator))
      .setAcks(1.toShort)
      .setTransactionalId(transactionalId)
      .setTimeoutMs(5000)
    val produceRequest = ProduceRequest.forCurrentMagic(requestData).build(version.toShort)
    val request = buildRequest(produceRequest)

    val kafkaApis = createKafkaApis()

    when(txnCoordinator.partitionFor(ArgumentMatchers.eq(transactionalId)))
      .thenReturn(transactionCoordinatorPartition)

    kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)

    verify(replicaManager).appendRecords(
      anyLong,
      anyShort,
      ArgumentMatchers.eq(false),
      ArgumentMatchers.eq(AppendOrigin.CLIENT),
      any(),
      appendCallback.capture(),
      any(),
      any(),
      any(),
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(Some(transactionCoordinatorPartition)),
      any()
    )
  }
}
| |
/**
 * Sends AddPartitionsToTxn requests naming partition ids that do not exist on a one-partition
 * topic and expects UNKNOWN_TOPIC_OR_PARTITION for each of them.
 */
@Test
def testAddPartitionsToTxnWithInvalidPartition(): Unit = {
  val topic = "topic"
  addTopicToMetadataCache(topic, numPartitions = 1)

  def checkInvalidPartition(invalidPartitionId: Int): Unit = {
    reset(replicaManager, clientRequestQuotaManager, requestChannel)

    val badPartition = new TopicPartition(topic, invalidPartitionId)
    val requestBuilder = AddPartitionsToTxnRequest.Builder.forClient(
      "txnlId", 15L, 0.toShort, List(badPartition).asJava)
    val addPartitionsToTxnRequest = requestBuilder.build()
    val request = buildRequest(addPartitionsToTxnRequest)

    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(
      any[RequestChannel.Request](), any[Long])).thenReturn(0)
    createKafkaApis().handleAddPartitionsToTxnRequest(request, RequestLocal.withThreadConfinedCaching)

    val response = verifyNoThrottling[AddPartitionsToTxnResponse](request)
    val errorsForTxn = response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID)
    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, errorsForTxn.get(badPartition))
  }

  // -1 is a negative id; 1 is out of range because the topic has only one partition.
  Seq(-1, 1).foreach(checkInvalidPartition)
}
| |
// Expects the AddOffsetsToTxn handler of a KafkaApis created with IBP_0_10_2_IV0 to throw
// UnsupportedVersionException (the request itself is never read, hence null).
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
  assertThrows(classOf[UnsupportedVersionException], () => {
    val apis = createKafkaApis(IBP_0_10_2_IV0)
    apis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching)
  })
}
| |
// Expects the AddPartitionsToTxn handler of a KafkaApis created with IBP_0_10_2_IV0 to throw
// UnsupportedVersionException (the request itself is never read, hence null).
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
  assertThrows(classOf[UnsupportedVersionException], () => {
    val apis = createKafkaApis(IBP_0_10_2_IV0)
    apis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching)
  })
}
| |
// Expects the TxnOffsetCommit handler of a KafkaApis created with IBP_0_10_2_IV0 to throw
// UnsupportedVersionException. The handler under test is handleTxnOffsetCommitRequest, as the
// test name states; handleAddPartitionsToTxnRequest is covered by its own dedicated test.
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
  assertThrows(classOf[UnsupportedVersionException],
    () => createKafkaApis(IBP_0_10_2_IV0).handleTxnOffsetCommitRequest(null, RequestLocal.withThreadConfinedCaching))
}
| |
// Expects the EndTxn handler of a KafkaApis created with IBP_0_10_2_IV0 to throw
// UnsupportedVersionException (the request itself is never read, hence null).
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
  assertThrows(classOf[UnsupportedVersionException], () => {
    val apis = createKafkaApis(IBP_0_10_2_IV0)
    apis.handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching)
  })
}
| |
// Expects the WriteTxnMarkers handler of a KafkaApis created with IBP_0_10_2_IV0 to throw
// UnsupportedVersionException (the request itself is never read, hence null).
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
  assertThrows(classOf[UnsupportedVersionException], () => {
    val apis = createKafkaApis(IBP_0_10_2_IV0)
    apis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching)
  })
}
| |
/**
 * With getMagic stubbed to MAGIC_VALUE_V1 for the target partition, the WriteTxnMarkers response
 * must report UNSUPPORTED_FOR_MESSAGE_FORMAT for it (looked up under producer id 1L).
 */
@Test
def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = {
  val partition = new TopicPartition("t", 0)
  val request = createWriteTxnMarkersRequest(asList(partition))._2
  val responseCaptor: ArgumentCaptor[WriteTxnMarkersResponse] =
    ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])

  when(replicaManager.getMagic(partition)).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))

  createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)

  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    responseCaptor.capture(),
    ArgumentMatchers.eq(None)
  )

  val expectedErrors = Map(partition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava
  assertEquals(expectedErrors, responseCaptor.getValue.errorsByProducerId.get(1L))
}
| |
/**
 * With getMagic stubbed to None for the target partition, the WriteTxnMarkers response must
 * report UNKNOWN_TOPIC_OR_PARTITION for it (looked up under producer id 1L).
 */
@Test
def shouldRespondWithUnknownTopicWhenPartitionIsNotHosted(): Unit = {
  val partition = new TopicPartition("t", 0)
  val request = createWriteTxnMarkersRequest(asList(partition))._2
  val responseCaptor: ArgumentCaptor[WriteTxnMarkersResponse] =
    ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])

  when(replicaManager.getMagic(partition)).thenReturn(None)

  createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)

  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    responseCaptor.capture(),
    ArgumentMatchers.eq(None)
  )

  val expectedErrors = Map(partition -> Errors.UNKNOWN_TOPIC_OR_PARTITION).asJava
  assertEquals(expectedErrors, responseCaptor.getValue.errorsByProducerId.get(1L))
}
| |
/**
 * Two partitions in one WriteTxnMarkers request: getMagic returns V1 for the first and V2 for the
 * second. The append stub answers NONE for the second partition only, and the response must
 * combine UNSUPPORTED_FOR_MESSAGE_FORMAT (first) with NONE (second).
 */
@Test
def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
  val oldFormatPartition = new TopicPartition("t", 0)
  val goodPartition = new TopicPartition("t1", 0)
  val request = createWriteTxnMarkersRequest(asList(oldFormatPartition, goodPartition))._2
  val expectedErrors = Map(
    oldFormatPartition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT,
    goodPartition -> Errors.NONE
  ).asJava

  val responseCaptor: ArgumentCaptor[WriteTxnMarkersResponse] =
    ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
  val appendCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
    ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])

  when(replicaManager.getMagic(oldFormatPartition)).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
  when(replicaManager.getMagic(goodPartition)).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))

  val requestLocal = RequestLocal.withThreadConfinedCaching
  when(replicaManager.appendRecords(
    anyLong,
    anyShort,
    ArgumentMatchers.eq(true),
    ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any(),
    appendCallback.capture(),
    any(),
    any(),
    ArgumentMatchers.eq(requestLocal),
    any(),
    any(),
    any()
  )).thenAnswer { _ =>
    // Complete the append successfully for the partition that reached the log.
    val appended = Map(goodPartition -> new PartitionResponse(Errors.NONE))
    appendCallback.getValue.apply(appended)
  }

  createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)

  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    responseCaptor.capture(),
    ArgumentMatchers.eq(None)
  )
  assertEquals(expectedErrors, responseCaptor.getValue.errorsByProducerId.get(1L))
}
| |
// Delete flag set, with a regular leader epoch (InitialLeaderEpoch + 2).
@Test
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndLeaderEpoch(): Unit = {
  val leaderEpoch = LeaderAndIsr.InitialLeaderEpoch + 2
  shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch, deletePartition = true)
}
| |
// Delete flag set, with LeaderAndIsr.EpochDuringDelete as the leader epoch.
@Test
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndDeleteSentinel(): Unit = {
  val leaderEpoch = LeaderAndIsr.EpochDuringDelete
  shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch, deletePartition = true)
}
| |
// Delete flag set, with LeaderAndIsr.NoEpoch as the leader epoch.
@Test
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndNoEpochSentinel(): Unit = {
  val leaderEpoch = LeaderAndIsr.NoEpoch
  shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch, deletePartition = true)
}
| |
// Delete flag cleared, with a regular leader epoch (InitialLeaderEpoch + 2).
@Test
def shouldNotResignCoordinatorsIfStopReplicaReceivedWithoutDeleteFlag(): Unit = {
  val leaderEpoch = LeaderAndIsr.InitialLeaderEpoch + 2
  shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch, deletePartition = false)
}
| |
/**
 * Shared driver for the StopReplica coordinator-resignation tests.
 *
 * Builds a StopReplica request covering the group-metadata topic, the transaction-state topic
 * and a regular topic ("foo"), stubs replicaManager.stopReplicas to succeed for all three
 * partitions, runs the handler and then checks the coordinators' onResignation calls:
 *  - deletePartition = true, leaderEpoch >= 0: both coordinators resign with that epoch;
 *  - deletePartition = true, leaderEpoch < 0: both coordinators resign with an empty epoch;
 *  - deletePartition = false: neither coordinator is asked to resign.
 *
 * @param leaderEpoch     leader epoch written into every partition state of the request
 * @param deletePartition delete flag written into every partition state of the request
 */
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch: Int,
                                                                deletePartition: Boolean): Unit = {
  val controllerId = 0
  val controllerEpoch = 5
  val brokerEpoch = 230498320L

  val fooPartition = new TopicPartition("foo", 0)
  val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
  val txnStatePartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 0)

  // Every partition in the request carries the same leader epoch and delete flag.
  def topicState(tp: TopicPartition): StopReplicaTopicState =
    new StopReplicaTopicState()
      .setTopicName(tp.topic)
      .setPartitionStates(Seq(new StopReplicaPartitionState()
        .setPartitionIndex(tp.partition)
        .setLeaderEpoch(leaderEpoch)
        .setDeletePartition(deletePartition)).asJava)

  val topicStates = Seq(groupMetadataPartition, txnStatePartition, fooPartition).map(topicState).asJava

  val stopReplicaRequest = new StopReplicaRequest.Builder(
    ApiKeys.STOP_REPLICA.latestVersion,
    controllerId,
    controllerEpoch,
    brokerEpoch,
    false,
    topicStates
  ).build()
  val request = buildRequest(stopReplicaRequest)

  when(replicaManager.stopReplicas(
    ArgumentMatchers.eq(request.context.correlationId),
    ArgumentMatchers.eq(controllerId),
    ArgumentMatchers.eq(controllerEpoch),
    ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
  )).thenReturn(
    (mutable.Map(
      groupMetadataPartition -> Errors.NONE,
      txnStatePartition -> Errors.NONE,
      fooPartition -> Errors.NONE
    ), Errors.NONE)
  )
  when(controller.brokerEpoch).thenReturn(brokerEpoch)

  createKafkaApis().handleStopReplicaRequest(request)

  if (deletePartition) {
    if (leaderEpoch >= 0) {
      verify(txnCoordinator).onResignation(txnStatePartition.partition, Some(leaderEpoch))
      verify(groupCoordinator).onResignation(groupMetadataPartition.partition, OptionalInt.of(leaderEpoch))
    } else {
      // Negative epochs are expected to be reported to the coordinators as "no epoch".
      verify(txnCoordinator).onResignation(txnStatePartition.partition, None)
      verify(groupCoordinator).onResignation(groupMetadataPartition.partition, OptionalInt.empty)
    }
  } else {
    // Without the delete flag neither coordinator may be asked to resign; asserting this
    // explicitly keeps the "should not resign" case from passing without checking anything.
    verify(txnCoordinator, org.mockito.Mockito.never()).onResignation(any[Int], any[Option[Int]])
    verify(groupCoordinator, org.mockito.Mockito.never()).onResignation(any[Int], any[OptionalInt])
  }
}
| |
/**
 * Two partitions in one WriteTxnMarkers request: getMagic returns None for the first and V2 for
 * the second. The append stub answers NONE for the second partition only, and the response must
 * combine UNKNOWN_TOPIC_OR_PARTITION (first) with NONE (second).
 */
@Test
def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
  val missingPartition = new TopicPartition("t", 0)
  val goodPartition = new TopicPartition("t1", 0)
  val request = createWriteTxnMarkersRequest(asList(missingPartition, goodPartition))._2
  val expectedErrors = Map(
    missingPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION,
    goodPartition -> Errors.NONE
  ).asJava

  val responseCaptor: ArgumentCaptor[WriteTxnMarkersResponse] =
    ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
  val appendCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
    ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])

  when(replicaManager.getMagic(missingPartition)).thenReturn(None)
  when(replicaManager.getMagic(goodPartition)).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))

  val requestLocal = RequestLocal.withThreadConfinedCaching
  when(replicaManager.appendRecords(
    anyLong,
    anyShort,
    ArgumentMatchers.eq(true),
    ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any(),
    appendCallback.capture(),
    any(),
    any(),
    ArgumentMatchers.eq(requestLocal),
    any(),
    any(),
    any()
  )).thenAnswer { _ =>
    // Complete the append successfully for the partition that reached the log.
    val appended = Map(goodPartition -> new PartitionResponse(Errors.NONE))
    appendCallback.getValue.apply(appended)
  }

  createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)

  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    responseCaptor.capture(),
    ArgumentMatchers.eq(None)
  )
  assertEquals(expectedErrors, responseCaptor.getValue.errorsByProducerId.get(1L))
}
| |
/**
 * With getMagic stubbed to MAGIC_VALUE_V2, handling WriteTxnMarkers must call appendRecords with
 * internal-topics-allowed = true, origin COORDINATOR and the same RequestLocal instance.
 */
@Test
def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
  val partition = new TopicPartition("t", 0)
  val (_, request) = createWriteTxnMarkersRequest(asList(partition))
  when(replicaManager.getMagic(partition)).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))

  val requestLocal = RequestLocal.withThreadConfinedCaching
  val kafkaApis = createKafkaApis()
  kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal)

  verify(replicaManager).appendRecords(
    anyLong,
    anyShort,
    ArgumentMatchers.eq(true),
    ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any(),
    any(),
    any(),
    any(),
    ArgumentMatchers.eq(requestLocal),
    any(),
    any(),
    any()
  )
}
| |
// Runs the shared list-offset failure scenario with FENCED_LEADER_EPOCH.
@Test
def testLeaderReplicaIfLocalRaisesFencedLeaderEpoch(): Unit = {
  val injectedError = Errors.FENCED_LEADER_EPOCH
  testListOffsetFailedGetLeaderReplica(injectedError)
}
| |
// Runs the shared list-offset failure scenario with UNKNOWN_LEADER_EPOCH.
@Test
def testLeaderReplicaIfLocalRaisesUnknownLeaderEpoch(): Unit = {
  val injectedError = Errors.UNKNOWN_LEADER_EPOCH
  testListOffsetFailedGetLeaderReplica(injectedError)
}
| |
// Runs the shared list-offset failure scenario with NOT_LEADER_OR_FOLLOWER.
@Test
def testLeaderReplicaIfLocalRaisesNotLeaderOrFollower(): Unit = {
  val injectedError = Errors.NOT_LEADER_OR_FOLLOWER
  testListOffsetFailedGetLeaderReplica(injectedError)
}
| |
// Runs the shared list-offset failure scenario with UNKNOWN_TOPIC_OR_PARTITION.
@Test
def testLeaderReplicaIfLocalRaisesUnknownTopicOrPartition(): Unit = {
  val injectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION
  testListOffsetFailedGetLeaderReplica(injectedError)
}
| |
/**
 * Happy path for DeleteGroups: the group coordinator's future is completed with a per-group
 * result collection and the response must carry exactly that collection.
 */
@Test
def testHandleDeleteGroups(): Unit = {
  val groupIds = List("group-1", "group-2", "group-3")
  val deleteGroupsRequest = new DeleteGroupsRequestData().setGroupsNames(groupIds.asJava)
  val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())

  val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
  when(groupCoordinator.deleteGroups(
    requestChannelRequest.context,
    groupIds.asJava,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(future)

  createKafkaApis().handleDeleteGroupsRequest(requestChannelRequest, RequestLocal.NoCaching)

  // One outcome per group, in request order.
  val outcomes = List(
    "group-1" -> Errors.NONE,
    "group-2" -> Errors.NOT_CONTROLLER,
    "group-3" -> Errors.UNKNOWN_SERVER_ERROR
  )
  val results = new DeleteGroupsResponseData.DeletableGroupResultCollection(
    outcomes.map { case (groupId, error) =>
      new DeleteGroupsResponseData.DeletableGroupResult()
        .setGroupId(groupId)
        .setErrorCode(error.code)
    }.iterator.asJava)

  future.complete(results)

  // The very same collection instance is used as the expected payload.
  val expectedDeleteGroupsResponse = new DeleteGroupsResponseData().setResults(results)

  val response = verifyNoThrottling[DeleteGroupsResponse](requestChannelRequest)
  assertEquals(expectedDeleteGroupsResponse, response.data)
}
| |
/**
 * When the group coordinator's DeleteGroups future fails with the NOT_CONTROLLER exception,
 * every requested group must be answered with NOT_CONTROLLER.
 */
@Test
def testHandleDeleteGroupsFutureFailed(): Unit = {
  val groupIds = List("group-1", "group-2", "group-3")
  val deleteGroupsRequest = new DeleteGroupsRequestData().setGroupsNames(groupIds.asJava)
  val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())

  val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
  when(groupCoordinator.deleteGroups(
    requestChannelRequest.context,
    groupIds.asJava,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(future)

  createKafkaApis().handleDeleteGroupsRequest(requestChannelRequest, RequestLocal.NoCaching)

  future.completeExceptionally(Errors.NOT_CONTROLLER.exception)

  val failedResults = groupIds.map { groupId =>
    new DeleteGroupsResponseData.DeletableGroupResult()
      .setGroupId(groupId)
      .setErrorCode(Errors.NOT_CONTROLLER.code)
  }
  val expectedDeleteGroupsResponse = new DeleteGroupsResponseData()
    .setResults(new DeleteGroupsResponseData.DeletableGroupResultCollection(failedResults.iterator.asJava))

  val response = verifyNoThrottling[DeleteGroupsResponse](requestChannelRequest)
  assertEquals(expectedDeleteGroupsResponse, response.data)
}
| |
/**
 * DeleteGroups with a mocked Authorizer that denies group-1 and allows group-2 and group-3.
 * Only the allowed groups are expected to reach the group coordinator; the response lists the
 * coordinator's results first, followed by group-1 with GROUP_AUTHORIZATION_FAILED.
 */
@Test
def testHandleDeleteGroupsAuthenticationFailed(): Unit = {
  val requestedGroups = List("group-1", "group-2", "group-3")
  val deleteGroupsRequest = new DeleteGroupsRequestData().setGroupsNames(requestedGroups.asJava)
  val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())

  val authorizer: Authorizer = mock(classOf[Authorizer])

  val acls = Map(
    "group-1" -> AuthorizationResult.DENIED,
    "group-2" -> AuthorizationResult.ALLOWED,
    "group-3" -> AuthorizationResult.ALLOWED
  )

  // Answer each action from the table above; names missing from it are denied.
  when(authorizer.authorize(any[RequestContext], any[util.List[Action]])).thenAnswer { invocation =>
    val requestedActions = invocation.getArgument(1, classOf[util.List[Action]])
    val decisions = requestedActions.asScala.map { action =>
      acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
    }
    decisions.asJava
  }

  // Builds a fresh result element on every call so no element is shared between collections.
  def result(groupId: String, error: Errors): DeleteGroupsResponseData.DeletableGroupResult =
    new DeleteGroupsResponseData.DeletableGroupResult()
      .setGroupId(groupId)
      .setErrorCode(error.code)

  val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
  when(groupCoordinator.deleteGroups(
    requestChannelRequest.context,
    List("group-2", "group-3").asJava,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(future)

  createKafkaApis(authorizer = Some(authorizer)).handleDeleteGroupsRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  future.complete(new DeleteGroupsResponseData.DeletableGroupResultCollection(List(
    result("group-2", Errors.NONE),
    result("group-3", Errors.NONE)
  ).iterator.asJava))

  val expectedDeleteGroupsResponse = new DeleteGroupsResponseData()
    .setResults(new DeleteGroupsResponseData.DeletableGroupResultCollection(List(
      result("group-2", Errors.NONE),
      result("group-3", Errors.NONE),
      result("group-1", Errors.GROUP_AUTHORIZATION_FAILED)
    ).iterator.asJava))

  val response = verifyNoThrottling[DeleteGroupsResponse](requestChannelRequest)
  assertEquals(expectedDeleteGroupsResponse, response.data)
}
| |
/**
 * Happy path for DescribeGroups: the group coordinator's future is completed with a list of
 * described groups and the response must carry exactly that list.
 */
@Test
def testHandleDescribeGroups(): Unit = {
  val groupIds = List("group-1", "group-2", "group-3")
  val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(groupIds.asJava)
  val requestChannelRequest = buildRequest(new DescribeGroupsRequest.Builder(describeGroupsRequest).build())

  val future = new CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
  when(groupCoordinator.describeGroups(
    requestChannelRequest.context,
    describeGroupsRequest.groups
  )).thenReturn(future)

  createKafkaApis().handleDescribeGroupsRequest(requestChannelRequest)

  // group-1 is fully described; the other two only carry an error code.
  val stableGroup = new DescribeGroupsResponseData.DescribedGroup()
    .setGroupId("group-1")
    .setProtocolType("consumer")
    .setProtocolData("range")
    .setGroupState("Stable")
    .setMembers(List(
      new DescribeGroupsResponseData.DescribedGroupMember().setMemberId("member-1")
    ).asJava)
  val notCoordinatorGroup = new DescribeGroupsResponseData.DescribedGroup()
    .setGroupId("group-2")
    .setErrorCode(Errors.NOT_COORDINATOR.code)
  val timedOutGroup = new DescribeGroupsResponseData.DescribedGroup()
    .setGroupId("group-3")
    .setErrorCode(Errors.REQUEST_TIMED_OUT.code)
  val groupResults = List(stableGroup, notCoordinatorGroup, timedOutGroup).asJava

  future.complete(groupResults)

  val expectedDescribeGroupsResponse = new DescribeGroupsResponseData().setGroups(groupResults)
  val response = verifyNoThrottling[DescribeGroupsResponse](requestChannelRequest)
  assertEquals(expectedDescribeGroupsResponse, response.data)
}
| |
/**
 * When the group coordinator's DescribeGroups future fails with the UNKNOWN_SERVER_ERROR
 * exception, every requested group must be answered with UNKNOWN_SERVER_ERROR.
 */
@Test
def testHandleDescribeGroupsFutureFailed(): Unit = {
  val groupIds = List("group-1", "group-2", "group-3")
  val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(groupIds.asJava)
  val requestChannelRequest = buildRequest(new DescribeGroupsRequest.Builder(describeGroupsRequest).build())

  val future = new CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
  when(groupCoordinator.describeGroups(
    requestChannelRequest.context,
    describeGroupsRequest.groups
  )).thenReturn(future)

  createKafkaApis().handleDescribeGroupsRequest(requestChannelRequest)

  val failedGroups = groupIds.map { groupId =>
    new DescribeGroupsResponseData.DescribedGroup()
      .setGroupId(groupId)
      .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
  }
  val expectedDescribeGroupsResponse = new DescribeGroupsResponseData().setGroups(failedGroups.asJava)

  future.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception)

  val response = verifyNoThrottling[DescribeGroupsResponse](requestChannelRequest)
  assertEquals(expectedDescribeGroupsResponse, response.data)
}
| |
/**
 * DescribeGroups with a mocked Authorizer that allows only group-2. Only group-2 is expected to
 * reach the group coordinator; the response lists the unauthorized groups first, followed by the
 * coordinator's result for group-2.
 */
@Test
def testHandleDescribeGroupsAuthenticationFailed(): Unit = {
  val requestedGroups = List("group-1", "group-2", "group-3")
  val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(requestedGroups.asJava)
  val requestChannelRequest = buildRequest(new DescribeGroupsRequest.Builder(describeGroupsRequest).build())

  val authorizer: Authorizer = mock(classOf[Authorizer])

  val acls = Map(
    "group-1" -> AuthorizationResult.DENIED,
    "group-2" -> AuthorizationResult.ALLOWED,
    "group-3" -> AuthorizationResult.DENIED
  )

  // Answer each action from the table above; names missing from it are denied.
  when(authorizer.authorize(any[RequestContext], any[util.List[Action]])).thenAnswer { invocation =>
    val requestedActions = invocation.getArgument(1, classOf[util.List[Action]])
    val decisions = requestedActions.asScala.map { action =>
      acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
    }
    decisions.asJava
  }

  def groupWithError(groupId: String, error: Errors): DescribeGroupsResponseData.DescribedGroup =
    new DescribeGroupsResponseData.DescribedGroup()
      .setGroupId(groupId)
      .setErrorCode(error.code)

  val future = new CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
  when(groupCoordinator.describeGroups(
    requestChannelRequest.context,
    List("group-2").asJava
  )).thenReturn(future)

  createKafkaApis(authorizer = Some(authorizer)).handleDescribeGroupsRequest(requestChannelRequest)

  future.complete(List(groupWithError("group-2", Errors.NOT_COORDINATOR)).asJava)

  val expectedDescribeGroupsResponse = new DescribeGroupsResponseData().setGroups(List(
    // group-1 and group-3 are first because unauthorized are put first into the response.
    groupWithError("group-1", Errors.GROUP_AUTHORIZATION_FAILED),
    groupWithError("group-3", Errors.GROUP_AUTHORIZATION_FAILED),
    groupWithError("group-2", Errors.NOT_COORDINATOR)
  ).asJava)

  val response = verifyNoThrottling[DescribeGroupsResponse](requestChannelRequest)
  assertEquals(expectedDescribeGroupsResponse, response.data)
}
| |
/**
 * Happy path for OffsetDelete: two existing topics with partitions 0 and 1 each are forwarded to
 * the group coordinator, whose future is completed with an all-NONE result; the response must
 * equal that result.
 */
@Test
def testOffsetDelete(): Unit = {
  val group = "groupId"
  val topicNames = Seq("topic-1", "topic-2")
  addTopicToMetadataCache("topic-1", numPartitions = 2)
  addTopicToMetadataCache("topic-2", numPartitions = 2)

  val topics = new OffsetDeleteRequestTopicCollection()
  topicNames.foreach { name =>
    val partitions = Seq(0, 1).map(index => new OffsetDeleteRequestPartition().setPartitionIndex(index))
    topics.add(new OffsetDeleteRequestTopic()
      .setName(name)
      .setPartitions(partitions.asJava))
  }

  val requestData = new OffsetDeleteRequestData()
    .setGroupId(group)
    .setTopics(topics)
  val offsetDeleteRequest = new OffsetDeleteRequest.Builder(requestData).build()
  val request = buildRequest(offsetDeleteRequest)

  val requestLocal = RequestLocal.withThreadConfinedCaching
  val future = new CompletableFuture[OffsetDeleteResponseData]()
  when(groupCoordinator.deleteOffsets(
    request.context,
    offsetDeleteRequest.data,
    requestLocal.bufferSupplier
  )).thenReturn(future)

  createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)

  // Fresh elements per call: partitions 0 and 1, both without error.
  def successfulPartitions(): OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection =
    new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(List(0, 1).map { index =>
      new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
        .setPartitionIndex(index)
        .setErrorCode(Errors.NONE.code)
    }.asJava.iterator)

  val topicResults = topicNames.toList.map { name =>
    new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
      .setName(name)
      .setPartitions(successfulPartitions())
  }
  val offsetDeleteResponseData = new OffsetDeleteResponseData()
    .setTopics(new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(
      topicResults.asJava.iterator()))

  future.complete(offsetDeleteResponseData)

  val response = verifyNoThrottling[OffsetDeleteResponse](request)
  assertEquals(offsetDeleteResponseData, response.data)
}
| |
| @Test |
| def testOffsetDeleteTopicsAndPartitionsValidation(): Unit = { |
| val group = "groupId" |
| addTopicToMetadataCache("foo", numPartitions = 2) |
| addTopicToMetadataCache("bar", numPartitions = 2) |
| |
| val offsetDeleteRequest = new OffsetDeleteRequestData() |
| .setGroupId(group) |
| .setTopics(new OffsetDeleteRequestTopicCollection(List( |
| // foo exists but has only 2 partitions. |
| new OffsetDeleteRequestTopic() |
| .setName("foo") |
| .setPartitions(List( |
| new OffsetDeleteRequestPartition().setPartitionIndex(0), |
| new OffsetDeleteRequestPartition().setPartitionIndex(1), |
| new OffsetDeleteRequestPartition().setPartitionIndex(2) |
| ).asJava), |
| // bar exists. |
| new OffsetDeleteRequestTopic() |
| .setName("bar") |
| .setPartitions(List( |
| new OffsetDeleteRequestPartition().setPartitionIndex(0), |
| new OffsetDeleteRequestPartition().setPartitionIndex(1) |
| ).asJava), |
| // zar does not exist. |
| new OffsetDeleteRequestTopic() |
| .setName("zar") |
| .setPartitions(List( |
| new OffsetDeleteRequestPartition().setPartitionIndex(0), |
| new OffsetDeleteRequestPartition().setPartitionIndex(1) |
| ).asJava), |
| ).asJava.iterator)) |
| |
| val requestChannelRequest = buildRequest(new OffsetDeleteRequest.Builder(offsetDeleteRequest).build()) |
| |
| // This is the request expected by the group coordinator. It contains |
| // only existing topic-partitions. |
| val expectedOffsetDeleteRequest = new OffsetDeleteRequestData() |
| .setGroupId(group) |
| .setTopics(new OffsetDeleteRequestTopicCollection(List( |
| new OffsetDeleteRequestTopic() |
| .setName("foo") |
| .setPartitions(List( |
| new OffsetDeleteRequestPartition().setPartitionIndex(0), |
| new OffsetDeleteRequestPartition().setPartitionIndex(1) |
| ).asJava), |
| new OffsetDeleteRequestTopic() |
| .setName("bar") |
| .setPartitions(List( |
| new OffsetDeleteRequestPartition().setPartitionIndex(0), |
| new OffsetDeleteRequestPartition().setPartitionIndex(1) |
| ).asJava) |
| ).asJava.iterator)) |
| |
| val future = new CompletableFuture[OffsetDeleteResponseData]() |
| when(groupCoordinator.deleteOffsets( |
| requestChannelRequest.context, |
| expectedOffsetDeleteRequest, |
| RequestLocal.NoCaching.bufferSupplier |
| )).thenReturn(future) |
| |
| createKafkaApis().handle( |
| requestChannelRequest, |
| RequestLocal.NoCaching |
| ) |
| |
| // This is the response returned by the group coordinator. |
| val offsetDeleteResponse = new OffsetDeleteResponseData() |
| .setTopics(new OffsetDeleteResponseTopicCollection(List( |
| new OffsetDeleteResponseTopic() |
| .setName("foo") |
| .setPartitions(new OffsetDeleteResponsePartitionCollection(List( |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code), |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code) |
| ).asJava.iterator)), |
| new OffsetDeleteResponseTopic() |
| .setName("bar") |
| .setPartitions(new OffsetDeleteResponsePartitionCollection(List( |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code), |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code) |
| ).asJava.iterator)), |
| ).asJava.iterator)) |
| |
| val expectedOffsetDeleteResponse = new OffsetDeleteResponseData() |
| .setTopics(new OffsetDeleteResponseTopicCollection(List( |
| new OffsetDeleteResponseTopic() |
| .setName("foo") |
| .setPartitions(new OffsetDeleteResponsePartitionCollection(List( |
| // foo-2 is first because partitions failing the validation |
| // are put in the response first. |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(2) |
| .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code), |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code) |
| ).asJava.iterator)), |
| // zar is before bar because topics failing the validation are |
| // put in the response first. |
| new OffsetDeleteResponseTopic() |
| .setName("zar") |
| .setPartitions(new OffsetDeleteResponsePartitionCollection(List( |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) |
| ).asJava.iterator)), |
| new OffsetDeleteResponseTopic() |
| .setName("bar") |
| .setPartitions(new OffsetDeleteResponsePartitionCollection(List( |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code), |
| new OffsetDeleteResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code) |
| ).asJava.iterator)), |
| ).asJava.iterator)) |
| |
| future.complete(offsetDeleteResponse) |
| val response = verifyNoThrottling[OffsetDeleteResponse](requestChannelRequest) |
| assertEquals(expectedOffsetDeleteResponse, response.data) |
| } |
| |
/**
 * Sends OffsetDelete requests that name a partition index outside the range of a
 * one-partition topic and expects UNKNOWN_TOPIC_OR_PARTITION for that partition.
 */
@Test
def testOffsetDeleteWithInvalidPartition(): Unit = {
  val groupId = "groupId"
  val topicName = "topic"
  addTopicToMetadataCache(topicName, numPartitions = 1)

  // Runs one complete request/response round trip for the given partition index.
  def checkInvalidPartition(invalidPartitionId: Int): Unit = {
    reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)

    val rejectedPartition = new OffsetDeleteRequestPartition().setPartitionIndex(invalidPartitionId)
    val requestedTopics = new OffsetDeleteRequestTopicCollection()
    requestedTopics.add(
      new OffsetDeleteRequestTopic()
        .setName(topicName)
        .setPartitions(Collections.singletonList(rejectedPartition)))

    val requestData = new OffsetDeleteRequestData()
      .setGroupId(groupId)
      .setTopics(requestedTopics)
    val request = buildRequest(new OffsetDeleteRequest.Builder(requestData).build())

    // Validation leaves no topic-partitions, yet the coordinator is still
    // invoked, with a request that carries only the group id.
    val emptyCoordinatorResponse = CompletableFuture.completedFuture(new OffsetDeleteResponseData())
    when(groupCoordinator.deleteOffsets(
      request.context,
      new OffsetDeleteRequestData().setGroupId(groupId),
      RequestLocal.NoCaching.bufferSupplier
    )).thenReturn(emptyCoordinatorResponse)

    createKafkaApis().handleOffsetDeleteRequest(request, RequestLocal.NoCaching)

    val response = verifyNoThrottling[OffsetDeleteResponse](request)
    val partitionError = Errors.forCode(
      response.data.topics.find(topicName).partitions.find(invalidPartitionId).errorCode)
    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, partitionError)
  }

  // -1 is negative; 1 is out of range because the topic has only one partition.
  Seq(-1, 1).foreach(checkInvalidPartition)
}
| |
/**
 * When the group coordinator fails the delete with GROUP_ID_NOT_FOUND, that error
 * becomes the top-level error code of the OffsetDelete response.
 */
@Test
def testOffsetDeleteWithInvalidGroup(): Unit = {
  val groupId = "groupId"
  val topicName = "topic"
  addTopicToMetadataCache(topicName, numPartitions = 1)

  // The request names the group only; no topics are attached.
  val requestData = new OffsetDeleteRequestData().setGroupId(groupId)
  val deleteRequest = new OffsetDeleteRequest.Builder(requestData).build()
  val request = buildRequest(deleteRequest)

  val coordinatorResult = new CompletableFuture[OffsetDeleteResponseData]()
  when(groupCoordinator.deleteOffsets(
    request.context,
    deleteRequest.data,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorResult)

  createKafkaApis().handleOffsetDeleteRequest(request, RequestLocal.NoCaching)

  // The pending future is failed only after the request has been handed over.
  coordinatorResult.completeExceptionally(Errors.GROUP_ID_NOT_FOUND.exception)

  val response = verifyNoThrottling[OffsetDeleteResponse](request)
  val topLevelError = Errors.forCode(response.data.errorCode)
  assertEquals(Errors.GROUP_ID_NOT_FOUND, topLevelError)
}
| |
/**
 * Shared scenario: `replicaManager.fetchOffsetForTimestamp` throws the exception of
 * `error` for an EARLIEST_TIMESTAMP lookup on foo-0. The ListOffsets response must then
 * carry that error code together with UNKNOWN_OFFSET and UNKNOWN_TIMESTAMP.
 *
 * @param error the error whose exception the replica manager stub throws
 */
private def testListOffsetFailedGetLeaderReplica(error: Errors): Unit = {
  val topicPartition = new TopicPartition("foo", 0)
  val isolation = IsolationLevel.READ_UNCOMMITTED
  val leaderEpoch = Optional.of[Integer](15)

  when(replicaManager.fetchOffsetForTimestamp(
    ArgumentMatchers.eq(topicPartition),
    ArgumentMatchers.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
    ArgumentMatchers.eq(Some(isolation)),
    ArgumentMatchers.eq(leaderEpoch),
    fetchOnlyFromLeader = ArgumentMatchers.eq(true))
  ).thenThrow(error.exception)

  // Build the request bottom-up: partition, then topic, then the request itself.
  val requestedPartition = new ListOffsetsPartition()
    .setPartitionIndex(topicPartition.partition)
    .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
    .setCurrentLeaderEpoch(leaderEpoch.get)
  val requestedTopic = new ListOffsetsTopic()
    .setName(topicPartition.topic)
    .setPartitions(List(requestedPartition).asJava)
  val listOffsetsRequest = ListOffsetsRequest.Builder
    .forConsumer(true, isolation, false)
    .setTargetTimes(List(requestedTopic).asJava)
    .build()
  val request = buildRequest(listOffsetsRequest)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis().handleListOffsetRequest(request)

  val response = verifyNoThrottling[ListOffsetsResponse](request)
  val topicResponse = response.topics.asScala.find(_.name == topicPartition.topic).get
  val maybePartitionResponse = topicResponse.partitions.asScala
    .find(_.partitionIndex == topicPartition.partition)
  assertTrue(maybePartitionResponse.isDefined)

  val partitionResponse = maybePartitionResponse.get
  assertEquals(error.code, partitionResponse.errorCode)
  assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partitionResponse.offset)
  assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partitionResponse.timestamp)
}
| |
/** Runs the shared `testConsumerListOffsetLatest` scenario with READ_UNCOMMITTED isolation. */
@Test
def testReadUncommittedConsumerListOffsetLatest(): Unit =
  testConsumerListOffsetLatest(IsolationLevel.READ_UNCOMMITTED)
| |
/** Runs the shared `testConsumerListOffsetLatest` scenario with READ_COMMITTED isolation. */
@Test
def testReadCommittedConsumerListOffsetLatest(): Unit =
  testConsumerListOffsetLatest(IsolationLevel.READ_COMMITTED)
| |
/**
 * Broker listeners are inconsistent (one broker has more listeners than the other).
 * A metadata request sent on the listener that exists on both brokers must return
 * brokers 0 and 1.
 */
@Test
def testMetadataRequestOnSharedListenerWithInconsistentListenersAcrossBrokers(): Unit = {
  // The first element of the returned pair is the listener both brokers expose.
  val sharedListener = updateMetadataCacheWithInconsistentListeners()._1
  val metadataResponse = sendMetadataRequestWithInconsistentListeners(sharedListener)
  val brokerIds = metadataResponse.brokers.asScala.map(_.id).toSet
  assertEquals(Set(0, 1), brokerIds)
}
| |
/**
 * Broker listeners are inconsistent (one broker has more listeners than the other).
 * A metadata request sent on the listener that exists on only one broker must return
 * broker 0 alone.
 */
@Test
def testMetadataRequestOnDistinctListenerWithInconsistentListenersAcrossBrokers(): Unit = {
  // The second element of the returned pair is the listener only one broker exposes.
  val exclusiveListener = updateMetadataCacheWithInconsistentListeners()._2
  val metadataResponse = sendMetadataRequestWithInconsistentListeners(exclusiveListener)
  val brokerIds = metadataResponse.brokers.asScala.map(_.id).toSet
  assertEquals(Set(0), brokerIds)
}
| |
| |
/**
 * A metadata request for all topics must result in neither of the following:
 * 1) auto topic creation
 * 2) UNKNOWN_TOPIC_OR_PARTITION
 *
 * The scenario models a topic that is removed from the MetadataCache right after
 * authorization but before its metadata is looked up: `getAllTopics` still lists it
 * while `getTopicMetadata` no longer returns it.
 */
@Test
def testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = {
  // Swap in a mocked cache so the two lookups below can disagree with each other.
  metadataCache = mock(classOf[ZkMetadataCache])
  when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new Node(brokerId,"localhost", 0)))
  when(metadataCache.getControllerId).thenReturn(None)

  // Two topics are visible when the handler collects topics for authorization ...
  val topicsSeenAtAuthorization = Set("remaining-topic", "later-deleted-topic")
  when(metadataCache.getAllTopics()).thenReturn(topicsSeenAtAuthorization)

  // ... but only one of them is still present by the time getTopicMetadata() runs.
  val remainingTopicMetadata = new MetadataResponseTopic()
    .setErrorCode(Errors.NONE.code)
    .setName("remaining-topic")
    .setIsInternal(false)
  when(metadataCache.getTopicMetadata(
    ArgumentMatchers.eq(topicsSeenAtAuthorization),
    any[ListenerName],
    anyBoolean,
    anyBoolean
  )).thenReturn(Seq(remainingTopicMetadata))

  // Flag flipped by the zkClient stub; it must stay false because no topic
  // may be auto-created in this scenario.
  var autoCreateAttempted = false
  when(zkClient.setOrCreateEntityConfigs(
    ArgumentMatchers.eq(ConfigType.Topic),
    anyString,
    any[Properties]
  )).thenAnswer(_ => {
    autoCreateAttempted = true
  })

  val localBroker = new Broker(
    brokerId, "localhost", 9902,
    ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)
  when(zkClient.getAllBrokersInCluster).thenReturn(Seq(localBroker))

  val requestListener = updateMetadataCacheWithInconsistentListeners()._1
  val response = sendMetadataRequestWithInconsistentListeners(requestListener)

  assertFalse(autoCreateAttempted)
  val returnedTopicNames = response.topicMetadata().asScala.map(_.topic())
  assertEquals(List("remaining-topic"), returnedTopicNames)
  assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
}
| |
/**
 * Verifies how a metadata response treats a topic the client may not describe, both when
 * topics are requested by ID and when they are requested by name:
 *  - by ID: TOPIC_AUTHORIZATION_FAILED is returned and the topic name is withheld (null);
 *  - by name: TOPIC_AUTHORIZATION_FAILED is returned and the topic ID is withheld (ZERO_UUID).
 * The authorized topic is returned with NONE plus its name / ID in both cases.
 */
@Test
def testUnauthorizedTopicMetadataRequest(): Unit = {
  // 1. Set up broker information
  val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
  val broker = new UpdateMetadataBroker()
    .setId(0)
    .setRack("rack")
    .setEndpoints(Seq(
      new UpdateMetadataEndpoint()
        .setHost("broker0")
        .setPort(9092)
        .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
        .setListener(plaintextListener.value)
    ).asJava)

  // 2. Set up authorizer
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val unauthorizedTopic = "unauthorized-topic"
  val authorizedTopic = "authorized-topic"

  val expectedActions = Seq(
    new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true),
    new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true)
  )

  // The order in which the actions reach the authorizer is unknown, so match on
  // containment of the expected actions rather than on an exact, ordered list.
  when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
    .thenAnswer { invocation =>
      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
      // Allow only the authorized topic; deny everything else, preserving input order.
      actions.map { action =>
        if (action.resourcePattern().name().equals(authorizedTopic))
          AuthorizationResult.ALLOWED
        else
          AuthorizationResult.DENIED
      }.asJava
    }

  // 3. Set up MetadataCache
  val authorizedTopicId = Uuid.randomUuid()
  val unauthorizedTopicId = Uuid.randomUuid()

  val topicIds = new util.HashMap[String, Uuid]()
  topicIds.put(authorizedTopic, authorizedTopicId)
  topicIds.put(unauthorizedTopic, unauthorizedTopicId)

  // Single-partition, single-replica state led by broker 0 for the given topic.
  def createDummyPartitionStates(topic: String) = {
    new UpdateMetadataPartitionState()
      .setTopicName(topic)
      .setPartitionIndex(0)
      .setControllerEpoch(0)
      .setLeader(0)
      .setLeaderEpoch(0)
      .setReplicas(Collections.singletonList(0))
      .setZkVersion(0)
      .setIsr(Collections.singletonList(0))
  }

  // Send UpdateMetadataReq to update MetadataCache
  val partitionStates = Seq(unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates)

  val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
    0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
  metadataCache.asInstanceOf[ZkMetadataCache].updateMetadata(correlationId = 0, updateMetadataRequest)

  // 4. Send TopicMetadataReq using topicId
  val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build()
  val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(repByTopicId)
  val metadataByTopicIdResp = verifyNoThrottling[MetadataResponse](repByTopicId)

  val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))

  // Both requested topics must be present; otherwise the per-topic checks below
  // would pass vacuously on an empty or partial response.
  assertEquals(Set(authorizedTopicId, unauthorizedTopicId), metadataByTopicId.keySet)

  metadataByTopicId.foreach { case (topicId, metadataResponseTopic) =>
    if (topicId == unauthorizedTopicId) {
      // Return an TOPIC_AUTHORIZATION_FAILED on unauthorized error regardless of leaking the existence of topic id
      assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
      // Do not return topic information on unauthorized error
      assertNull(metadataResponseTopic.name())
    } else {
      assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
      assertEquals(authorizedTopic, metadataResponseTopic.name())
    }
  }

  // 5. Send TopicMetadataReq using topic name
  reset(clientRequestQuotaManager, requestChannel)
  val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build()
  val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)

  createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(repByTopicName)
  val metadataByTopicNameResp = verifyNoThrottling[MetadataResponse](repByTopicName)

  val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))

  // Same guard as above: both topic names must appear in the response.
  assertEquals(Set(authorizedTopic, unauthorizedTopic), metadataByTopicName.keySet)

  metadataByTopicName.foreach { case (topicName, metadataResponseTopic) =>
    if (topicName == unauthorizedTopic) {
      assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
      // Do not return topic Id on unauthorized error
      assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
    } else {
      assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
      assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
    }
  }
}
| |
/**
 * A version 9 fetch request must be answered correctly even when
 * `ReplicaManager.getLogConfig` returns None for the fetched partition.
 */
@Test
def testFetchRequestV9WithNoLogConfig(): Unit = {
  val topicIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("foo", 0))
  val topicPartition = topicIdPartition.topicPartition
  addTopicToMetadataCache(topicPartition.topic, numPartitions = 1)
  val highWatermark = 3
  val recordTimestamp = 1000

  when(replicaManager.getLogConfig(ArgumentMatchers.eq(topicPartition))).thenReturn(None)

  // The stubbed fetch immediately invokes the response callback (argument 3) with a
  // single record for the partition.
  when(replicaManager.fetchMessages(
    any[FetchParams],
    any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]],
    any[ReplicaQuota],
    any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]()
  )).thenAnswer(invocation => {
    val responseCallback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
    val records = MemoryRecords.withRecords(CompressionType.NONE,
      new SimpleRecord(recordTimestamp, "foo".getBytes(StandardCharsets.UTF_8)))
    val fetchedData = new FetchPartitionData(Errors.NONE, highWatermark, 0, records,
      Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)
    responseCallback(Seq(topicIdPartition -> fetchedData))
  })

  // The fetch context is keyed by TopicIdPartition, the request builder by TopicPartition.
  val contextFetchData = Map(topicIdPartition -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
    Optional.empty())).asJava
  val requestFetchData = Map(topicPartition -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
    Optional.empty())).asJava
  val fetchMetadata = new JFetchMetadata(0, 0)
  val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
    fetchMetadata, contextFetchData, false, false)
  when(fetchManager.newContext(
    any[Short],
    any[JFetchMetadata],
    any[Boolean],
    any[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
    any[util.List[TopicIdPartition]],
    any[util.Map[Uuid, String]])).thenReturn(fetchContext)

  when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
    any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)

  val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, requestFetchData)
    .build()
  val request = buildRequest(fetchRequest)

  createKafkaApis().handleFetchRequest(request)

  val response = verifyNoThrottling[FetchResponse](request)
  val responseData = response.responseData(metadataCache.topicIdsToNames(), 9)
  assertTrue(responseData.containsKey(topicPartition))

  val partitionResponse = responseData.get(topicPartition)
  assertEquals(Errors.NONE.code, partitionResponse.errorCode)
  assertEquals(highWatermark, partitionResponse.highWatermark)
  assertEquals(-1, partitionResponse.lastStableOffset)
  assertEquals(0, partitionResponse.logStartOffset)
  assertEquals(recordTimestamp, FetchResponse.recordsOrFail(partitionResponse).batches.iterator.next.maxTimestamp)
  assertNull(partitionResponse.abortedTransactions)
}
| |
/**
 * Verifies that a partition whose topic ID is unresolved in the fetch context is answered
 * with UNKNOWN_TOPIC_ID, sentinel offsets of -1 and empty records.
 *
 * @param replicaId -1 builds a consumer request; a non-negative value builds a follower request
 */
@ParameterizedTest
@ValueSource(ints = Array(-1, 0))
def testFetchRequestErroneousPartitions(replicaId: Int): Unit = {
  val foo = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
  // Same topic ID and partition as `foo`, but with a null topic name.
  val unresolvedFoo = new TopicIdPartition(foo.topicId, new TopicPartition(null, foo.partition))
  val isFollowerFetch = replicaId >= 0

  // Every map below needs the same partition data; build a fresh instance each time.
  def newPartitionData(): FetchRequest.PartitionData =
    new FetchRequest.PartitionData(foo.topicId, 0, 0, 1000, Optional.empty())

  addTopicToMetadataCache(foo.topic, 1, topicId = foo.topicId)

  // We will never return a logConfig when the topic name is null. This is ok since we
  // won't have any records to convert.
  when(replicaManager.getLogConfig(ArgumentMatchers.eq(unresolvedFoo.topicPartition))).thenReturn(None)

  // The context holds the unresolved partition, simulating an unknown topic ID.
  val contextFetchData = Map(unresolvedFoo -> newPartitionData()).asJava
  val requestFetchData = Map(foo.topicPartition -> newPartitionData()).asJava
  val fetchMetadata = new JFetchMetadata(0, 0)
  val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
    fetchMetadata, contextFetchData, true, isFollowerFetch)

  // newContext is expected to be called with the resolved partition, but the context it
  // returns contains the unresolved one.
  when(fetchManager.newContext(
    ApiKeys.FETCH.latestVersion,
    fetchMetadata,
    isFollowerFetch,
    Collections.singletonMap(foo, newPartitionData()),
    Collections.emptyList[TopicIdPartition],
    metadataCache.topicIdsToNames())
  ).thenReturn(fetchContext)

  when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
    any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)

  // A consumer request uses replica epoch -1; a follower request uses 1.
  val replicaEpoch = if (isFollowerFetch) 1 else -1
  val fetchRequest = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
    replicaId, replicaEpoch, 100, 0, requestFetchData).metadata(fetchMetadata).build()
  val request = buildRequest(fetchRequest)

  createKafkaApis().handleFetchRequest(request)

  val response = verifyNoThrottling[FetchResponse](request)
  val responseData = response.responseData(metadataCache.topicIdsToNames(), ApiKeys.FETCH.latestVersion)
  assertTrue(responseData.containsKey(foo.topicPartition))

  val partitionResponse = responseData.get(foo.topicPartition)
  assertEquals(Errors.UNKNOWN_TOPIC_ID.code, partitionResponse.errorCode)
  assertEquals(-1, partitionResponse.highWatermark)
  assertEquals(-1, partitionResponse.lastStableOffset)
  assertEquals(-1, partitionResponse.logStartOffset)
  assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionResponse))
}
| |
/**
 * For every JoinGroup version: the group coordinator receives the expected request data
 * and the data it completes the future with is what the client gets back.
 *
 * @param version the JoinGroup API version under test
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testHandleJoinGroupRequest(version: Short): Unit = {
  val clientRequestData = new JoinGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setRebalanceTimeoutMs(1000)
    .setSessionTimeoutMs(2000)

  val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(clientRequestData).build(version))

  // Below version 1 the coordinator is expected to see the session timeout as the
  // rebalance timeout.
  val expectedRebalanceTimeoutMs =
    if (version >= 1) clientRequestData.rebalanceTimeoutMs else clientRequestData.sessionTimeoutMs
  val coordinatorRequestData = new JoinGroupRequestData()
    .setGroupId(clientRequestData.groupId)
    .setMemberId(clientRequestData.memberId)
    .setProtocolType(clientRequestData.protocolType)
    .setRebalanceTimeoutMs(expectedRebalanceTimeoutMs)
    .setSessionTimeoutMs(clientRequestData.sessionTimeoutMs)

  val coordinatorResult = new CompletableFuture[JoinGroupResponseData]()
  when(groupCoordinator.joinGroup(
    requestChannelRequest.context,
    coordinatorRequestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorResult)

  createKafkaApis().handleJoinGroupRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  // The protocol type is only populated in the response from version 7 onwards.
  val expectedProtocolType = if (version >= 7) "consumer" else null
  val expectedJoinGroupResponse = new JoinGroupResponseData()
    .setMemberId("member")
    .setGenerationId(0)
    .setLeader("leader")
    .setProtocolType(expectedProtocolType)
    .setProtocolName("range")

  coordinatorResult.complete(expectedJoinGroupResponse)
  val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest)
  assertEquals(expectedJoinGroupResponse, response.data)
}
| |
/**
 * When the coordinator answers with a null protocol name, the client sees null from
 * version 7 onwards and `GroupCoordinator.NoProtocol` on older versions.
 *
 * @param version the JoinGroup API version under test
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Unit = {
  val clientRequestData = new JoinGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setRebalanceTimeoutMs(1000)
    .setSessionTimeoutMs(2000)

  val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(clientRequestData).build(version))

  // Below version 1 the coordinator is expected to see the session timeout as the
  // rebalance timeout.
  val expectedRebalanceTimeoutMs =
    if (version >= 1) clientRequestData.rebalanceTimeoutMs else clientRequestData.sessionTimeoutMs
  val coordinatorRequestData = new JoinGroupRequestData()
    .setGroupId(clientRequestData.groupId)
    .setMemberId(clientRequestData.memberId)
    .setProtocolType(clientRequestData.protocolType)
    .setRebalanceTimeoutMs(expectedRebalanceTimeoutMs)
    .setSessionTimeoutMs(clientRequestData.sessionTimeoutMs)

  val coordinatorResult = new CompletableFuture[JoinGroupResponseData]()
  when(groupCoordinator.joinGroup(
    requestChannelRequest.context,
    coordinatorRequestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorResult)

  createKafkaApis().handleJoinGroupRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  // What the coordinator returns: an error response without a protocol name.
  val coordinatorResponse = new JoinGroupResponseData()
    .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
    .setMemberId("member")
    .setProtocolName(null)

  // What the client should receive for this version.
  val expectedProtocolName =
    if (version >= 7) null else kafka.coordinator.group.GroupCoordinator.NoProtocol
  val expectedJoinGroupResponse = new JoinGroupResponseData()
    .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
    .setMemberId("member")
    .setProtocolName(expectedProtocolName)

  coordinatorResult.complete(coordinatorResponse)
  val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest)
  assertEquals(expectedJoinGroupResponse, response.data)
}
| |
/**
 * When the coordinator's JoinGroup future fails with REQUEST_TIMED_OUT, the response
 * sent to the client carries that error.
 */
@Test
def testHandleJoinGroupRequestFutureFailed(): Unit = {
  val requestData = new JoinGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setRebalanceTimeoutMs(1000)
    .setSessionTimeoutMs(2000)

  val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(requestData).build())

  val coordinatorResult = new CompletableFuture[JoinGroupResponseData]()
  when(groupCoordinator.joinGroup(
    requestChannelRequest.context,
    requestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorResult)

  createKafkaApis().handleJoinGroupRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  // Fail the pending future after the handler has registered on it.
  coordinatorResult.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
  val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest)
  assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
}
| |
/**
 * When the authorizer denies the request, the JoinGroup response carries
 * GROUP_AUTHORIZATION_FAILED.
 */
@Test
def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {
  val requestData = new JoinGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setRebalanceTimeoutMs(1000)
    .setSessionTimeoutMs(2000)

  val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(requestData).build())

  // An authorizer that answers DENIED for any context and any action list.
  val denyingAuthorizer: Authorizer = mock(classOf[Authorizer])
  val denied = Seq(AuthorizationResult.DENIED).asJava
  when(denyingAuthorizer.authorize(any[RequestContext], any[util.List[Action]]))
    .thenReturn(denied)

  val kafkaApis = createKafkaApis(authorizer = Some(denyingAuthorizer))
  kafkaApis.handleJoinGroupRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest)
  assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
}
| |
/**
 * The first `sendResponse` call throws a plain `Exception`; the response delivered by
 * the second call must then report UNKNOWN_SERVER_ERROR.
 */
@Test
def testHandleJoinGroupRequestUnexpectedException(): Unit = {
  val requestData = new JoinGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setRebalanceTimeoutMs(1000)
    .setSessionTimeoutMs(2000)

  val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(requestData).build())

  val coordinatorResult = new CompletableFuture[JoinGroupResponseData]()
  when(groupCoordinator.joinGroup(
    requestChannelRequest.context,
    requestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorResult)

  // First sendResponse invocation blows up; the second one records what was sent.
  var capturedResponse: JoinGroupResponse = null
  when(requestChannel.sendResponse(any(), any(), any())).thenAnswer { _ =>
    throw new Exception("Something went wrong")
  }.thenAnswer { invocation =>
    capturedResponse = invocation.getArgument(1, classOf[JoinGroupResponse])
  }

  createKafkaApis().handle(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  coordinatorResult.completeExceptionally(Errors.NOT_COORDINATOR.exception)

  // The error reported is the one caused by the exception thrown from `sendResponse`,
  // not NOT_COORDINATOR. `Exception` is not a Kafka error, so it maps to
  // `UNKNOWN_SERVER_ERROR`.
  assertEquals(Errors.UNKNOWN_SERVER_ERROR, capturedResponse.error)
}
| |
/**
 * For every SyncGroup version: the coordinator receives the expected request data and
 * the data it completes the future with is what the client gets back. Protocol type
 * and name are expected to be set only from version 5 onwards.
 *
 * @param version the SyncGroup API version under test
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
def testHandleSyncGroupRequest(version: Short): Unit = {
  val clientRequestData = new SyncGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setProtocolName("range")

  val requestChannelRequest = buildRequest(new SyncGroupRequest.Builder(clientRequestData).build(version))

  // Older versions are expected to carry null for both protocol fields.
  val protocolTypeForVersion = if (version >= 5) "consumer" else null
  val protocolNameForVersion = if (version >= 5) "range" else null

  val coordinatorRequestData = new SyncGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType(protocolTypeForVersion)
    .setProtocolName(protocolNameForVersion)

  val coordinatorResult = new CompletableFuture[SyncGroupResponseData]()
  when(groupCoordinator.syncGroup(
    requestChannelRequest.context,
    coordinatorRequestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorResult)

  createKafkaApis().handleSyncGroupRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  val expectedSyncGroupResponse = new SyncGroupResponseData()
    .setProtocolType(protocolTypeForVersion)
    .setProtocolName(protocolNameForVersion)

  coordinatorResult.complete(expectedSyncGroupResponse)
  val response = verifyNoThrottling[SyncGroupResponse](requestChannelRequest)
  assertEquals(expectedSyncGroupResponse, response.data)
}
| |
/**
 * When the coordinator's SyncGroup future fails with UNKNOWN_SERVER_ERROR, the response
 * sent to the client carries that error.
 */
@Test
def testHandleSyncGroupRequestFutureFailed(): Unit = {
  // Builds the request payload; called twice so that the client request and the
  // coordinator expectation are separate but equal instances.
  def newSyncGroupData(): SyncGroupRequestData = new SyncGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setProtocolName("range")

  val clientRequestData = newSyncGroupData()
  val requestChannelRequest = buildRequest(new SyncGroupRequest.Builder(clientRequestData).build())

  val coordinatorRequestData = newSyncGroupData()
  val coordinatorResult = new CompletableFuture[SyncGroupResponseData]()
  when(groupCoordinator.syncGroup(
    requestChannelRequest.context,
    coordinatorRequestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorResult)

  createKafkaApis().handleSyncGroupRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  // Fail the pending future after the handler has registered on it.
  coordinatorResult.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception)
  val response = verifyNoThrottling[SyncGroupResponse](requestChannelRequest)
  assertEquals(Errors.UNKNOWN_SERVER_ERROR, response.error)
}
| |
/**
 * When the authorizer denies the request, the SyncGroup response carries
 * GROUP_AUTHORIZATION_FAILED. (Despite the method name, the scenario exercised here is
 * an authorization denial.)
 */
@Test
def testHandleSyncGroupRequestAuthenticationFailed(): Unit = {
  val requestData = new SyncGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setProtocolName("range")

  val requestChannelRequest = buildRequest(new SyncGroupRequest.Builder(requestData).build())

  // An authorizer that answers DENIED for any context and any action list.
  val denyingAuthorizer: Authorizer = mock(classOf[Authorizer])
  val denied = Seq(AuthorizationResult.DENIED).asJava
  when(denyingAuthorizer.authorize(any[RequestContext], any[util.List[Action]]))
    .thenReturn(denied)

  val kafkaApis = createKafkaApis(authorizer = Some(denyingAuthorizer))
  kafkaApis.handleSyncGroupRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  val response = verifyNoThrottling[SyncGroupResponse](requestChannelRequest)
  assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
}
| |
/**
 * A SyncGroup request without protocol type and name succeeds below version 5 and is
 * answered with INCONSISTENT_GROUP_PROTOCOL from version 5 onwards.
 *
 * @param version the SyncGroup API version under test
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short): Unit = {
  val protocolFieldsOptional = version < 5

  // Neither protocol type nor protocol name is set on the request.
  val clientRequestData = new SyncGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")

  val requestChannelRequest = buildRequest(new SyncGroupRequest.Builder(clientRequestData).build(version))

  val coordinatorRequestData = new SyncGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")

  val coordinatorResult = new CompletableFuture[SyncGroupResponseData]()
  when(groupCoordinator.syncGroup(
    requestChannelRequest.context,
    coordinatorRequestData,
    RequestLocal.NoCaching.bufferSupplier
  )).thenReturn(coordinatorResult)

  createKafkaApis().handleSyncGroupRequest(
    requestChannelRequest,
    RequestLocal.NoCaching
  )

  // The coordinator future is completed only for the older versions; from version 5
  // onwards a response is expected without it.
  if (protocolFieldsOptional) {
    coordinatorResult.complete(new SyncGroupResponseData()
      .setProtocolType("consumer")
      .setProtocolName("range"))
  }

  val response = verifyNoThrottling[SyncGroupResponse](requestChannelRequest)

  val expectedError =
    if (protocolFieldsOptional) Errors.NONE else Errors.INCONSISTENT_GROUP_PROTOCOL
  assertEquals(expectedError, response.error)
}
| |
/**
 * For every Heartbeat version: stubs the group coordinator with a future, handles
 * the request, completes the future, and expects the response data to equal the
 * data the future was completed with.
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.HEARTBEAT)
def testHandleHeartbeatRequest(version: Short): Unit = {
  // Fresh payload per call so the request and the stub argument are distinct instances.
  def heartbeatData(): HeartbeatRequestData =
    new HeartbeatRequestData()
      .setGroupId("group")
      .setMemberId("member")
      .setGenerationId(0)

  val requestChannelRequest = buildRequest(
    new HeartbeatRequest.Builder(heartbeatData()).build(version))

  val coordinatorFuture = new CompletableFuture[HeartbeatResponseData]()
  when(groupCoordinator.heartbeat(requestChannelRequest.context, heartbeatData()))
    .thenReturn(coordinatorFuture)

  val kafkaApis = createKafkaApis()
  kafkaApis.handleHeartbeatRequest(requestChannelRequest)

  val coordinatorResponse = new HeartbeatResponseData()
  coordinatorFuture.complete(coordinatorResponse)

  val heartbeatResponse = verifyNoThrottling[HeartbeatResponse](requestChannelRequest)
  assertEquals(coordinatorResponse, heartbeatResponse.data)
}
| |
/**
 * When the future returned by the stubbed group coordinator is completed
 * exceptionally with UNKNOWN_SERVER_ERROR, the Heartbeat response is expected to
 * report that same error.
 */
@Test
def testHandleHeartbeatRequestFutureFailed(): Unit = {
  val requestData = new HeartbeatRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setGenerationId(0)
  val requestChannelRequest = buildRequest(new HeartbeatRequest.Builder(requestData).build())

  // Separate instance with the same field values, used as the stubbed coordinator argument.
  val coordinatorArgument = new HeartbeatRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setGenerationId(0)
  val pendingHeartbeat = new CompletableFuture[HeartbeatResponseData]()
  when(groupCoordinator.heartbeat(requestChannelRequest.context, coordinatorArgument))
    .thenReturn(pendingHeartbeat)

  val kafkaApis = createKafkaApis()
  kafkaApis.handleHeartbeatRequest(requestChannelRequest)

  pendingHeartbeat.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception)

  val heartbeatResponse = verifyNoThrottling[HeartbeatResponse](requestChannelRequest)
  assertEquals(Errors.UNKNOWN_SERVER_ERROR, heartbeatResponse.error)
}
| |
/**
 * A Heartbeat request must be answered with GROUP_AUTHORIZATION_FAILED when the
 * authorizer returns DENIED for every authorize call.
 */
@Test
def testHandleHeartbeatRequestAuthenticationFailed(): Unit = {
  val requestChannelRequest = buildRequest(
    new HeartbeatRequest.Builder(
      new HeartbeatRequestData()
        .setGroupId("group")
        .setMemberId("member")
        .setGenerationId(0)
    ).build())

  // Mocked authorizer that answers any authorize call with a single DENIED result.
  val deniedResults: util.List[AuthorizationResult] = Seq(AuthorizationResult.DENIED).asJava
  val denyingAuthorizer: Authorizer = mock(classOf[Authorizer])
  when(denyingAuthorizer.authorize(any[RequestContext], any[util.List[Action]]))
    .thenReturn(deniedResults)

  val kafkaApis = createKafkaApis(authorizer = Some(denyingAuthorizer))
  kafkaApis.handleHeartbeatRequest(requestChannelRequest)

  val heartbeatResponse = verifyNoThrottling[HeartbeatResponse](requestChannelRequest)
  assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, heartbeatResponse.error)
}
| |
/**
 * A JoinGroup request that carries a group instance id is expected to be answered
 * with UNSUPPORTED_VERSION when KafkaApis is created with IBP_2_2_IV1.
 */
@Test
def rejectJoinGroupRequestWhenStaticMembershipNotSupported(): Unit = {
  val staticMemberData = new JoinGroupRequestData()
    .setGroupId("test")
    .setMemberId("test")
    .setGroupInstanceId("instanceId")
    .setProtocolType("consumer")
    .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection)
  val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(staticMemberData).build())

  val kafkaApis = createKafkaApis(IBP_2_2_IV1)
  kafkaApis.handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

  val joinGroupResponse = verifyNoThrottling[JoinGroupResponse](requestChannelRequest)
  assertEquals(Errors.UNSUPPORTED_VERSION, joinGroupResponse.error())
}
| |
/**
 * A SyncGroup request that carries a group instance id is expected to be answered
 * with UNSUPPORTED_VERSION when KafkaApis is created with IBP_2_2_IV1.
 */
@Test
def rejectSyncGroupRequestWhenStaticMembershipNotSupported(): Unit = {
  val staticMemberData = new SyncGroupRequestData()
    .setGroupId("test")
    .setMemberId("test")
    .setGroupInstanceId("instanceId")
    .setGenerationId(1)
  val requestChannelRequest = buildRequest(new SyncGroupRequest.Builder(staticMemberData).build())

  val kafkaApis = createKafkaApis(IBP_2_2_IV1)
  kafkaApis.handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

  val syncGroupResponse = verifyNoThrottling[SyncGroupResponse](requestChannelRequest)
  assertEquals(Errors.UNSUPPORTED_VERSION, syncGroupResponse.error)
}
| |
/**
 * A Heartbeat request that carries a group instance id is expected to be answered
 * with UNSUPPORTED_VERSION when KafkaApis is created with IBP_2_2_IV1.
 */
@Test
def rejectHeartbeatRequestWhenStaticMembershipNotSupported(): Unit = {
  val staticMemberData = new HeartbeatRequestData()
    .setGroupId("test")
    .setMemberId("test")
    .setGroupInstanceId("instanceId")
    .setGenerationId(1)
  val requestChannelRequest = buildRequest(new HeartbeatRequest.Builder(staticMemberData).build())

  val kafkaApis = createKafkaApis(IBP_2_2_IV1)
  kafkaApis.handleHeartbeatRequest(requestChannelRequest)

  val heartbeatResponse = verifyNoThrottling[HeartbeatResponse](requestChannelRequest)
  assertEquals(Errors.UNSUPPORTED_VERSION, heartbeatResponse.error())
}
| |
/**
 * An OffsetCommit request that carries a group instance id is expected to be
 * answered with UNSUPPORTED_VERSION on the committed partition when KafkaApis is
 * created with IBP_2_2_IV1.
 */
@Test
def rejectOffsetCommitRequestWhenStaticMembershipNotSupported(): Unit = {
  // Build the request bottom-up: partition -> topic -> request data.
  val committedPartition = new OffsetCommitRequestData.OffsetCommitRequestPartition()
    .setPartitionIndex(0)
    .setCommittedOffset(100)
    .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
    .setCommittedMetadata("")
  val committedTopic = new OffsetCommitRequestData.OffsetCommitRequestTopic()
    .setName("test")
    .setPartitions(Collections.singletonList(committedPartition))
  val requestData = new OffsetCommitRequestData()
    .setGroupId("test")
    .setMemberId("test")
    .setGroupInstanceId("instanceId")
    .setGenerationId(100)
    .setTopics(Collections.singletonList(committedTopic))

  val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(requestData).build())

  val kafkaApis = createKafkaApis(IBP_2_2_IV1)
  kafkaApis.handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

  val rejectedPartition = new OffsetCommitResponseData.OffsetCommitResponsePartition()
    .setPartitionIndex(0)
    .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
  val expectedTopicErrors = Collections.singletonList(
    new OffsetCommitResponseData.OffsetCommitResponseTopic()
      .setName("test")
      .setPartitions(Collections.singletonList(rejectedPartition)))

  val offsetCommitResponse = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
  assertEquals(expectedTopicErrors, offsetCommitResponse.data.topics())
}
| |
/**
 * Sends a LeaveGroup request with two members, once per LeaveGroup version.
 *
 * For versions below 3, building the request is expected to throw
 * UnsupportedVersionException. For later versions the coordinator stub is completed
 * with a two-member response and the response data is expected to equal it.
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
def testHandleLeaveGroupWithMultipleMembers(version: Short): Unit = {
  // (member id, group instance id) pairs used for the request and the response.
  val memberIds = List("member-1" -> "instance-1", "member-2" -> "instance-2")

  def leavingMembers(): util.List[MemberIdentity] =
    memberIds.map { case (memberId, instanceId) =>
      new MemberIdentity()
        .setMemberId(memberId)
        .setGroupInstanceId(instanceId)
    }.asJava

  def makeRequest(version: Short): RequestChannel.Request =
    buildRequest(new LeaveGroupRequest.Builder("group", leavingMembers()).build(version))

  if (version >= 3) {
    val requestChannelRequest = makeRequest(version)

    val coordinatorArgument = new LeaveGroupRequestData()
      .setGroupId("group")
      .setMembers(leavingMembers())

    val coordinatorFuture = new CompletableFuture[LeaveGroupResponseData]()
    when(groupCoordinator.leaveGroup(requestChannelRequest.context, coordinatorArgument))
      .thenReturn(coordinatorFuture)

    createKafkaApis().handleLeaveGroupRequest(requestChannelRequest)

    val memberResponses = memberIds.map { case (memberId, instanceId) =>
      new LeaveGroupResponseData.MemberResponse()
        .setMemberId(memberId)
        .setGroupInstanceId(instanceId)
    }.asJava
    val expectedLeaveResponse = new LeaveGroupResponseData()
      .setErrorCode(Errors.NONE.code)
      .setMembers(memberResponses)

    coordinatorFuture.complete(expectedLeaveResponse)
    val leaveGroupResponse = verifyNoThrottling[LeaveGroupResponse](requestChannelRequest)
    assertEquals(expectedLeaveResponse, leaveGroupResponse.data)
  } else {
    // Request versions earlier than version 3 do not support batching members.
    assertThrows(classOf[UnsupportedVersionException], () => makeRequest(version))
  }
}
| |
/**
 * Sends a LeaveGroup request with one member, once per LeaveGroup version.
 *
 * The coordinator stub expects the group instance id only for versions 3 and above
 * (null otherwise). It is always completed with a response listing the member. The
 * final response is expected to list the member for versions 3 and above, and to
 * carry only the error code for earlier versions.
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
def testHandleLeaveGroupWithSingleMember(version: Short): Unit = {
  val v3OrLater = version >= 3

  // Fresh single-member response list on every call.
  def memberResponses(): util.List[LeaveGroupResponseData.MemberResponse] =
    List(
      new LeaveGroupResponseData.MemberResponse()
        .setMemberId("member-1")
        .setGroupInstanceId("instance-1")
    ).asJava

  val requestedMember = new MemberIdentity()
    .setMemberId("member-1")
    .setGroupInstanceId("instance-1")
  val requestChannelRequest = buildRequest(
    new LeaveGroupRequest.Builder("group", List(requestedMember).asJava).build(version))

  val memberSeenByCoordinator = new MemberIdentity()
    .setMemberId("member-1")
    .setGroupInstanceId(if (v3OrLater) "instance-1" else null)
  val coordinatorArgument = new LeaveGroupRequestData()
    .setGroupId("group")
    .setMembers(List(memberSeenByCoordinator).asJava)

  val coordinatorFuture = new CompletableFuture[LeaveGroupResponseData]()
  when(groupCoordinator.leaveGroup(requestChannelRequest.context, coordinatorArgument))
    .thenReturn(coordinatorFuture)

  createKafkaApis().handleLeaveGroupRequest(requestChannelRequest)

  val coordinatorResponse = new LeaveGroupResponseData()
    .setErrorCode(Errors.NONE.code)
    .setMembers(memberResponses())

  // The members list is only expected in the response for versions 3 and above.
  val expectedLeaveResponse = new LeaveGroupResponseData().setErrorCode(Errors.NONE.code)
  if (v3OrLater) {
    expectedLeaveResponse.setMembers(memberResponses())
  }

  coordinatorFuture.complete(coordinatorResponse)
  val leaveGroupResponse = verifyNoThrottling[LeaveGroupResponse](requestChannelRequest)
  assertEquals(expectedLeaveResponse, leaveGroupResponse.data)
}
| |
/**
 * When the future returned by the stubbed group coordinator is completed
 * exceptionally with UNKNOWN_SERVER_ERROR, the LeaveGroup response is expected to
 * report that same error.
 */
@Test
def testHandleLeaveGroupFutureFailed(): Unit = {
  // Fresh single-member list on every call.
  def leavingMembers(): util.List[MemberIdentity] =
    List(
      new MemberIdentity()
        .setMemberId("member-1")
        .setGroupInstanceId("instance-1")
    ).asJava

  val leaveGroup = new LeaveGroupRequest.Builder("group", leavingMembers())
    .build(ApiKeys.LEAVE_GROUP.latestVersion)
  val requestChannelRequest = buildRequest(leaveGroup)

  val coordinatorArgument = new LeaveGroupRequestData()
    .setGroupId("group")
    .setMembers(leavingMembers())

  val pendingLeave = new CompletableFuture[LeaveGroupResponseData]()
  when(groupCoordinator.leaveGroup(requestChannelRequest.context, coordinatorArgument))
    .thenReturn(pendingLeave)

  val kafkaApis = createKafkaApis()
  kafkaApis.handleLeaveGroupRequest(requestChannelRequest)

  pendingLeave.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception)

  val leaveGroupResponse = verifyNoThrottling[LeaveGroupResponse](requestChannelRequest)
  assertEquals(Errors.UNKNOWN_SERVER_ERROR, leaveGroupResponse.error)
}
| |
/**
 * A LeaveGroup request must be answered with GROUP_AUTHORIZATION_FAILED when the
 * authorizer returns DENIED for every authorize call.
 *
 * No group coordinator stubbing is set up here. The asserted response cannot come
 * from the coordinator, so a stub returning a never-completed future would only be
 * dead setup.
 */
@Test
def testHandleLeaveGroupAuthenticationFailed(): Unit = {
  val leavingMember = new MemberIdentity()
    .setMemberId("member-1")
    .setGroupInstanceId("instance-1")
  val requestChannelRequest = buildRequest(
    new LeaveGroupRequest.Builder("group", List(leavingMember).asJava)
      .build(ApiKeys.LEAVE_GROUP.latestVersion))

  // Mocked authorizer that answers any authorize call with a single DENIED result.
  val authorizer: Authorizer = mock(classOf[Authorizer])
  when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
    .thenReturn(Seq(AuthorizationResult.DENIED).asJava)

  createKafkaApis(authorizer = Some(authorizer)).handleLeaveGroupRequest(requestChannelRequest)

  val response = verifyNoThrottling[LeaveGroupResponse](requestChannelRequest)
  assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
}
| |
/**
 * Sends an OffsetFetch request for three groups, once per OffsetFetch version.
 *
 * Version 0 is not exercised. For versions 1 to 7, building the request is expected
 * to throw UnsupportedVersionException. From version 8 onwards, group-1 is stubbed
 * through `fetchOffsets` and group-2/group-3 through `fetchAllOffsets`; group-3's
 * future fails with INVALID_GROUP_ID, and the response is expected to list all
 * three groups in order.
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
  def newTopicsFuture(): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] =
    new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()

  def committed(index: Int, offset: Long, leaderEpoch: Int): OffsetFetchResponseData.OffsetFetchResponsePartitions =
    new OffsetFetchResponseData.OffsetFetchResponsePartitions()
      .setPartitionIndex(index)
      .setCommittedOffset(offset)
      .setCommittedLeaderEpoch(leaderEpoch)

  def makeRequest(version: Short): RequestChannel.Request = {
    val fooPartitions = List(
      new TopicPartition("foo", 0),
      new TopicPartition("foo", 1)
    ).asJava
    val groups = Map(
      "group-1" -> fooPartitions,
      "group-2" -> null,
      "group-3" -> null
    ).asJava
    buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
  }

  // Version 0 gets offsets from Zookeeper. We are not interested
  // in testing this here.
  if (version > 0) {
    if (version < 8) {
      // Request versions earlier than version 8 do not support batching groups.
      assertThrows(classOf[UnsupportedVersionException], () => makeRequest(version))
    } else {
      val requestChannelRequest = makeRequest(version)

      val requestedFoo = new OffsetFetchRequestData.OffsetFetchRequestTopics()
        .setName("foo")
        .setPartitionIndexes(List[Integer](0, 1).asJava)
      val group1Future = newTopicsFuture()
      when(groupCoordinator.fetchOffsets(
        requestChannelRequest.context,
        "group-1",
        List(requestedFoo).asJava,
        false
      )).thenReturn(group1Future)

      val group2Future = newTopicsFuture()
      when(groupCoordinator.fetchAllOffsets(requestChannelRequest.context, "group-2", false))
        .thenReturn(group2Future)

      val group3Future = newTopicsFuture()
      when(groupCoordinator.fetchAllOffsets(requestChannelRequest.context, "group-3", false))
        .thenReturn(group3Future)

      createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)

      val fooOffsets = new OffsetFetchResponseData.OffsetFetchResponseTopics()
        .setName("foo")
        .setPartitions(List(committed(0, 100, 1), committed(1, 200, 2)).asJava)
      val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
        .setGroupId("group-1")
        .setTopics(List(fooOffsets).asJava)

      val barOffsets = new OffsetFetchResponseData.OffsetFetchResponseTopics()
        .setName("bar")
        .setPartitions(List(committed(0, 100, 1), committed(1, 200, 2), committed(2, 300, 3)).asJava)
      val group2Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
        .setGroupId("group-2")
        .setTopics(List(barOffsets).asJava)

      val group3Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
        .setGroupId("group-3")
        .setErrorCode(Errors.INVALID_GROUP_ID.code)

      val expectedOffsetFetchResponse = new OffsetFetchResponseData()
        .setGroups(List(group1Response, group2Response, group3Response).asJava)

      group1Future.complete(group1Response.topics)
      group2Future.complete(group2Response.topics)
      group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)

      val offsetFetchResponse = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
      assertEquals(expectedOffsetFetchResponse, offsetFetchResponse.data)
    }
  }
}
| |
/**
 * Sends an OffsetFetch request for two partitions of topic "foo" in a single group,
 * once per OffsetFetch version (version 0 is not exercised).
 *
 * From version 8 onwards the expected response uses the `groups` field. Earlier
 * versions are expected to use the `topics` field, with the committed leader epoch
 * reported as -1 for versions below 5.
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
  // Version 0 gets offsets from Zookeeper. We are not interested
  // in testing this here.
  if (version > 0) {
    def committed(index: Int, offset: Long, leaderEpoch: Int): OffsetFetchResponseData.OffsetFetchResponsePartitions =
      new OffsetFetchResponseData.OffsetFetchResponsePartitions()
        .setPartitionIndex(index)
        .setCommittedOffset(offset)
        .setCommittedLeaderEpoch(leaderEpoch)

    // Partition entry of the pre-version-8 response shape.
    def legacyCommitted(index: Int, offset: Long, leaderEpoch: Int): OffsetFetchResponseData.OffsetFetchResponsePartition =
      new OffsetFetchResponseData.OffsetFetchResponsePartition()
        .setPartitionIndex(index)
        .setCommittedOffset(offset)
        .setCommittedLeaderEpoch(if (version >= 5) leaderEpoch else -1)

    val requestedPartitions = List(
      new TopicPartition("foo", 0),
      new TopicPartition("foo", 1)
    ).asJava
    val requestChannelRequest = buildRequest(
      new OffsetFetchRequest.Builder("group-1", false, requestedPartitions, false).build(version))

    val requestedFoo = new OffsetFetchRequestData.OffsetFetchRequestTopics()
      .setName("foo")
      .setPartitionIndexes(List[Integer](0, 1).asJava)
    val coordinatorFuture = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
    when(groupCoordinator.fetchOffsets(
      requestChannelRequest.context,
      "group-1",
      List(requestedFoo).asJava,
      false
    )).thenReturn(coordinatorFuture)

    createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)

    val fooOffsets = new OffsetFetchResponseData.OffsetFetchResponseTopics()
      .setName("foo")
      .setPartitions(List(committed(0, 100, 1), committed(1, 200, 2)).asJava)
    val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
      .setGroupId("group-1")
      .setTopics(List(fooOffsets).asJava)

    val expectedOffsetFetchResponse =
      if (version >= 8) {
        new OffsetFetchResponseData().setGroups(List(group1Response).asJava)
      } else {
        val legacyFoo = new OffsetFetchResponseData.OffsetFetchResponseTopic()
          .setName("foo")
          .setPartitions(List(legacyCommitted(0, 100, 1), legacyCommitted(1, 200, 2)).asJava)
        new OffsetFetchResponseData().setTopics(List(legacyFoo).asJava)
      }

    coordinatorFuture.complete(group1Response.topics)

    val offsetFetchResponse = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
    assertEquals(expectedOffsetFetchResponse, offsetFetchResponse.data)
  }
}
| |
/**
 * Sends an OffsetFetch request with a null partition list ("all offsets") for a
 * single group, once per OffsetFetch version (versions 0 and 1 are not exercised).
 *
 * From version 8 onwards the expected response uses the `groups` field. Earlier
 * versions are expected to use the `topics` field, with the committed leader epoch
 * reported as -1 for versions below 5.
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
def testHandleOffsetFetchAllOffsetsWithSingleGroup(version: Short): Unit = {
  // Version 0 gets offsets from Zookeeper. Version 1 does not support fetching all
  // offsets request. We are not interested in testing these here.
  if (version >= 2) {
    def committed(index: Int, offset: Long, leaderEpoch: Int): OffsetFetchResponseData.OffsetFetchResponsePartitions =
      new OffsetFetchResponseData.OffsetFetchResponsePartitions()
        .setPartitionIndex(index)
        .setCommittedOffset(offset)
        .setCommittedLeaderEpoch(leaderEpoch)

    // Partition entry of the pre-version-8 response shape.
    def legacyCommitted(index: Int, offset: Long, leaderEpoch: Int): OffsetFetchResponseData.OffsetFetchResponsePartition =
      new OffsetFetchResponseData.OffsetFetchResponsePartition()
        .setPartitionIndex(index)
        .setCommittedOffset(offset)
        .setCommittedLeaderEpoch(if (version >= 5) leaderEpoch else -1)

    val allOffsetsRequest = new OffsetFetchRequest.Builder(
      "group-1",
      false,
      null, // all offsets.
      false
    ).build(version)
    val requestChannelRequest = buildRequest(allOffsetsRequest)

    val coordinatorFuture = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
    when(groupCoordinator.fetchAllOffsets(requestChannelRequest.context, "group-1", false))
      .thenReturn(coordinatorFuture)

    createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)

    val fooOffsets = new OffsetFetchResponseData.OffsetFetchResponseTopics()
      .setName("foo")
      .setPartitions(List(committed(0, 100, 1), committed(1, 200, 2)).asJava)
    val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
      .setGroupId("group-1")
      .setTopics(List(fooOffsets).asJava)

    val expectedOffsetFetchResponse =
      if (version >= 8) {
        new OffsetFetchResponseData().setGroups(List(group1Response).asJava)
      } else {
        val legacyFoo = new OffsetFetchResponseData.OffsetFetchResponseTopic()
          .setName("foo")
          .setPartitions(List(legacyCommitted(0, 100, 1), legacyCommitted(1, 200, 2)).asJava)
        new OffsetFetchResponseData().setTopics(List(legacyFoo).asJava)
      }

    coordinatorFuture.complete(group1Response.topics)

    val offsetFetchResponse = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
    assertEquals(expectedOffsetFetchResponse, offsetFetchResponse.data)
  }
}
| |
/**
 * Sends a latest-version OffsetFetch request for four groups through a mocked
 * authorizer that decides by resource name: group-1, group-3 and topic "bar" are
 * ALLOWED; group-2, group-4 and topic "foo" are DENIED (unknown names are DENIED).
 *
 * Expected response, in order:
 *  - group-1: offsets for "bar", and TOPIC_AUTHORIZATION_FAILED for "foo";
 *  - group-2: GROUP_AUTHORIZATION_FAILED;
 *  - group-3: only "bar" (the coordinator's "foo" entry is expected to be dropped);
 *  - group-4: GROUP_AUTHORIZATION_FAILED.
 */
@Test
def testHandleOffsetFetchAuthorization(): Unit = {
  def fooAndBar(): util.List[TopicPartition] =
    List(new TopicPartition("foo", 0), new TopicPartition("bar", 0)).asJava

  // Fresh topic entry holding partition 0 with offset 100 and leader epoch 1.
  def committedTopic(name: String): OffsetFetchResponseData.OffsetFetchResponseTopics =
    new OffsetFetchResponseData.OffsetFetchResponseTopics()
      .setName(name)
      .setPartitions(List(
        new OffsetFetchResponseData.OffsetFetchResponsePartitions()
          .setPartitionIndex(0)
          .setCommittedOffset(100)
          .setCommittedLeaderEpoch(1)
      ).asJava)

  def unauthorizedGroup(groupId: String): OffsetFetchResponseData.OffsetFetchResponseGroup =
    new OffsetFetchResponseData.OffsetFetchResponseGroup()
      .setGroupId(groupId)
      .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)

  val groups = Map(
    "group-1" -> fooAndBar(),
    "group-2" -> fooAndBar(),
    "group-3" -> null,
    "group-4" -> null
  ).asJava
  val requestChannelRequest = buildRequest(
    new OffsetFetchRequest.Builder(groups, false, false).build(ApiKeys.OFFSET_FETCH.latestVersion))

  val authorizer: Authorizer = mock(classOf[Authorizer])

  // Authorization decision per resource name (groups and topics share one table).
  val acls = Map(
    "group-1" -> AuthorizationResult.ALLOWED,
    "group-2" -> AuthorizationResult.DENIED,
    "group-3" -> AuthorizationResult.ALLOWED,
    "group-4" -> AuthorizationResult.DENIED,
    "foo" -> AuthorizationResult.DENIED,
    "bar" -> AuthorizationResult.ALLOWED
  )

  when(authorizer.authorize(
    any[RequestContext],
    any[util.List[Action]]
  )).thenAnswer { invocation =>
    val requestedActions = invocation.getArgument(1, classOf[util.List[Action]])
    requestedActions.asScala.map { action =>
      acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
    }.asJava
  }

  // group-1 is allowed and bar is allowed.
  val requestedBar = new OffsetFetchRequestData.OffsetFetchRequestTopics()
    .setName("bar")
    .setPartitionIndexes(List[Integer](0).asJava)
  val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
  when(groupCoordinator.fetchOffsets(
    requestChannelRequest.context,
    "group-1",
    List(requestedBar).asJava,
    false
  )).thenReturn(group1Future)

  // group-3 is allowed; all of its offsets are fetched.
  val group3Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
  when(groupCoordinator.fetchAllOffsets(requestChannelRequest.context, "group-3", false))
    .thenReturn(group3Future)

  createKafkaApis(authorizer = Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching)

  val group1ResponseFromCoordinator = new OffsetFetchResponseData.OffsetFetchResponseGroup()
    .setGroupId("group-1")
    .setTopics(List(committedTopic("bar")).asJava)

  // foo should be filtered out of the group-3 result.
  val group3ResponseFromCoordinator = new OffsetFetchResponseData.OffsetFetchResponseGroup()
    .setGroupId("group-3")
    .setTopics(List(committedTopic("foo"), committedTopic("bar")).asJava)

  val unauthorizedFoo = new OffsetFetchResponseData.OffsetFetchResponseTopics()
    .setName("foo")
    .setPartitions(List(
      new OffsetFetchResponseData.OffsetFetchResponsePartitions()
        .setPartitionIndex(0)
        .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
        .setCommittedOffset(-1)
    ).asJava)

  // group-1 is authorized but foo is not.
  val expectedGroup1 = new OffsetFetchResponseData.OffsetFetchResponseGroup()
    .setGroupId("group-1")
    .setTopics(List(committedTopic("bar"), unauthorizedFoo).asJava)
  // group-3 is authorized but foo is not.
  val expectedGroup3 = new OffsetFetchResponseData.OffsetFetchResponseGroup()
    .setGroupId("group-3")
    .setTopics(List(committedTopic("bar")).asJava)

  // group-2 and group-4 are not authorized.
  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
    .setGroups(List(
      expectedGroup1,
      unauthorizedGroup("group-2"),
      expectedGroup3,
      unauthorizedGroup("group-4")
    ).asJava)

  group1Future.complete(group1ResponseFromCoordinator.topics)
  group3Future.complete(group3ResponseFromCoordinator.topics)

  val offsetFetchResponse = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
  assertEquals(expectedOffsetFetchResponse, offsetFetchResponse.data)
}
| |
/** Runs `assertReassignmentAndReplicationBytesOutPerSec` with `isReassigning` set to true. */
@Test
def testReassignmentAndReplicationBytesOutRateWhenReassigning(): Unit =
  assertReassignmentAndReplicationBytesOutPerSec(isReassigning = true)
| |
/** Runs `assertReassignmentAndReplicationBytesOutPerSec` with `isReassigning` set to false. */
@Test
def testReassignmentAndReplicationBytesOutRateWhenNotReassigning(): Unit =
  assertReassignmentAndReplicationBytesOutPerSec(isReassigning = false)
| |
/**
 * Handles a Fetch request built with replica id 1 against mocked dependencies and
 * checks the byte counters in `brokerTopicStats`.
 *
 * `replicaManager.fetchMessages` is stubbed to invoke its callback with a single
 * one-record partition result. Afterwards `replicationBytesOutRate` is expected to
 * equal the record batch size, and `reassignmentBytesOutPerSec` is expected to
 * equal that size when `isReassigning` is true and 0 otherwise.
 *
 * @param isReassigning value passed into the fetched partition data and returned by
 *                      the stubbed `replicaManager.isAddingReplica`
 */
private def assertReassignmentAndReplicationBytesOutPerSec(isReassigning: Boolean): Unit = {
  val leaderEpoch = 0
  val partition = new TopicPartition("tp", 0)
  val topicId = Uuid.randomUuid()
  val idPartition = new TopicIdPartition(topicId, partition)

  setupBasicMetadataCache(partition.topic, numPartitions = 1, 1, topicId)
  val highWatermark = 3

  // Fresh fetch position per call: offset 0, log start offset 0, unbounded max bytes.
  def partitionData(): FetchRequest.PartitionData =
    new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, Optional.of(leaderEpoch))

  val fetchDataBuilder = Collections.singletonMap(partition, partitionData())
  val fetchData = Collections.singletonMap(idPartition, partitionData())
  val fetchRequest = new FetchRequest.Builder(
    ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1, 1000, 0, fetchDataBuilder).build()
  val fetchFromFollower = buildRequest(fetchRequest)

  val records = MemoryRecords.withRecords(CompressionType.NONE,
    new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
  when(replicaManager.fetchMessages(
    any[FetchParams],
    any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]],
    any[ReplicaQuota],
    any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]()
  )).thenAnswer(invocation => {
    // Immediately hand the canned partition result to the response callback.
    val responseCallback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
    val partitionResult = new FetchPartitionData(Errors.NONE, highWatermark, 0, records,
      Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), isReassigning)
    responseCallback(Seq(idPartition -> partitionResult))
  })

  val fetchMetadata = new JFetchMetadata(0, 0)
  val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
    fetchMetadata, fetchData, true, true)
  when(fetchManager.newContext(
    any[Short],
    any[JFetchMetadata],
    any[Boolean],
    any[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
    any[util.List[TopicIdPartition]],
    any[util.Map[Uuid, String]])).thenReturn(fetchContext)

  when(replicaManager.getLogConfig(ArgumentMatchers.eq(partition))).thenReturn(None)
  when(replicaManager.isAddingReplica(any(), anyInt)).thenReturn(isReassigning)

  createKafkaApis().handle(fetchFromFollower, RequestLocal.withThreadConfinedCaching)
  verify(replicaQuotaManager).record(anyLong)

  val allTopicsStats = brokerTopicStats.allTopicsStats
  val expectedReassignmentBytes = if (isReassigning) records.sizeInBytes() else 0
  assertEquals(expectedReassignmentBytes, allTopicsStats.reassignmentBytesOutPerSec.get.count())
  assertEquals(records.sizeInBytes(), allTopicsStats.replicationBytesOutRate.get.count())
}
| |
/**
 * An InitProducerId request that sets a producer id (10) together with
 * NO_PRODUCER_EPOCH is expected to be answered with INVALID_REQUEST.
 */
@Test
def rejectInitProducerIdWhenIdButNotEpochProvided(): Unit = {
  val transactionTimeoutMs = TimeUnit.MINUTES.toMillis(15).toInt
  val requestData = new InitProducerIdRequestData()
    .setTransactionalId("known")
    .setTransactionTimeoutMs(transactionTimeoutMs)
    .setProducerId(10)
    .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
  val requestChannelRequest = buildRequest(new InitProducerIdRequest.Builder(requestData).build())

  val kafkaApis = createKafkaApis(IBP_2_2_IV1)
  kafkaApis.handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

  val initProducerIdResponse = verifyNoThrottling[InitProducerIdResponse](requestChannelRequest)
  assertEquals(Errors.INVALID_REQUEST, initProducerIdResponse.error)
}
| |
/**
 * An InitProducerId request that sets a producer epoch (2) together with
 * NO_PRODUCER_ID is expected to be answered with INVALID_REQUEST.
 */
@Test
def rejectInitProducerIdWhenEpochButNotIdProvided(): Unit = {
  val transactionTimeoutMs = TimeUnit.MINUTES.toMillis(15).toInt
  val requestData = new InitProducerIdRequestData()
    .setTransactionalId("known")
    .setTransactionTimeoutMs(transactionTimeoutMs)
    .setProducerId(RecordBatch.NO_PRODUCER_ID)
    .setProducerEpoch(2)
  val requestChannelRequest = buildRequest(new InitProducerIdRequest.Builder(requestData).build())

  val kafkaApis = createKafkaApis(IBP_2_2_IV1)
  kafkaApis.handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

  val initProducerIdResponse = verifyNoThrottling[InitProducerIdResponse](requestChannelRequest)
  assertEquals(Errors.INVALID_REQUEST, initProducerIdResponse.error)
}
| |
/** An UpdateMetadata request carrying the current broker epoch is expected to succeed. */
@Test
def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
  val brokerEpoch = 1239875L
  testUpdateMetadataRequest(
    currentBrokerEpoch = brokerEpoch,
    brokerEpochInRequest = brokerEpoch,
    expectedError = Errors.NONE)
}
| |
/** An UpdateMetadata request carrying the current broker epoch plus one is expected to succeed. */
@Test
def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
  val brokerEpoch = 1239875L
  testUpdateMetadataRequest(
    currentBrokerEpoch = brokerEpoch,
    brokerEpochInRequest = brokerEpoch + 1,
    expectedError = Errors.NONE)
}
| |
/**
 * An UpdateMetadata request carrying the current broker epoch minus one is expected
 * to be answered with STALE_BROKER_EPOCH.
 */
@Test
def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
  val brokerEpoch = 1239875L
  testUpdateMetadataRequest(
    currentBrokerEpoch = brokerEpoch,
    brokerEpochInRequest = brokerEpoch - 1,
    expectedError = Errors.STALE_BROKER_EPOCH)
}
| |
/**
 * Shared body of the UpdateMetadata broker-epoch tests.
 *
 * Stubs `controller.brokerEpoch` with `currentBrokerEpoch`, handles an
 * UpdateMetadata request built with `brokerEpochInRequest`, and checks the error of
 * the response sent through `requestChannel`. When no error is expected, it also
 * verifies that `replicaManager.maybeUpdateMetadataCache` was called with the
 * request's correlation id.
 *
 * @param currentBrokerEpoch   epoch returned by the mocked controller
 * @param brokerEpochInRequest epoch placed in the UpdateMetadata request
 * @param expectedError        error expected in the captured response
 */
def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
  val request = buildRequest(createBasicMetadataRequest("topicA", 1, brokerEpochInRequest, 1))
  val correlationId = request.context.correlationId

  val responseCaptor: ArgumentCaptor[UpdateMetadataResponse] =
    ArgumentCaptor.forClass(classOf[UpdateMetadataResponse])

  when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
  when(replicaManager.maybeUpdateMetadataCache(ArgumentMatchers.eq(correlationId), any()))
    .thenReturn(Seq.empty)

  val kafkaApis = createKafkaApis()
  kafkaApis.handleUpdateMetadataRequest(request, RequestLocal.withThreadConfinedCaching)

  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    responseCaptor.capture(),
    ArgumentMatchers.eq(None)
  )
  assertEquals(expectedError, responseCaptor.getValue.error())

  // The metadata cache update is only verified for accepted requests.
  if (expectedError == Errors.NONE) {
    verify(replicaManager).maybeUpdateMetadataCache(ArgumentMatchers.eq(correlationId), any())
  }
}
| |
/** A LeaderAndIsr request carrying the broker's current epoch is expected to succeed. */
@Test
def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
  val epoch = 1239875L
  testLeaderAndIsrRequest(
    currentBrokerEpoch = epoch,
    brokerEpochInRequest = epoch,
    expectedError = Errors.NONE)
}
| |
/** A LeaderAndIsr request whose epoch is one ahead of the broker's is expected to succeed. */
@Test
def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
  val epoch = 1239875L
  testLeaderAndIsrRequest(
    currentBrokerEpoch = epoch,
    brokerEpochInRequest = epoch + 1,
    expectedError = Errors.NONE)
}
| |
/** A LeaderAndIsr request whose epoch is one behind the broker's must yield STALE_BROKER_EPOCH. */
@Test
def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
  val epoch = 1239875L
  testLeaderAndIsrRequest(
    currentBrokerEpoch = epoch,
    brokerEpochInRequest = epoch - 1,
    expectedError = Errors.STALE_BROKER_EPOCH)
}
| |
/**
 * Sends a LeaderAndIsr request carrying `brokerEpochInRequest` while the mocked controller
 * reports `currentBrokerEpoch`, then checks the error of the response that was sent.
 *
 * @param currentBrokerEpoch   epoch returned by the stubbed `controller.brokerEpoch`
 * @param brokerEpochInRequest epoch placed in the request
 * @param expectedError        error expected on the captured response
 */
def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
  val controllerId = 2
  val controllerEpoch = 6
  val topic = "topicW"
  val responseCaptor: ArgumentCaptor[LeaderAndIsrResponse] = ArgumentCaptor.forClass(classOf[LeaderAndIsrResponse])

  val partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
    .setTopicName(topic)
    .setPartitionIndex(1)
    .setControllerEpoch(1)
    .setLeader(0)
    .setLeaderEpoch(1)
    .setIsr(asList(0, 1))
    .setPartitionEpoch(2)
    .setReplicas(asList(0, 1, 2))
    .setIsNew(false)
  val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
    ApiKeys.LEADER_AND_ISR.latestVersion,
    controllerId,
    controllerEpoch,
    brokerEpochInRequest,
    Seq(partitionState).asJava,
    Collections.singletonMap(topic, Uuid.randomUuid()),
    asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
  ).build()
  val request = buildRequest(leaderAndIsrRequest)

  // Response handed back by the stubbed replica manager.
  val stubbedResponse = new LeaderAndIsrResponse(
    new LeaderAndIsrResponseData()
      .setErrorCode(Errors.NONE.code)
      .setPartitionErrors(asList()),
    leaderAndIsrRequest.version())

  when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
  when(replicaManager.becomeLeaderOrFollower(
    ArgumentMatchers.eq(request.context.correlationId),
    any(),
    any()
  )).thenReturn(stubbedResponse)

  createKafkaApis().handleLeaderAndIsrRequest(request)

  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    responseCaptor.capture(),
    ArgumentMatchers.eq(None))
  assertEquals(expectedError, responseCaptor.getValue.error())
}
| |
/** A StopReplica request carrying the broker's current epoch is expected to succeed. */
@Test
def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
  val epoch = 1239875L
  testStopReplicaRequest(
    currentBrokerEpoch = epoch,
    brokerEpochInRequest = epoch,
    expectedError = Errors.NONE)
}
| |
/** A StopReplica request whose epoch is one ahead of the broker's is expected to succeed. */
@Test
def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
  val epoch = 1239875L
  testStopReplicaRequest(
    currentBrokerEpoch = epoch,
    brokerEpochInRequest = epoch + 1,
    expectedError = Errors.NONE)
}
| |
/** A StopReplica request whose epoch is one behind the broker's must yield STALE_BROKER_EPOCH. */
@Test
def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
  val epoch = 1239875L
  testStopReplicaRequest(
    currentBrokerEpoch = epoch,
    brokerEpochInRequest = epoch - 1,
    expectedError = Errors.STALE_BROKER_EPOCH)
}
| |
/**
 * Sends a StopReplica request carrying `brokerEpochInRequest` while the mocked controller
 * reports `currentBrokerEpoch`, then checks the error of the response that was sent.
 *
 * @param currentBrokerEpoch   epoch returned by the stubbed `controller.brokerEpoch`
 * @param brokerEpochInRequest epoch placed in the request
 * @param expectedError        error expected on the captured response
 */
def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
  val controllerId = 0
  val controllerEpoch = 5
  val responseCaptor: ArgumentCaptor[StopReplicaResponse] = ArgumentCaptor.forClass(classOf[StopReplicaResponse])
  val fooPartition = new TopicPartition("foo", 0)

  val partitionState = new StopReplicaPartitionState()
    .setPartitionIndex(fooPartition.partition)
    .setLeaderEpoch(1)
    .setDeletePartition(false)
  val topicState = new StopReplicaTopicState()
    .setTopicName(fooPartition.topic)
    .setPartitionStates(Seq(partitionState).asJava)
  val stopReplicaRequest = new StopReplicaRequest.Builder(
    ApiKeys.STOP_REPLICA.latestVersion,
    controllerId,
    controllerEpoch,
    brokerEpochInRequest,
    false,
    Seq(topicState).asJava
  ).build()
  val request = buildRequest(stopReplicaRequest)

  // Re-evaluated on every use, exactly as the request accessor would be if written inline.
  def expectedPartitionStates = stopReplicaRequest.partitionStates().asScala

  when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
  when(replicaManager.stopReplicas(
    ArgumentMatchers.eq(request.context.correlationId),
    ArgumentMatchers.eq(controllerId),
    ArgumentMatchers.eq(controllerEpoch),
    ArgumentMatchers.eq(expectedPartitionStates)
  )).thenReturn(
    (mutable.Map(
      fooPartition -> Errors.NONE
    ), Errors.NONE)
  )

  createKafkaApis().handleStopReplicaRequest(request)

  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    responseCaptor.capture(),
    ArgumentMatchers.eq(None))
  assertEquals(expectedError, responseCaptor.getValue.error())

  // The replica manager call is verified unless a stale-epoch rejection is expected.
  val staleEpochExpected = expectedError == Errors.STALE_BROKER_EPOCH
  if (!staleEpochExpected) {
    verify(replicaManager).stopReplicas(
      ArgumentMatchers.eq(request.context.correlationId),
      ArgumentMatchers.eq(controllerId),
      ArgumentMatchers.eq(controllerEpoch),
      ArgumentMatchers.eq(expectedPartitionStates))
  }
}
| |
/**
 * For every LIST_GROUPS version: the handler is expected to forward the request data to the
 * group coordinator and to send back whatever the coordinator's future completes with.
 * The states filter and group states are only populated when `version >= 4`.
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.LIST_GROUPS)
def testListGroupsRequest(version: Short): Unit = {
  val statesSupported = version >= 4

  // Builds a fresh, equal request payload each time it is called.
  def newRequestData(): ListGroupsRequestData = new ListGroupsRequestData()
    .setStatesFilter(if (statesSupported) List("Stable", "Empty").asJava else List.empty[String].asJava)

  def listedGroup(groupId: String, state: String, protocolType: String): ListGroupsResponseData.ListedGroup =
    new ListGroupsResponseData.ListedGroup()
      .setGroupId(groupId)
      .setGroupState(if (statesSupported) state else "")
      .setProtocolType(protocolType)

  val requestChannelRequest = buildRequest(new ListGroupsRequest.Builder(newRequestData()).build(version))

  val coordinatorFuture = new CompletableFuture[ListGroupsResponseData]()
  when(groupCoordinator.listGroups(
    requestChannelRequest.context,
    newRequestData()
  )).thenReturn(coordinatorFuture)

  createKafkaApis().handleListGroupsRequest(requestChannelRequest)

  val expectedListGroupsResponse = new ListGroupsResponseData()
    .setGroups(List(
      listedGroup("group1", "Stable", "protocol1"),
      listedGroup("group2", "Empty", "qwerty")
    ).asJava)

  coordinatorFuture.complete(expectedListGroupsResponse)
  val response = verifyNoThrottling[ListGroupsResponse](requestChannelRequest)
  assertEquals(expectedListGroupsResponse, response.data)
}
| |
/**
 * When the coordinator's future completes exceptionally with UNKNOWN_SERVER_ERROR, the
 * ListGroups response is expected to carry that error code.
 */
@Test
def testListGroupsRequestFutureFailed(): Unit = {
  def newRequestData(): ListGroupsRequestData = new ListGroupsRequestData()
    .setStatesFilter(List("Stable", "Empty").asJava)

  val requestChannelRequest = buildRequest(new ListGroupsRequest.Builder(newRequestData()).build())

  val coordinatorFuture = new CompletableFuture[ListGroupsResponseData]()
  when(groupCoordinator.listGroups(
    requestChannelRequest.context,
    newRequestData()
  )).thenReturn(coordinatorFuture)

  createKafkaApis().handleListGroupsRequest(requestChannelRequest)

  coordinatorFuture.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception)
  val listGroupsResponse = verifyNoThrottling[ListGroupsResponse](requestChannelRequest)
  assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, listGroupsResponse.data.errorCode)
}
| |
/**
 * DESCRIBE is denied on both groups but allowed on the cluster resource; both groups are
 * expected to remain in the ListGroups response.
 */
@Test
def testListGroupsRequestFiltersUnauthorizedGroupsWithDescribeCluster(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])

  // (resource type, resource name, DESCRIBE result), stubbed in this order.
  Seq(
    (ResourceType.GROUP, "group1", AuthorizationResult.DENIED),
    (ResourceType.GROUP, "group2", AuthorizationResult.DENIED),
    (ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
  ).foreach { case (resourceType, resourceName, result) =>
    authorizeResource(
      authorizer,
      AclOperation.DESCRIBE,
      resourceType,
      resourceName,
      result,
      logIfDenied = false
    )
  }

  val allGroups = List("group1", "group2")
  testListGroupsRequestFiltersUnauthorizedGroups(authorizer, allGroups, List("group1", "group2"))
}
| |
/**
 * DESCRIBE is denied on the cluster and on `group1` but allowed on `group2`; only `group2`
 * is expected to remain in the ListGroups response.
 */
@Test
def testListGroupsRequestFiltersUnauthorizedGroupsWithDescribeGroups(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])

  // (resource type, resource name, DESCRIBE result), stubbed in this order.
  Seq(
    (ResourceType.GROUP, "group1", AuthorizationResult.DENIED),
    (ResourceType.GROUP, "group2", AuthorizationResult.ALLOWED),
    (ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
  ).foreach { case (resourceType, resourceName, result) =>
    authorizeResource(
      authorizer,
      AclOperation.DESCRIBE,
      resourceType,
      resourceName,
      result,
      logIfDenied = false
    )
  }

  val allGroups = List("group1", "group2")
  testListGroupsRequestFiltersUnauthorizedGroups(authorizer, allGroups, List("group2"))
}
| |
/**
 * Runs a ListGroups request through KafkaApis with the given authorizer, completes the
 * coordinator future with `groups`, and asserts that the response contains exactly
 * `expectedGroups`.
 *
 * @param authorizer     authorizer (already stubbed by the caller) passed to KafkaApis
 * @param groups         group ids returned by the mocked group coordinator
 * @param expectedGroups group ids expected in the response that is sent
 */
def testListGroupsRequestFiltersUnauthorizedGroups(
  authorizer: Authorizer,
  groups: List[String],
  expectedGroups: List[String]
): Unit = {
  // Builds response data holding one ListedGroup per id; only the group id is set.
  def responseDataFor(groupIds: List[String]): ListGroupsResponseData = {
    val data = new ListGroupsResponseData()
    groupIds.foreach { groupId =>
      data.groups.add(new ListGroupsResponseData.ListedGroup().setGroupId(groupId))
    }
    data
  }

  val requestChannelRequest = buildRequest(new ListGroupsRequest.Builder(new ListGroupsRequestData()).build())

  val coordinatorFuture = new CompletableFuture[ListGroupsResponseData]()
  when(groupCoordinator.listGroups(
    requestChannelRequest.context,
    new ListGroupsRequestData()
  )).thenReturn(coordinatorFuture)

  createKafkaApis(authorizer = Some(authorizer)).handleListGroupsRequest(requestChannelRequest)

  val coordinatorResponse = responseDataFor(groups)
  val expectedListGroupsResponse = responseDataFor(expectedGroups)

  coordinatorFuture.complete(coordinatorResponse)
  val response = verifyNoThrottling[ListGroupsResponse](requestChannelRequest)
  assertEquals(expectedListGroupsResponse, response.data)
}
| |
/**
 * Loads two brokers into the metadata cache and checks that a DescribeCluster request on the
 * PLAINTEXT listener reports the cached controller id, the cluster id, the cluster authorized
 * operations and the alive broker nodes for that listener.
 */
@Test
def testDescribeClusterRequest(): Unit = {
  val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
  // Reuse the shared broker factory instead of hand-building two identical broker definitions
  // (rack "rack", host "broker<id>", port 9092, PLAINTEXT on the given listener).
  val brokers = Seq(0, 1).map(brokerId => createMetadataBroker(brokerId, plaintextListener))
  val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
    0, 0, Seq.empty[UpdateMetadataPartitionState].asJava, brokers.asJava, Collections.emptyMap()).build()
  MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)

  val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
    .setIncludeClusterAuthorizedOperations(true)).build()

  val request = buildRequest(describeClusterRequest, plaintextListener)

  createKafkaApis().handleDescribeCluster(request)

  val describeClusterResponse = verifyNoThrottling[DescribeClusterResponse](request)

  assertEquals(metadataCache.getControllerId.get.id, describeClusterResponse.data.controllerId)
  assertEquals(clusterId, describeClusterResponse.data.clusterId)
  // NOTE(review): 8096 is presumably a bitfield of AclOperation codes for the operations
  // authorized on the cluster resource when no authorizer is configured - confirm.
  assertEquals(8096, describeClusterResponse.data.clusterAuthorizedOperations)
  assertEquals(metadataCache.getAliveBrokerNodes(plaintextListener).toSet,
    describeClusterResponse.nodes.asScala.values.toSet)
}
| |
/**
 * Loads two brokers into the metadataCache: broker 0 exposes both PLAINTEXT and LISTENER2,
 * broker 1 exposes PLAINTEXT only.
 *
 * @return the pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively
 */
private def updateMetadataCacheWithInconsistentListeners(): (ListenerName, ListenerName) = {
  val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
  val anotherListener = new ListenerName("LISTENER2")

  def endpoint(host: String, port: Int, listener: ListenerName): UpdateMetadataEndpoint =
    new UpdateMetadataEndpoint()
      .setHost(host)
      .setPort(port)
      .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
      .setListener(listener.value)

  def broker(id: Int, endpoints: Seq[UpdateMetadataEndpoint]): UpdateMetadataBroker =
    new UpdateMetadataBroker()
      .setId(id)
      .setRack("rack")
      .setEndpoints(endpoints.asJava)

  val brokers = Seq(
    broker(0, Seq(
      endpoint("broker0", 9092, plaintextListener),
      endpoint("broker0", 9093, anotherListener))),
    broker(1, Seq(
      endpoint("broker1", 9092, plaintextListener)))
  )
  val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
    0, 0, Seq.empty[UpdateMetadataPartitionState].asJava, brokers.asJava, Collections.emptyMap()).build()
  MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)

  plaintextListener -> anotherListener
}
| |
/**
 * Issues an all-topics Metadata request on `requestListener` and returns the response that
 * was sent for it.
 */
private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = {
  val channelRequest = buildRequest(MetadataRequest.Builder.allTopics.build(), requestListener)
  createKafkaApis().handleTopicMetadataRequest(channelRequest)
  verifyNoThrottling[MetadataResponse](channelRequest)
}
| |
/**
 * Stubs the replica manager to return offset 15 for a LATEST_TIMESTAMP lookup on `foo-0` at
 * the given isolation level, then checks that the ListOffsets response carries that offset,
 * no error and an unknown timestamp.
 *
 * @param isolationLevel isolation level used both in the request and in the stubbed lookup
 */
private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel): Unit = {
  val tp = new TopicPartition("foo", 0)
  val latestOffset = 15L
  val currentLeaderEpoch = Optional.empty[Integer]()

  when(replicaManager.fetchOffsetForTimestamp(
    ArgumentMatchers.eq(tp),
    ArgumentMatchers.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
    ArgumentMatchers.eq(Some(isolationLevel)),
    ArgumentMatchers.eq(currentLeaderEpoch),
    fetchOnlyFromLeader = ArgumentMatchers.eq(true))
  ).thenReturn(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))

  val partitionTarget = new ListOffsetsPartition()
    .setPartitionIndex(tp.partition)
    .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)
  val topicTarget = new ListOffsetsTopic()
    .setName(tp.topic)
    .setPartitions(List(partitionTarget).asJava)
  val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
    .setTargetTimes(List(topicTarget).asJava).build()
  val request = buildRequest(listOffsetRequest)

  createKafkaApis().handleListOffsetRequest(request)

  val response = verifyNoThrottling[ListOffsetsResponse](request)
  val topicResponse = response.topics.asScala.find(_.name == tp.topic).get
  val partitionResponse = topicResponse.partitions.asScala.find(_.partitionIndex == tp.partition)
  assertTrue(partitionResponse.isDefined)

  partitionResponse.foreach { partitionData =>
    assertEquals(Errors.NONE.code, partitionData.errorCode)
    assertEquals(latestOffset, partitionData.offset)
    assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
  }
}
| |
/**
 * Builds a WriteTxnMarkers request holding a single COMMIT marker for `partitions`.
 *
 * @return the request together with the channel request wrapping it
 */
private def createWriteTxnMarkersRequest(partitions: util.List[TopicPartition]) = {
  val commitMarker = new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions)
  val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
    ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
    asList(commitMarker)
  ).build()
  (writeTxnMarkersRequest, buildRequest(writeTxnMarkersRequest))
}
| |
/**
 * Serializes `request` with a header and wraps it in a `RequestChannel.Request` for an
 * anonymous principal.
 *
 * @param request                the request to serialize
 * @param listenerName           listener recorded in the request context (PLAINTEXT by default)
 * @param fromPrivilegedListener flag recorded in the request context
 * @param requestHeader          header to serialize with; when empty, a header is built from the
 *                               request's api key and version, `clientId` and correlation id 0
 */
private def buildRequest(request: AbstractRequest,
                         listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
                         fromPrivilegedListener: Boolean = false,
                         requestHeader: Option[RequestHeader] = None): RequestChannel.Request = {
  val headerToSerialize = requestHeader match {
    case Some(providedHeader) => providedHeader
    case None => new RequestHeader(request.apiKey, request.version, clientId, 0)
  }
  val serialized = request.serializeWithHeader(headerToSerialize)

  // The header is read from the buffer first so that the Request constructor can read the body next.
  val parsedHeader = RequestHeader.parse(serialized)
  val requestContext = new RequestContext(parsedHeader, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
    listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, fromPrivilegedListener,
    Optional.of(kafkaPrincipalSerde))

  new RequestChannel.Request(processor = 1, context = requestContext, startTimeNanos = 0, MemoryPool.NONE, serialized,
    requestChannelMetrics, envelope = None)
}
| |
/**
 * Verifies that a response was sent on `requestChannel` for `request`, captures it,
 * re-serializes its data at the request's API version and returns the re-parsed response.
 *
 * @param request the channel request a response is expected for
 * @tparam T the response type the parsed result is cast to
 */
private def verifyNoThrottling[T <: AbstractResponse](
  request: RequestChannel.Request
): T = {
  val responseCaptor: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
  verify(requestChannel).sendResponse(
    ArgumentMatchers.eq(request),
    responseCaptor.capture(),
    any())

  val requestHeader = request.context.header
  val serialized = MessageUtil.toByteBuffer(responseCaptor.getValue.data, requestHeader.apiVersion)
  AbstractResponse
    .parseResponse(requestHeader.apiKey, serialized, requestHeader.apiVersion)
    .asInstanceOf[T]
}
| |
/**
 * Builds an UpdateMetadata request for one topic whose partitions are all led by broker 0
 * with broker 0 as the only replica and ISR member.
 *
 * @param topic         topic name
 * @param numPartitions number of partition states to create (indices 0 until numPartitions)
 * @param brokerEpoch   broker epoch placed in the request
 * @param numBrokers    number of live brokers to include (ids 0 until numBrokers)
 * @param topicId       id associated with `topic` in the request
 */
private def createBasicMetadataRequest(topic: String,
                                       numPartitions: Int,
                                       brokerEpoch: Long,
                                       numBrokers: Int,
                                       topicId: Uuid = Uuid.ZERO_UUID): UpdateMetadataRequest = {
  val brokerZeroOnly = List(Int.box(0)).asJava
  val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)

  val partitionStates = (0 until numPartitions).map { partition =>
    new UpdateMetadataPartitionState()
      .setTopicName(topic)
      .setPartitionIndex(partition)
      .setControllerEpoch(1)
      .setLeader(0)
      .setLeaderEpoch(1)
      .setReplicas(brokerZeroOnly)
      .setZkVersion(0)
      .setIsr(brokerZeroOnly)
  }
  val liveBrokers = (0 until numBrokers).map(createMetadataBroker(_, plaintextListener))

  new UpdateMetadataRequest.Builder(
    ApiKeys.UPDATE_METADATA.latestVersion,
    0,
    0,
    brokerEpoch,
    partitionStates.asJava,
    liveBrokers.asJava,
    Collections.singletonMap(topic, topicId)
  ).build()
}
| |
/**
 * Loads a single topic into the metadata cache.
 *
 * Delegates to `addTopicToMetadataCache`, which performs the same update, so the two helpers
 * cannot drift apart; this variant simply requires every argument explicitly.
 *
 * @param topic         topic name
 * @param numPartitions number of partitions to register for the topic
 * @param numBrokers    number of live brokers to register
 * @param topicId       id associated with the topic
 */
private def setupBasicMetadataCache(topic: String, numPartitions: Int, numBrokers: Int, topicId: Uuid): Unit = {
  addTopicToMetadataCache(topic, numPartitions, numBrokers, topicId)
}
| |
/**
 * Applies an UpdateMetadata request (broker epoch 0) for `topic` to the metadata cache.
 *
 * @param topic         topic name
 * @param numPartitions number of partitions to register for the topic
 * @param numBrokers    number of live brokers to register (1 by default)
 * @param topicId       id associated with the topic (zero UUID by default)
 */
private def addTopicToMetadataCache(topic: String, numPartitions: Int, numBrokers: Int = 1, topicId: Uuid = Uuid.ZERO_UUID): Unit = {
  MetadataCacheTest.updateCache(
    metadataCache,
    createBasicMetadataRequest(topic, numPartitions, brokerEpoch = 0, numBrokers = numBrokers, topicId = topicId))
}
| |
/**
 * Builds a broker entry in rack "rack" with one PLAINTEXT endpoint on port 9092 whose host
 * is "broker" followed by the broker id.
 *
 * @param brokerId id of the broker
 * @param listener listener name assigned to the endpoint
 */
private def createMetadataBroker(brokerId: Int,
                                 listener: ListenerName): UpdateMetadataBroker = {
  val plaintextEndpoint = new UpdateMetadataEndpoint()
    .setHost("broker" + brokerId)
    .setPort(9092)
    .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
    .setListener(listener.value)

  new UpdateMetadataBroker()
    .setId(brokerId)
    .setRack("rack")
    .setEndpoints(Seq(plaintextEndpoint).asJava)
}
| |
/**
 * Moves three partitions of `t0` to `/foo` with a replica manager stubbed to return a
 * different error per partition, then checks that the response reports those per-partition
 * errors and the matching error counts.
 */
@Test
def testAlterReplicaLogDirs(): Unit = {
  val logDir = "/foo"
  val topic = "t0"

  val dir = new AlterReplicaLogDirsRequestData.AlterReplicaLogDir()
    .setPath(logDir)
  dir.topics().add(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setName(topic).setPartitions(asList(0, 1, 2)))
  val data = new AlterReplicaLogDirsRequestData()
  data.dirs().add(dir)
  val request = buildRequest(new AlterReplicaLogDirsRequest.Builder(data).build())

  reset(replicaManager, clientRequestQuotaManager, requestChannel)

  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  val t0p0 = new TopicPartition(topic, 0)
  val t0p1 = new TopicPartition(topic, 1)
  val t0p2 = new TopicPartition(topic, 2)
  val partitionResults = Map(
    t0p0 -> Errors.NONE,
    t0p1 -> Errors.LOG_DIR_NOT_FOUND,
    t0p2 -> Errors.INVALID_TOPIC_EXCEPTION)
  when(replicaManager.alterReplicaLogDirs(ArgumentMatchers.eq(Map(
    t0p0 -> logDir,
    t0p1 -> logDir,
    t0p2 -> logDir))))
    .thenReturn(partitionResults)

  createKafkaApis().handleAlterReplicaLogDirsRequest(request)

  val response = verifyNoThrottling[AlterReplicaLogDirsResponse](request)

  // Flatten the per-topic results of the response into a partition -> error map.
  val actualResults = (for {
    topicResult <- response.data.results.asScala
    partitionResult <- topicResult.partitions().asScala
  } yield new TopicPartition(topicResult.topicName, partitionResult.partitionIndex) ->
    Errors.forCode(partitionResult.errorCode)).toMap
  assertEquals(partitionResults, actualResults)

  assertEquals(Map(Errors.NONE -> 1,
    Errors.LOG_DIR_NOT_FOUND -> 1,
    Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
}
| |
/**
 * Checks that `KafkaApis.sizeOfThrottledPartitions` over a response containing one throttled
 * and one non-throttled partition equals `FetchResponse.sizeOf` computed over the throttled
 * partition alone.
 */
@Test
def testSizeOfThrottledPartitions(): Unit = {
  // Filled in by fetchResponse as a side effect; responseData(...) below relies on it.
  val topicNames = new util.HashMap[Uuid, String]

  // Builds a fetch response with one single-record partition per entry of `data` and
  // registers every topic id -> name mapping in `topicNames`.
  def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = {
    val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData](
      data.map { case (tp, raw) =>
        tp -> new FetchResponseData.PartitionData()
          .setPartitionIndex(tp.topicPartition.partition)
          .setHighWatermark(105)
          .setLastStableOffset(105)
          .setLogStartOffset(0)
          .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8))))
      }.toMap.asJava)

    data.keys.foreach { tp =>
      topicNames.put(tp.topicId, tp.topicPartition.topic)
    }
    FetchResponse.of(Errors.NONE, 100, 100, responseData)
  }

  val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("throttledData", 0))
  val throttledData = Map(throttledPartition -> "throttledData")

  // fetchResponse must run before responseData reads topicNames; keeping both in one
  // expression preserves that order (receiver first, then arguments).
  val throttledOnlyData = fetchResponse(throttledData)
    .responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION)
  val throttledOnlyEntries = throttledOnlyData.entrySet.asScala.map { entry =>
    (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), entry.getValue)
  }.toMap.asJava
  val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
    throttledOnlyEntries.entrySet.iterator)

  val response = fetchResponse(throttledData ++ Map(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) -> "nonThrottledData"))

  // Only the throttled partition is reported as throttled by the quota manager.
  val quota = Mockito.mock(classOf[ReplicationQuotaManager])
  Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
    .thenAnswer(invocation => throttledPartition.topicPartition == invocation.getArgument(0).asInstanceOf[TopicPartition])

  assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota))
}
| |
/**
 * Sends a DescribeProducers request for four topic partitions and checks the per-partition
 * outcome: `foo` returns the stubbed producer state, `bar` fails authorization, `baz` is
 * unknown to the metadata cache and `invalid;topic` is reported as an invalid topic.
 */
@Test
def testDescribeProducers(): Unit = {
  val tp1 = new TopicPartition("foo", 0)
  val tp2 = new TopicPartition("bar", 3)
  val tp3 = new TopicPartition("baz", 1)
  val tp4 = new TopicPartition("invalid;topic", 1)

  val authorizer: Authorizer = mock(classOf[Authorizer])

  def topicRequest(tp: TopicPartition): DescribeProducersRequestData.TopicRequest =
    new DescribeProducersRequestData.TopicRequest()
      .setName(tp.topic)
      .setPartitionIndexes(List(Int.box(tp.partition)).asJava)

  val data = new DescribeProducersRequestData()
    .setTopics(List(tp1, tp2, tp3, tp4).map(topicRequest).asJava)

  // Stubs the authorizer's answer to a single READ action on the given topic.
  def stubReadAuthorization(topic: String, result: AuthorizationResult): Unit = {
    val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
    val expectedActions: util.List[Action] =
      Collections.singletonList(new Action(AclOperation.READ, pattern, 1, true, true))
    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions)))
      .thenReturn(Seq(result).asJava)
  }

  // Topic `foo` is authorized and present in the metadata; only its partition 0 is queried.
  addTopicToMetadataCache(tp1.topic, 4)
  stubReadAuthorization(tp1.topic, AuthorizationResult.ALLOWED)

  // Topic `bar` is not authorized.
  stubReadAuthorization(tp2.topic, AuthorizationResult.DENIED)

  // Topic `baz` is authorized, but not present in the metadata.
  stubReadAuthorization(tp3.topic, AuthorizationResult.ALLOWED)

  val stubbedProducerState = new DescribeProducersResponseData.ProducerState()
    .setProducerId(12345L)
    .setProducerEpoch(15)
    .setLastSequence(100)
    .setLastTimestamp(time.milliseconds())
    .setCurrentTxnStartOffset(-1)
    .setCoordinatorEpoch(200)
  when(replicaManager.activeProducerState(tp1))
    .thenReturn(new DescribeProducersResponseData.PartitionResponse()
      .setErrorCode(Errors.NONE.code)
      .setPartitionIndex(tp1.partition)
      .setActiveProducers(List(stubbedProducerState).asJava))

  val request = buildRequest(new DescribeProducersRequest.Builder(data).build())
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis(authorizer = Some(authorizer)).handleDescribeProducersRequest(request)

  val response = verifyNoThrottling[DescribeProducersResponse](request)
  assertEquals(Set("foo", "bar", "baz", "invalid;topic"), response.data.topics.asScala.map(_.name).toSet)

  // Looks up the partition in the response, asserts its error and returns it.
  def assertPartitionError(
    topicPartition: TopicPartition,
    error: Errors
  ): DescribeProducersResponseData.PartitionResponse = {
    val topicResponse = response.data.topics.asScala.find(_.name == topicPartition.topic).get
    val partitionResponse = topicResponse.partitions.asScala.find(_.partitionIndex == topicPartition.partition).get
    assertEquals(error, Errors.forCode(partitionResponse.errorCode))
    partitionResponse
  }

  val fooPartition = assertPartitionError(tp1, Errors.NONE)
  assertEquals(Errors.NONE, Errors.forCode(fooPartition.errorCode))
  assertEquals(1, fooPartition.activeProducers.size)

  val fooProducer = fooPartition.activeProducers.get(0)
  assertEquals(12345L, fooProducer.producerId)
  assertEquals(15, fooProducer.producerEpoch)
  assertEquals(100, fooProducer.lastSequence)
  assertEquals(time.milliseconds(), fooProducer.lastTimestamp)
  assertEquals(-1, fooProducer.currentTxnStartOffset)
  assertEquals(200, fooProducer.coordinatorEpoch)

  assertPartitionError(tp2, Errors.TOPIC_AUTHORIZATION_FAILED)
  assertPartitionError(tp3, Errors.UNKNOWN_TOPIC_OR_PARTITION)
  assertPartitionError(tp4, Errors.INVALID_TOPIC_EXCEPTION)
}
| |
/**
 * Describes two transactional ids: `foo` is authorized and returns the coordinator's stubbed
 * state, `bar` is denied and must carry TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
 */
@Test
def testDescribeTransactions(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val requestData = new DescribeTransactionsRequestData()
    .setTransactionalIds(List("foo", "bar").asJava)
  val request = buildRequest(new DescribeTransactionsRequest.Builder(requestData).build())
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  // Stubs the authorizer's answer to a single DESCRIBE action on the given transactional id.
  def stubDescribeAuthorization(transactionalId: String, result: AuthorizationResult): Unit = {
    val pattern = new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL)
    val expectedActions: util.List[Action] =
      Collections.singletonList(new Action(AclOperation.DESCRIBE, pattern, 1, true, true))
    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions)))
      .thenReturn(Seq(result).asJava)
  }

  when(txnCoordinator.handleDescribeTransactions("foo"))
    .thenReturn(new DescribeTransactionsResponseData.TransactionState()
      .setErrorCode(Errors.NONE.code)
      .setTransactionalId("foo")
      .setProducerId(12345L)
      .setProducerEpoch(15)
      .setTransactionStartTimeMs(time.milliseconds())
      .setTransactionState("CompleteCommit")
      .setTransactionTimeoutMs(10000))

  stubDescribeAuthorization("foo", AuthorizationResult.ALLOWED)
  stubDescribeAuthorization("bar", AuthorizationResult.DENIED)

  createKafkaApis(authorizer = Some(authorizer)).handleDescribeTransactionsRequest(request)

  val response = verifyNoThrottling[DescribeTransactionsResponse](request)
  val transactionStates = response.data.transactionStates
  assertEquals(2, transactionStates.size)

  val fooState = transactionStates.asScala.find(_.transactionalId == "foo").get
  assertEquals(Errors.NONE.code, fooState.errorCode)
  assertEquals(12345L, fooState.producerId)
  assertEquals(15, fooState.producerEpoch)
  assertEquals(time.milliseconds(), fooState.transactionStartTimeMs)
  assertEquals("CompleteCommit", fooState.transactionState)
  assertEquals(10000, fooState.transactionTimeoutMs)
  assertEquals(List.empty, fooState.topics.asScala.toList)

  val barState = transactionStates.asScala.find(_.transactionalId == "bar").get
  assertEquals(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code, barState.errorCode)
}
| |
/**
 * Describes a transaction that spans topics `foo` and `bar` while the principal may only
 * describe `foo`; the returned transaction state is expected to list `foo` alone.
 */
@Test
def testDescribeTransactionsFiltersUnauthorizedTopics(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val transactionalId = "foo"
  val requestData = new DescribeTransactionsRequestData()
    .setTransactionalIds(List(transactionalId).asJava)
  val request = buildRequest(new DescribeTransactionsRequest.Builder(requestData).build())
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  // Stubs the authorizer's answer to a single DESCRIBE action on the named resource.
  def stubDescribe(resourceType: ResourceType, resourceName: String, result: AuthorizationResult): Unit = {
    val pattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
    val expectedActions = Collections.singletonList(new Action(AclOperation.DESCRIBE, pattern, 1, true, true))
    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions)))
      .thenReturn(Seq(result).asJava)
  }

  // Principal is authorized to one of the two topics. The second topic should be
  // filtered from the result.
  stubDescribe(ResourceType.TRANSACTIONAL_ID, transactionalId, AuthorizationResult.ALLOWED)
  stubDescribe(ResourceType.TOPIC, "foo", AuthorizationResult.ALLOWED)
  stubDescribe(ResourceType.TOPIC, "bar", AuthorizationResult.DENIED)

  def topicData(topicName: String, partitionIds: Seq[Int]): DescribeTransactionsResponseData.TopicData =
    new DescribeTransactionsResponseData.TopicData()
      .setTopic(topicName)
      .setPartitions(partitionIds.map(Int.box).asJava)

  val coordinatorState = new DescribeTransactionsResponseData.TransactionState()
    .setErrorCode(Errors.NONE.code)
    .setTransactionalId(transactionalId)
    .setProducerId(12345L)
    .setProducerEpoch(15)
    .setTransactionStartTimeMs(time.milliseconds())
    .setTransactionState("Ongoing")
    .setTransactionTimeoutMs(10000)
  coordinatorState.topics.add(topicData("foo", Seq(1, 2)))
  coordinatorState.topics.add(topicData("bar", Seq(3, 4)))

  when(txnCoordinator.handleDescribeTransactions("foo"))
    .thenReturn(coordinatorState)

  createKafkaApis(authorizer = Some(authorizer)).handleDescribeTransactionsRequest(request)

  val response = verifyNoThrottling[DescribeTransactionsResponse](request)
  val transactionStates = response.data.transactionStates
  assertEquals(1, transactionStates.size)

  val fooState = transactionStates.asScala.find(_.transactionalId == "foo").get
  assertEquals(Errors.NONE.code, fooState.errorCode)
  assertEquals(12345L, fooState.producerId)
  assertEquals(15, fooState.producerEpoch)
  assertEquals(time.milliseconds(), fooState.transactionStartTimeMs)
  assertEquals("Ongoing", fooState.transactionState)
  assertEquals(10000, fooState.transactionTimeoutMs)
  assertEquals(List(topicData("foo", Seq(1, 2))), fooState.topics.asScala.toList)
}
| |
/**
 * A coordinator-level error on ListTransactions (here COORDINATOR_LOAD_IN_PROGRESS) is
 * expected to be passed through in the response, with no transaction states attached.
 */
@Test
def testListTransactionsErrorResponse(): Unit = {
  val request = buildRequest(
    new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build())
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  // The coordinator answers the unfiltered listing with an error-only payload.
  val coordinatorResult = new ListTransactionsResponseData()
    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code)
  when(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
    .thenReturn(coordinatorResult)

  createKafkaApis().handleListTransactionsRequest(request)

  val responseData = verifyNoThrottling[ListTransactionsResponse](request).data
  assertEquals(0, responseData.transactionStates.size)
  assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, Errors.forCode(responseData.errorCode))
}
| |
/**
 * ListTransactions is expected to return only the transactional ids the principal may
 * describe: "foo" is allowed and "bar" is denied, so only "foo" should remain.
 */
@Test
def testListTransactionsAuthorization(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val request = buildRequest(
    new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build())
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  val fooTransaction = new ListTransactionsResponseData.TransactionState()
    .setTransactionalId("foo")
    .setProducerId(12345L)
    .setTransactionState("Ongoing")
  val barTransaction = new ListTransactionsResponseData.TransactionState()
    .setTransactionalId("bar")
    .setProducerId(98765L)
    .setTransactionState("PrepareAbort")
  // Kept as a mutable ArrayList, exactly like a hand-populated list would be.
  val coordinatorStates = new util.ArrayList[ListTransactionsResponseData.TransactionState](
    asList(fooTransaction, barTransaction))

  when(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
    .thenReturn(new ListTransactionsResponseData()
      .setErrorCode(Errors.NONE.code)
      .setTransactionStates(coordinatorStates))

  // The single-action DESCRIBE check expected for one transactional id.
  def describeActionsFor(transactionalId: String): util.List[Action] = {
    val resource = new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL)
    Collections.singletonList(new Action(AclOperation.DESCRIBE, resource, 1, true, true))
  }

  Seq(
    "foo" -> AuthorizationResult.ALLOWED,
    "bar" -> AuthorizationResult.DENIED
  ).foreach { case (transactionalId, decision) =>
    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(describeActionsFor(transactionalId))))
      .thenReturn(Seq(decision).asJava)
  }

  createKafkaApis(authorizer = Some(authorizer)).handleListTransactionsRequest(request)

  val returnedStates = verifyNoThrottling[ListTransactionsResponse](request).data.transactionStates
  assertEquals(1, returnedStates.size())
  val onlyState = returnedStates.get(0)
  assertEquals("foo", onlyState.transactionalId)
  assertEquals(12345L, onlyState.producerId)
  assertEquals("Ongoing", onlyState.transactionState)
}
| |
/**
 * Exercises authorization of DeleteTopics requests that address topics by id.
 *
 * Expected per-topic outcome:
 *  - "foo" (DESCRIBE denied): TOPIC_AUTHORIZATION_FAILED and no name in the response
 *  - "bar" (DESCRIBE allowed, DELETE denied): TOPIC_AUTHORIZATION_FAILED with the name
 *  - id without a name mapping: UNKNOWN_TOPIC_ID and no name in the response
 */
@Test
def testDeleteTopicsByIdAuthorization(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val controllerContext: ControllerContext = mock(classOf[ControllerContext])

  when(clientControllerQuotaManager.newQuotaFor(
    any[RequestChannel.Request],
    anyShort
  )).thenReturn(UnboundedControllerMutationQuota)
  when(controller.isActive).thenReturn(true)
  when(controller.controllerContext).thenReturn(controllerContext)

  val topicResults = Map(
    AclOperation.DESCRIBE -> Map(
      "foo" -> AuthorizationResult.DENIED,
      "bar" -> AuthorizationResult.ALLOWED
    ),
    AclOperation.DELETE -> Map(
      "foo" -> AuthorizationResult.DENIED,
      "bar" -> AuthorizationResult.DENIED
    )
  )
  // Answer every authorization call from the table above, one result per requested action.
  when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer(invocation => {
    val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
    actions.asScala.map { action =>
      val topic = action.resourcePattern.name
      val ops = action.operation()
      topicResults(ops)(topic)
    }.asJava
  })

  // Try to delete three topics:
  // 1. One without describe permission
  // 2. One without delete permission
  // 3. One whose topic id does not resolve to any topic name
  val topicIdsMap = Map(
    Uuid.randomUuid() -> Some("foo"),
    Uuid.randomUuid() -> Some("bar"),
    Uuid.randomUuid() -> None
  )

  topicIdsMap.foreach { case (topicId, topicNameOpt) =>
    when(controllerContext.topicName(topicId)).thenReturn(topicNameOpt)
  }

  val topicDatas = topicIdsMap.keys.map { topicId =>
    new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
  }.toList
  val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
    .setTopics(topicDatas.asJava))
    .build(ApiKeys.DELETE_TOPICS.latestVersion)

  val request = buildRequest(deleteRequest)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
  verify(authorizer, times(2)).authorize(any(), any())

  val deleteResponse = verifyNoThrottling[DeleteTopicsResponse](request)

  topicIdsMap.foreach { case (topicId, nameOpt) =>
    // Report which topic id is missing rather than failing with a bare
    // NoSuchElementException from Option.get.
    val response = deleteResponse.data.responses.asScala
      .find(_.topicId == topicId)
      .getOrElse(throw new AssertionError(
        s"No delete result returned for topic id $topicId (name mapping: $nameOpt)"))
    nameOpt match {
      case Some("foo") =>
        assertNull(response.name)
        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
      case Some("bar") =>
        assertEquals("bar", response.name)
        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
      case None =>
        assertNull(response.name)
        assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
      case _ =>
        fail("Unexpected topic id/name mapping")
    }
  }
}
| |
/**
 * Exercises authorization of DeleteTopics requests that address topics by name, using
 * either the plain topic-name array (request version 5) or the structured topic list
 * (latest version), selected by `usePrimitiveTopicNameArray`.
 */
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])

  when(clientControllerQuotaManager.newQuotaFor(
    any[RequestChannel.Request],
    anyShort
  )).thenReturn(UnboundedControllerMutationQuota)
  when(controller.isActive).thenReturn(true)

  // Three topics are submitted for deletion:
  //   foo - describe permission is missing
  //   bar - delete permission is missing
  //   baz - fully authorized, expected to come back as UNKNOWN_TOPIC_OR_PARTITION
  val topicNames = List("foo", "bar", "baz")

  val decisionsByOperation = Map(
    AclOperation.DESCRIBE -> Map(
      "foo" -> AuthorizationResult.DENIED,
      "bar" -> AuthorizationResult.ALLOWED,
      "baz" -> AuthorizationResult.ALLOWED
    ),
    AclOperation.DELETE -> Map(
      "foo" -> AuthorizationResult.DENIED,
      "bar" -> AuthorizationResult.DENIED,
      "baz" -> AuthorizationResult.ALLOWED
    )
  )
  // Every authorization call is answered from the table, one result per requested action.
  when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer { invocation =>
    val requestedActions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
    requestedActions.asScala.map { requested =>
      decisionsByOperation(requested.operation())(requested.resourcePattern.name)
    }.asJava
  }

  val (requestData, version) =
    if (usePrimitiveTopicNameArray) {
      (new DeleteTopicsRequestData().setTopicNames(topicNames.asJava), 5.toShort)
    } else {
      val topicStates = topicNames.map { topicName =>
        new DeleteTopicsRequestData.DeleteTopicState().setName(topicName)
      }
      (new DeleteTopicsRequestData().setTopics(topicStates.asJava), ApiKeys.DELETE_TOPICS.latestVersion)
    }
  val deleteRequest = new DeleteTopicsRequest.Builder(requestData).build(version)

  val request = buildRequest(deleteRequest)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
  verify(authorizer, times(2)).authorize(any(), any())

  val deleteResponse = verifyNoThrottling[DeleteTopicsResponse](request)

  // Error reported for the topic, or None when the response has no entry for it.
  def errorFor(topic: String): Option[Errors] =
    Option(deleteResponse.data.responses().find(topic)).map(entry => Errors.forCode(entry.errorCode))

  assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), errorFor("foo"))
  assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), errorFor("bar"))
  assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), errorFor("baz"))
}
| |
/**
 * Builds a mocked [[RequestChannel.Request]] whose header is itself a mock reporting the
 * first entry of `ApiKeys.values()` as its API key.
 */
private def createMockRequest(): RequestChannel.Request = {
  val mockedHeader: RequestHeader = mock(classOf[RequestHeader])
  when(mockedHeader.apiKey()).thenReturn(ApiKeys.values().head)

  val mockedRequest: RequestChannel.Request = mock(classOf[RequestChannel.Request])
  when(mockedRequest.header).thenReturn(mockedHeader)
  mockedRequest
}
| |
/**
 * Asserts that `handler`, invoked with a mocked request, throws an
 * [[UnsupportedVersionException]] carrying the same message as
 * `KafkaApis.shouldNeverReceive` produces for that request.
 *
 * @param handler the request handler under test
 */
private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
  val mockedRequest = createMockRequest()
  val thrown = assertThrows(classOf[UnsupportedVersionException], () => handler(mockedRequest))
  val expectedMessage = KafkaApis.shouldNeverReceive(mockedRequest).getMessage
  assertEquals(expectedMessage, thrown.getMessage)
}
| |
/**
 * Asserts that `handler`, invoked with a mocked request, throws an
 * [[UnsupportedVersionException]] carrying the same message as
 * `KafkaApis.shouldAlwaysForward` produces for that request.
 *
 * @param handler the request handler under test
 */
private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
  val mockedRequest = createMockRequest()
  val thrown = assertThrows(classOf[UnsupportedVersionException], () => handler(mockedRequest))
  val expectedMessage = KafkaApis.shouldAlwaysForward(mockedRequest).getMessage
  assertEquals(expectedMessage, thrown.getMessage)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the LeaderAndIsr handler is
 * expected to fail with the `shouldNeverReceive` error.
 */
@Test
def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest
  verifyShouldNeverHandleErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the StopReplica handler is
 * expected to fail with the `shouldNeverReceive` error.
 */
@Test
def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleStopReplicaRequest
  verifyShouldNeverHandleErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the UpdateMetadata handler is
 * expected to fail with the `shouldNeverReceive` error.
 */
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit = request =>
    createKafkaApis(raftSupport = true)
      .handleUpdateMetadataRequest(request, RequestLocal.withThreadConfinedCaching)
  verifyShouldNeverHandleErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the ControlledShutdown handler
 * is expected to fail with the `shouldNeverReceive` error.
 */
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleControlledShutdownRequest
  verifyShouldNeverHandleErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the AlterPartition handler is
 * expected to fail with the `shouldNeverReceive` error.
 */
@Test
def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleAlterPartitionRequest
  verifyShouldNeverHandleErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the Envelope handler is expected
 * to fail with the `shouldNeverReceive` error.
 */
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit = request =>
    createKafkaApis(raftSupport = true)
      .handleEnvelope(request, RequestLocal.withThreadConfinedCaching)
  verifyShouldNeverHandleErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the CreateTopics handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleCreateTopicsRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the CreatePartitions handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleCreatePartitionsRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the DeleteTopics handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleDeleteTopicsRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the CreateAcls handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleCreateAcls
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the DeleteAcls handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleDeleteAcls
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * An AlterConfigs request (version 1) with no resources, handled with
 * `raftSupport = true`, is expected to produce an empty AlterConfigs response.
 */
@Test
def testEmptyLegacyAlterConfigsRequestWithKRaft(): Unit = {
  val emptyAlterConfigs = new AlterConfigsRequest(new AlterConfigsRequestData(), 1.toShort)
  val request = buildRequest(emptyAlterConfigs)
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)

  val response = verifyNoThrottling[AlterConfigsResponse](request)
  val expectedData = new AlterConfigsResponseData()
  assertEquals(expectedData, response.data())
}
| |
/**
 * An AlterConfigs request (version 1) that sets broker config "foo" to null, handled with
 * `raftSupport = true`, is expected to be answered with INVALID_REQUEST for that resource.
 */
@Test
def testInvalidLegacyAlterConfigsRequestWithKRaft(): Unit = {
  // Assemble the request bottom-up: config entry -> broker resource -> request payload.
  val nullValuedConfig = new LAlterableConfig()
    .setName("foo")
    .setValue(null)
  val brokerResource = new LAlterConfigsResource()
    .setResourceName(brokerId.toString)
    .setResourceType(BROKER.id())
    .setConfigs(new LAlterableConfigCollection(asList(nullValuedConfig).iterator()))
  val requestData = new AlterConfigsRequestData()
    .setValidateOnly(true)
    .setResources(new LAlterConfigsResourceCollection(asList(brokerResource).iterator()))
  val request = buildRequest(new AlterConfigsRequest(requestData, 1.toShort))

  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)

  val response = verifyNoThrottling[AlterConfigsResponse](request)
  val expectedResource = new LAlterConfigsResourceResponse()
    .setErrorCode(Errors.INVALID_REQUEST.code())
    .setErrorMessage("Null value not supported for : foo")
    .setResourceName(brokerId.toString)
    .setResourceType(BROKER.id())
  assertEquals(new AlterConfigsResponseData().setResponses(asList(expectedResource)), response.data())
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the AlterPartitionReassignments
 * handler is expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * An IncrementalAlterConfigs request (version 1) with no resources, handled with
 * `raftSupport = true`, is expected to produce an empty response.
 */
@Test
def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
  val emptyIncrementalAlterConfigs =
    new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort)
  val request = buildRequest(emptyIncrementalAlterConfigs)
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest(request)

  val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
  val expectedData = new IncrementalAlterConfigsResponseData()
  assertEquals(expectedData, response.data())
}
| |
/**
 * A validate-only IncrementalAlterConfigs request (version 1) that sets the root logger of
 * this broker's BROKER_LOGGER resource to "TRACE", handled with `raftSupport = true`, is
 * expected to succeed with error code 0 and no error message.
 */
@Test
def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
  // Assemble the request bottom-up: config entry -> logger resource -> request payload.
  val rootLoggerConfig = new IAlterableConfig()
    .setName(Log4jController.ROOT_LOGGER)
    .setValue("TRACE")
  val loggerResource = new IAlterConfigsResource()
    .setResourceName(brokerId.toString)
    .setResourceType(BROKER_LOGGER.id())
    .setConfigs(new IAlterableConfigCollection(asList(rootLoggerConfig).iterator()))
  val requestData = new IncrementalAlterConfigsRequestData()
    .setValidateOnly(true)
    .setResources(new IAlterConfigsResourceCollection(asList(loggerResource).iterator()))
  val request = buildRequest(new IncrementalAlterConfigsRequest(requestData, 1.toShort))

  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)

  createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest(request)

  val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
  val expectedResource = new IAlterConfigsResourceResponse()
    .setErrorCode(0.toShort)
    .setErrorMessage(null)
    .setResourceName(brokerId.toString)
    .setResourceType(BROKER_LOGGER.id())
  assertEquals(
    new IncrementalAlterConfigsResponseData().setResponses(asList(expectedResource)),
    response.data())
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the CreateToken handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleCreateTokenRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the RenewToken handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleRenewTokenRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the ExpireToken handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleExpireTokenRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the AlterClientQuotas handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the AlterUserScramCredentials
 * handler is expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the UpdateFeatures handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleUpdateFeatures
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the ElectLeaders handler is
 * expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleElectLeaders
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * With a KRaft metadata cache and `raftSupport = true`, the ListPartitionReassignments
 * handler is expected to fail with the `shouldAlwaysForward` error.
 */
@Test
def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
  metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
  val handler: RequestChannel.Request => Unit =
    createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest
  verifyShouldAlwaysForwardErrorMessage(handler)
}
| |
/**
 * A ConsumerGroupHeartbeat request handled by a KafkaApis created with default arguments
 * is expected to be answered with UNSUPPORTED_VERSION.
 */
@Test
def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
  val heartbeatData = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
  val heartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(heartbeatData).build()
  val requestChannelRequest = buildRequest(heartbeatRequest)

  createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching)

  val expectedData = new ConsumerGroupHeartbeatResponseData()
    .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
  val actualData = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest).data
  assertEquals(expectedData, actualData)
}
| |
/**
 * With `KafkaConfig.NewGroupCoordinatorEnableProp` set to "true", the payload completed on
 * the group coordinator's future is expected to be sent back as the heartbeat response.
 */
@Test
def testConsumerGroupHeartbeatRequest(): Unit = {
  val heartbeatData = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
  val requestChannelRequest = buildRequest(
    new ConsumerGroupHeartbeatRequest.Builder(heartbeatData).build())

  // The coordinator hands back a future that the test completes after dispatching.
  val pendingResponse = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
  when(groupCoordinator.consumerGroupHeartbeat(requestChannelRequest.context, heartbeatData))
    .thenReturn(pendingResponse)

  val newCoordinatorEnabled = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true")
  createKafkaApis(overrideProperties = newCoordinatorEnabled)
    .handle(requestChannelRequest, RequestLocal.NoCaching)

  val coordinatorPayload = new ConsumerGroupHeartbeatResponseData().setMemberId("member")
  pendingResponse.complete(coordinatorPayload)

  val actualData = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest).data
  assertEquals(coordinatorPayload, actualData)
}
| |
/**
 * With `KafkaConfig.NewGroupCoordinatorEnableProp` set to "true", a coordinator future that
 * fails with FENCED_MEMBER_EPOCH is expected to surface as that error code in the response.
 */
@Test
def testConsumerGroupHeartbeatRequestFutureFailed(): Unit = {
  val heartbeatData = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
  val requestChannelRequest = buildRequest(
    new ConsumerGroupHeartbeatRequest.Builder(heartbeatData).build())

  // The coordinator hands back a future that the test fails after dispatching.
  val pendingResponse = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
  when(groupCoordinator.consumerGroupHeartbeat(requestChannelRequest.context, heartbeatData))
    .thenReturn(pendingResponse)

  val newCoordinatorEnabled = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true")
  createKafkaApis(overrideProperties = newCoordinatorEnabled)
    .handle(requestChannelRequest, RequestLocal.NoCaching)

  pendingResponse.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)

  val actualData = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest).data
  assertEquals(Errors.FENCED_MEMBER_EPOCH.code, actualData.errorCode)
}
| |
/**
 * With `KafkaConfig.NewGroupCoordinatorEnableProp` set to "true" and an authorizer that
 * denies every action, the heartbeat is expected to fail with GROUP_AUTHORIZATION_FAILED.
 */
@Test
def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
  val heartbeatData = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
  val requestChannelRequest = buildRequest(
    new ConsumerGroupHeartbeatRequest.Builder(heartbeatData).build())

  // Deny whatever the handler asks the authorizer about.
  val denyingAuthorizer: Authorizer = mock(classOf[Authorizer])
  when(denyingAuthorizer.authorize(any[RequestContext], any[util.List[Action]]))
    .thenReturn(Seq(AuthorizationResult.DENIED).asJava)

  val apis = createKafkaApis(
    authorizer = Some(denyingAuthorizer),
    overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true")
  )
  apis.handle(requestChannelRequest, RequestLocal.NoCaching)

  val actualData = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest).data
  assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, actualData.errorCode)
}
| } |