blob: 05a42c10f58e97a5e876af1947335d37cee460ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.server.config.ServerLogConfigs;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(serverProperties = {
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000")
})
public class AdminFenceProducersTest {
private static final String TOPIC_NAME = "mytopic";
private static final String TXN_ID = "mytxnid";
private static final String INCORRECT_BROKER_PORT = "225";
private static final ProducerRecord<byte[], byte[]> RECORD = new ProducerRecord<>(TOPIC_NAME, null, new byte[1]);
private final ClusterInstance clusterInstance;
AdminFenceProducersTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;
}
private Producer<byte[], byte[]> createProducer() {
return clusterInstance.producer(Map.of(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TXN_ID,
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "2000"));
}
@ClusterTest
void testFenceAfterProducerCommit() throws Exception {
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
try (Producer<byte[], byte[]> producer = createProducer();
Admin adminClient = clusterInstance.admin()) {
producer.initTransactions();
producer.beginTransaction();
producer.send(RECORD).get();
producer.commitTransaction();
adminClient.fenceProducers(Collections.singletonList(TXN_ID)).all().get();
producer.beginTransaction();
ExecutionException exceptionDuringSend = assertThrows(
ExecutionException.class,
() -> producer.send(RECORD).get(), "expected InvalidProducerEpochException"
);
// In Transaction V2, the ProducerFencedException will be converted to InvalidProducerEpochException when
// coordinator handles AddPartitionRequest.
assertInstanceOf(InvalidProducerEpochException.class, exceptionDuringSend.getCause());
// InvalidProducerEpochException is treated as fatal error. The commitTransaction will return this last
// fatal error.
assertThrows(InvalidProducerEpochException.class, producer::commitTransaction);
}
}
@ClusterTest
void testFenceProducerTimeoutMs() {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + INCORRECT_BROKER_PORT);
try (Admin adminClient = clusterInstance.admin(config)) {
ExecutionException exception = assertThrows(
ExecutionException.class, () ->
adminClient.fenceProducers(Collections.singletonList(TXN_ID), new FenceProducersOptions().timeoutMs(0)).all().get());
assertInstanceOf(TimeoutException.class, exception.getCause());
}
}
@ClusterTest
void testFenceBeforeProducerCommit() throws Exception {
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
try (Producer<byte[], byte[]> producer = createProducer();
Admin adminClient = clusterInstance.admin()) {
producer.initTransactions();
producer.beginTransaction();
producer.send(RECORD).get();
adminClient.fenceProducers(Collections.singletonList(TXN_ID)).all().get();
ExecutionException exceptionDuringSend = assertThrows(
ExecutionException.class, () ->
producer.send(RECORD).get(), "expected ProducerFencedException"
);
assertTrue(exceptionDuringSend.getCause() instanceof ProducerFencedException ||
exceptionDuringSend.getCause() instanceof InvalidProducerEpochException);
ApiException exceptionDuringCommit = assertThrows(
ApiException.class,
producer::commitTransaction, "Expected Exception"
);
assertTrue(exceptionDuringCommit instanceof ProducerFencedException ||
exceptionDuringCommit instanceof InvalidProducerEpochException);
}
}
}