blob: 03b13d3fdf05b22a331aa9c2caa507feff80a283 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TransactionManagerTest {
// --- Fixed test configuration ---
private static final int MAX_REQUEST_SIZE = 1024 * 1024;
private static final short ACKS_ALL = -1;
private static final int MAX_RETRIES = Integer.MAX_VALUE;
private static final String CLIENT_ID = "clientId";
private static final int MAX_BLOCK_TIMEOUT = 1000;
private static final int REQUEST_TIMEOUT = 1000;
private static final long DEFAULT_RETRY_BACKOFF_MS = 100L;
// --- Transactional producer identity used across tests ---
private final String transactionalId = "foobar";
private final int transactionTimeoutMs = 1121;
// Topic with two partitions (see metadata update in setup()).
private final String topic = "test";
private TopicPartition tp0 = new TopicPartition(topic, 0);
private TopicPartition tp1 = new TopicPartition(topic, 1);
// Mock time/network so tests can drive the Sender loop deterministically.
private MockTime time = new MockTime();
private ProducerMetadata metadata = new ProducerMetadata(0, Long.MAX_VALUE, new LogContext(),
new ClusterResourceListeners(), time);
private MockClient client = new MockClient(time, metadata);
private ApiVersions apiVersions = new ApiVersions();
// Wired together in setup(); null until then.
private RecordAccumulator accumulator = null;
private Sender sender = null;
private TransactionManager transactionManager = null;
private Node brokerNode = null;
private final LogContext logContext = new LogContext();
@Before
public void setup() {
// Builds a full producer-internals pipeline (accumulator -> sender -> mock client)
// around a transactional TransactionManager, so tests can step the Sender manually.
Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("client-id", CLIENT_ID);
int batchSize = 16 * 1024;
int deliveryTimeoutMs = 3000;
long totalSize = 1024 * 1024;
String metricGrpName = "producer-metrics";
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
this.brokerNode = new Node(0, "localhost", 2211);
this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
DEFAULT_RETRY_BACKOFF_MS);
Metrics metrics = new Metrics(metricConfig, time);
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);
this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
// Seed metadata with the "test" topic (2 partitions) so appends can resolve leaders.
this.metadata.add("test");
this.client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap("test", 2)));
}
@Test
public void testSenderShutdownWithPendingTransactions() throws Exception {
// Verifies that a graceful Sender shutdown still drains an in-flight transaction:
// the pending produce completes and the commit finishes before the Sender exits.
long pid = 13131L;
short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxn(tp0, Errors.NONE);
prepareProduceResponse(Errors.NONE, pid, epoch);
// Close is initiated BEFORE the commit to exercise the shutdown-with-pending-txn path.
sender.initiateClose();
sender.runOnce();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.runOnce();
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
sender.runOnce();
assertTrue(result.isCompleted());
// run() executes the remaining shutdown loop to completion.
sender.run();
assertTrue(sendFuture.isDone());
}
@Test
public void testEndTxnNotSentIfIncompleteBatches() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);

    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    prepareAddPartitionsToTxn(tp0, Errors.NONE);
    sender.runOnce();
    assertTrue(transactionManager.isPartitionAdded(tp0));

    transactionManager.beginCommit();
    // While batches are still incomplete no EndTxn request may be handed out...
    assertNull(transactionManager.nextRequestHandler(true));
    // ...but once they are flushed, the next request is the EndTxn.
    assertTrue(transactionManager.nextRequestHandler(false).isEndTxn());
}
@Test(expected = IllegalStateException.class)
public void testFailIfNotReadyForSendNoProducerId() {
// Sending before initTransactions() (no producer id yet) must throw IllegalStateException.
transactionManager.failIfNotReadyForSend();
}
@Test
public void testFailIfNotReadyForSendIdempotentProducer() {
// A purely idempotent (non-transactional) manager is always ready to send;
// the test passes as long as no exception is thrown.
TransactionManager idempotentTransactionManager = new TransactionManager();
idempotentTransactionManager.failIfNotReadyForSend();
}
@Test(expected = KafkaException.class)
public void testFailIfNotReadyForSendIdempotentProducerFatalError() {
    // Even an idempotent (non-transactional) manager refuses sends once it is in a fatal state.
    final TransactionManager idempotentManager = new TransactionManager();
    idempotentManager.transitionToFatalError(new KafkaException());
    idempotentManager.failIfNotReadyForSend();
}
@Test(expected = IllegalStateException.class)
public void testFailIfNotReadyForSendNoOngoingTransaction() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    // Initialized but no beginTransaction() yet: sending is an illegal state.
    transactionManager.failIfNotReadyForSend();
}
@Test(expected = KafkaException.class)
public void testFailIfNotReadyForSendAfterAbortableError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    // An abortable error poisons the ongoing transaction; further sends must fail.
    transactionManager.transitionToAbortableError(new KafkaException());
    transactionManager.failIfNotReadyForSend();
}
@Test(expected = KafkaException.class)
public void testFailIfNotReadyForSendAfterFatalError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    // A fatal error makes the manager unusable; sending must throw.
    transactionManager.transitionToFatalError(new KafkaException());
    transactionManager.failIfNotReadyForSend();
}
@Test
public void testHasOngoingTransactionSuccessfulAbort() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition txnPartition = new TopicPartition("foo", 0);

    // No transaction before or right after initialization.
    assertFalse(transactionManager.hasOngoingTransaction());
    doInitTransactions(producerId, producerEpoch);
    assertFalse(transactionManager.hasOngoingTransaction());

    // Begin -> add partition: transaction is ongoing throughout.
    transactionManager.beginTransaction();
    assertTrue(transactionManager.hasOngoingTransaction());
    transactionManager.maybeAddPartitionToTransaction(txnPartition);
    assertTrue(transactionManager.hasOngoingTransaction());
    prepareAddPartitionsToTxn(txnPartition, Errors.NONE);
    sender.runOnce();

    // Still ongoing while the abort is in flight; completed once EndTxn(ABORT) returns.
    transactionManager.beginAbort();
    assertTrue(transactionManager.hasOngoingTransaction());
    prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
    sender.runOnce();
    assertFalse(transactionManager.hasOngoingTransaction());
}
@Test
public void testHasOngoingTransactionSuccessfulCommit() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition txnPartition = new TopicPartition("foo", 0);

    // No transaction before or right after initialization.
    assertFalse(transactionManager.hasOngoingTransaction());
    doInitTransactions(producerId, producerEpoch);
    assertFalse(transactionManager.hasOngoingTransaction());

    // Begin -> add partition: transaction is ongoing throughout.
    transactionManager.beginTransaction();
    assertTrue(transactionManager.hasOngoingTransaction());
    transactionManager.maybeAddPartitionToTransaction(txnPartition);
    assertTrue(transactionManager.hasOngoingTransaction());
    prepareAddPartitionsToTxn(txnPartition, Errors.NONE);
    sender.runOnce();

    // Still ongoing while the commit is in flight; completed once EndTxn(COMMIT) returns.
    transactionManager.beginCommit();
    assertTrue(transactionManager.hasOngoingTransaction());
    prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, producerEpoch);
    sender.runOnce();
    assertFalse(transactionManager.hasOngoingTransaction());
}
@Test
public void testHasOngoingTransactionAbortableError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition txnPartition = new TopicPartition("foo", 0);

    assertFalse(transactionManager.hasOngoingTransaction());
    doInitTransactions(producerId, producerEpoch);
    assertFalse(transactionManager.hasOngoingTransaction());

    transactionManager.beginTransaction();
    assertTrue(transactionManager.hasOngoingTransaction());
    transactionManager.maybeAddPartitionToTransaction(txnPartition);
    assertTrue(transactionManager.hasOngoingTransaction());
    prepareAddPartitionsToTxn(txnPartition, Errors.NONE);
    sender.runOnce();

    // An abortable error keeps the transaction ongoing until it is actually aborted.
    transactionManager.transitionToAbortableError(new KafkaException());
    assertTrue(transactionManager.hasOngoingTransaction());
    transactionManager.beginAbort();
    assertTrue(transactionManager.hasOngoingTransaction());
    prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
    sender.runOnce();
    assertFalse(transactionManager.hasOngoingTransaction());
}
@Test
public void testHasOngoingTransactionFatalError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition txnPartition = new TopicPartition("foo", 0);

    assertFalse(transactionManager.hasOngoingTransaction());
    doInitTransactions(producerId, producerEpoch);
    assertFalse(transactionManager.hasOngoingTransaction());

    transactionManager.beginTransaction();
    assertTrue(transactionManager.hasOngoingTransaction());
    transactionManager.maybeAddPartitionToTransaction(txnPartition);
    assertTrue(transactionManager.hasOngoingTransaction());
    prepareAddPartitionsToTxn(txnPartition, Errors.NONE);
    sender.runOnce();

    // Unlike abortable errors, a fatal error immediately ends the ongoing transaction.
    transactionManager.transitionToFatalError(new KafkaException());
    assertFalse(transactionManager.hasOngoingTransaction());
}
@Test
public void testMaybeAddPartitionToTransaction() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition txnPartition = new TopicPartition("foo", 0);
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();

    // Before the AddPartitionsToTxn round trip, the partition is pending, not added.
    transactionManager.maybeAddPartitionToTransaction(txnPartition);
    assertTrue(transactionManager.hasPartitionsToAdd());
    assertFalse(transactionManager.isPartitionAdded(txnPartition));
    assertTrue(transactionManager.isPartitionPendingAdd(txnPartition));

    prepareAddPartitionsToTxn(txnPartition, Errors.NONE);
    sender.runOnce();
    assertFalse(transactionManager.hasPartitionsToAdd());
    assertTrue(transactionManager.isPartitionAdded(txnPartition));
    assertFalse(transactionManager.isPartitionPendingAdd(txnPartition));

    // Adding the same partition a second time must be a no-op.
    transactionManager.maybeAddPartitionToTransaction(txnPartition);
    assertFalse(transactionManager.hasPartitionsToAdd());
    assertTrue(transactionManager.isPartitionAdded(txnPartition));
    assertFalse(transactionManager.isPartitionPendingAdd(txnPartition));
}
@Test
public void testAddPartitionToTransactionOverridesRetryBackoffForConcurrentTransactions() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition txnPartition = new TopicPartition("foo", 0);
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();

    transactionManager.maybeAddPartitionToTransaction(txnPartition);
    assertTrue(transactionManager.hasPartitionsToAdd());
    assertFalse(transactionManager.isPartitionAdded(txnPartition));
    assertTrue(transactionManager.isPartitionPendingAdd(txnPartition));

    // CONCURRENT_TRANSACTIONS is expected to clear quickly, so the retry uses a
    // shortened 20ms backoff instead of the configured default.
    prepareAddPartitionsToTxn(txnPartition, Errors.CONCURRENT_TRANSACTIONS);
    sender.runOnce();

    TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequestHandler(false);
    assertNotNull(retryHandler);
    assertEquals(20, retryHandler.retryBackoffMs());
}
@Test
public void testAddPartitionToTransactionRetainsRetryBackoffForRegularRetriableError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition txnPartition = new TopicPartition("foo", 0);
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();

    transactionManager.maybeAddPartitionToTransaction(txnPartition);
    assertTrue(transactionManager.hasPartitionsToAdd());
    assertFalse(transactionManager.isPartitionAdded(txnPartition));
    assertTrue(transactionManager.isPartitionPendingAdd(txnPartition));

    // An ordinary retriable error must keep the configured retry backoff.
    prepareAddPartitionsToTxn(txnPartition, Errors.COORDINATOR_NOT_AVAILABLE);
    sender.runOnce();

    TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequestHandler(false);
    assertNotNull(retryHandler);
    assertEquals(DEFAULT_RETRY_BACKOFF_MS, retryHandler.retryBackoffMs());
}
@Test
public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlreadyAdded() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition firstPartition = new TopicPartition("foo", 0);
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();

    transactionManager.maybeAddPartitionToTransaction(firstPartition);
    assertTrue(transactionManager.hasPartitionsToAdd());
    assertFalse(transactionManager.isPartitionAdded(firstPartition));
    assertTrue(transactionManager.isPartitionPendingAdd(firstPartition));
    prepareAddPartitionsToTxn(firstPartition, Errors.NONE);
    sender.runOnce();
    assertTrue(transactionManager.isPartitionAdded(firstPartition));

    // With one partition already added, a CONCURRENT_TRANSACTIONS error on a second
    // partition must NOT trigger the shortened backoff: the default applies.
    final TopicPartition secondPartition = new TopicPartition("foo", 1);
    transactionManager.maybeAddPartitionToTransaction(secondPartition);
    prepareAddPartitionsToTxn(secondPartition, Errors.CONCURRENT_TRANSACTIONS);

    TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequestHandler(false);
    assertNotNull(retryHandler);
    assertEquals(DEFAULT_RETRY_BACKOFF_MS, retryHandler.retryBackoffMs());
}
@Test(expected = IllegalStateException.class)
public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
// Adding a partition before initTransactions() must throw IllegalStateException.
transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
}
@Test(expected = IllegalStateException.class)
public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    // Initialized but no beginTransaction(): adding a partition is an illegal state.
    transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
}
@Test(expected = KafkaException.class)
public void testMaybeAddPartitionToTransactionAfterAbortableError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    // Once the transaction has hit an abortable error, new partitions are rejected.
    transactionManager.transitionToAbortableError(new KafkaException());
    transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
}
@Test(expected = KafkaException.class)
public void testMaybeAddPartitionToTransactionAfterFatalError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    // A fatal error makes the manager unusable; adding a partition must throw.
    transactionManager.transitionToFatalError(new KafkaException());
    transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
}
@Test
public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    // Abortable error while the partition add is still pending: sends are blocked.
    transactionManager.transitionToAbortableError(new KafkaException());
    assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
    assertTrue(transactionManager.hasAbortableError());
}
@Test
public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    // Dispatch the AddPartitionsToTxn request and leave it in flight.
    sender.runOnce();

    // Abortable error with the add still in flight: sends are blocked.
    transactionManager.transitionToAbortableError(new KafkaException());
    assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
    assertTrue(transactionManager.hasAbortableError());
}
@Test
public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    // Fatal error while the partition add is still pending: sends are blocked.
    transactionManager.transitionToFatalError(new KafkaException());
    assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
    assertTrue(transactionManager.hasFatalError());
}
@Test
public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    // Dispatch the AddPartitionsToTxn request and leave it in flight.
    sender.runOnce();

    // Fatal error with the add still in flight: sends are blocked.
    transactionManager.transitionToFatalError(new KafkaException());
    assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
    assertTrue(transactionManager.hasFatalError());
}
@Test
public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
    sender.runOnce();
    assertFalse(transactionManager.hasPartitionsToAdd());

    // Partition fully added BEFORE the abortable error: sends stay allowed so the
    // transaction can be drained and aborted cleanly.
    transactionManager.transitionToAbortableError(new KafkaException());
    assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
    assertTrue(transactionManager.hasAbortableError());
}
@Test
public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
    sender.runOnce();
    assertFalse(transactionManager.hasPartitionsToAdd());

    // Fatal errors block sends even for partitions that were already added.
    transactionManager.transitionToFatalError(new KafkaException());
    assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
    assertTrue(transactionManager.hasFatalError());
}
@Test
public void testIsSendToPartitionAllowedWithPartitionNotAdded() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    // A partition that was never added to the transaction may not receive sends.
    assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
}
@Test
public void testDefaultSequenceNumber() {
    // Verifies sequence numbers start at 0 and advance by the increment amount.
    TransactionManager transactionManager = new TransactionManager();
    // Fixed: assertEquals takes (expected, actual); the original had them reversed,
    // which produces misleading failure messages.
    assertEquals(0, (int) transactionManager.sequenceNumber(tp0));
    transactionManager.incrementSequenceNumber(tp0, 3);
    assertEquals(3, (int) transactionManager.sequenceNumber(tp0));
}
@Test
public void testResetSequenceNumbersAfterUnknownProducerId() {
// After an UNKNOWN_PRODUCER_ID (caused here by log-start-offset moving past our
// last acked offset), the remaining in-flight batches get their sequence numbers
// rebased to start from 0 and the retry is allowed.
final long producerId = 13131L;
final short epoch = 1;
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, tp0, "3");
ProducerBatch b4 = writeIdempotentBatchWithValue(transactionManager, tp0, "4");
ProducerBatch b5 = writeIdempotentBatchWithValue(transactionManager, tp0, "5");
assertEquals(5, transactionManager.sequenceNumber(tp0).intValue());
// First batch succeeds
long b1AppendTime = time.milliseconds();
ProduceResponse.PartitionResponse b1Response = new ProduceResponse.PartitionResponse(
Errors.NONE, 500L, b1AppendTime, 0L);
b1.done(500L, b1AppendTime, null);
transactionManager.handleCompletedBatch(b1, b1Response);
// Retention caused log start offset to jump forward. We set sequence numbers back to 0
ProduceResponse.PartitionResponse b2Response = new ProduceResponse.PartitionResponse(
Errors.UNKNOWN_PRODUCER_ID, -1, -1, 600L);
assertTrue(transactionManager.canRetry(b2Response, b2));
// Next sequence drops from 5 to 4 (b1 is gone); b2..b5 are rebased to 0..3.
assertEquals(4, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(0, b2.baseSequence());
assertEquals(1, b3.baseSequence());
assertEquals(2, b4.baseSequence());
assertEquals(3, b5.baseSequence());
}
@Test
public void testAdjustSequenceNumbersAfterFatalError() {
// When a batch fails fatally (non-retriable), the sequence numbers of the batches
// queued behind it are shifted down by the failed batch's record count, and later
// OUT_OF_ORDER_SEQUENCE retries leave those adjusted sequences untouched.
final long producerId = 13131L;
final short epoch = 1;
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, tp0, "3");
ProducerBatch b4 = writeIdempotentBatchWithValue(transactionManager, tp0, "4");
ProducerBatch b5 = writeIdempotentBatchWithValue(transactionManager, tp0, "5");
assertEquals(5, transactionManager.sequenceNumber(tp0).intValue());
// First batch succeeds
long b1AppendTime = time.milliseconds();
ProduceResponse.PartitionResponse b1Response = new ProduceResponse.PartitionResponse(
Errors.NONE, 500L, b1AppendTime, 0L);
b1.done(500L, b1AppendTime, null);
transactionManager.handleCompletedBatch(b1, b1Response);
// Second batch fails with a fatal error. Sequence numbers are adjusted by one for remaining
// inflight batches.
ProduceResponse.PartitionResponse b2Response = new ProduceResponse.PartitionResponse(
Errors.MESSAGE_TOO_LARGE, -1, -1, 0L);
assertFalse(transactionManager.canRetry(b2Response, b2));
b2.done(-1L, -1L, Errors.MESSAGE_TOO_LARGE.exception());
transactionManager.handleFailedBatch(b2, Errors.MESSAGE_TOO_LARGE.exception(), true);
assertEquals(4, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, b3.baseSequence());
assertEquals(2, b4.baseSequence());
assertEquals(3, b5.baseSequence());
// The remaining batches are doomed to fail, but they can be retried. Expected
// sequence numbers should remain the same.
ProduceResponse.PartitionResponse b3Response = new ProduceResponse.PartitionResponse(
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1, -1, 0L);
assertTrue(transactionManager.canRetry(b3Response, b3));
assertEquals(4, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, b3.baseSequence());
assertEquals(2, b4.baseSequence());
assertEquals(3, b5.baseSequence());
}
@Test
public void testBatchFailureAfterProducerReset() {
// This tests a scenario where the producerId is reset while pending requests are still inflight.
// The returned responses should not update internal state.
final long producerId = 13131L;
final short epoch = 1;
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
// b1 is sent under the OLD producer id.
ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
ProducerIdAndEpoch updatedProducerIdAndEpoch = new ProducerIdAndEpoch(producerId + 1, epoch);
transactionManager.resetProducerId();
transactionManager.setProducerIdAndEpoch(updatedProducerIdAndEpoch);
// b2 is the first batch under the NEW producer id, so sequence restarts at 0 -> next is 1.
ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
// Stale failure for b1 (old producer id) must not be retriable and must not
// disturb the sequence state of the new producer id.
ProduceResponse.PartitionResponse b1Response = new ProduceResponse.PartitionResponse(
Errors.UNKNOWN_PRODUCER_ID, -1, -1, 400L);
assertFalse(transactionManager.canRetry(b1Response, b1));
transactionManager.handleFailedBatch(b1, Errors.UNKNOWN_PRODUCER_ID.exception(), true);
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(b2, transactionManager.nextBatchBySequence(tp0));
}
@Test
public void testBatchCompletedAfterProducerReset() {
// Mirror of testBatchFailureAfterProducerReset for the SUCCESS path: a stale
// successful response for a batch from the old producer id must be ignored.
final long producerId = 13131L;
final short epoch = 1;
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
// The producerId might be reset due to a failure on another partition
ProducerIdAndEpoch updatedProducerIdAndEpoch = new ProducerIdAndEpoch(producerId + 1, epoch);
transactionManager.resetProducerId();
transactionManager.setProducerIdAndEpoch(updatedProducerIdAndEpoch);
ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
// If the request returns successfully, we should ignore the response and not update any state
ProduceResponse.PartitionResponse b1Response = new ProduceResponse.PartitionResponse(
Errors.NONE, 500L, time.milliseconds(), 0L);
transactionManager.handleCompletedBatch(b1, b1Response);
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(b2, transactionManager.nextBatchBySequence(tp0));
}
/**
 * Creates a closed single-record batch for {@code tp}, stamps it with the manager's
 * current producer id/epoch and the next sequence number, and registers it as in-flight.
 */
private ProducerBatch writeIdempotentBatchWithValue(TransactionManager manager,
                                                    TopicPartition tp,
                                                    String value) {
    // Claim the next sequence number for this partition, then advance the counter.
    final int baseSequence = manager.sequenceNumber(tp);
    manager.incrementSequenceNumber(tp, 1);

    final ProducerBatch batch = batchWithValue(tp, value);
    batch.setProducerState(manager.producerIdAndEpoch(), baseSequence, false);
    manager.addInFlightBatch(batch);
    batch.close();
    return batch;
}
/**
 * Builds an uncompressed, CREATE_TIME batch containing a single record with an
 * empty key and the given value, created at the current mock time.
 */
private ProducerBatch batchWithValue(TopicPartition tp, String value) {
    final long now = time.milliseconds();
    final MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(64),
            CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
    final ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, now);
    batch.tryAppend(now, new byte[0], value.getBytes(), new Header[0], null, now);
    return batch;
}
@Test
public void testSequenceNumberOverflow() {
    // Verifies that sequence numbers wrap around (rather than going negative)
    // when an increment passes Integer.MAX_VALUE.
    TransactionManager transactionManager = new TransactionManager();
    // Fixed: assertEquals takes (expected, actual); the original had them reversed,
    // which produces misleading failure messages.
    assertEquals(0, (int) transactionManager.sequenceNumber(tp0));
    transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
    assertEquals(Integer.MAX_VALUE, (int) transactionManager.sequenceNumber(tp0));
    // MAX_VALUE + 100 wraps to 99 (0-based wrap past MAX_VALUE).
    transactionManager.incrementSequenceNumber(tp0, 100);
    assertEquals(99, (int) transactionManager.sequenceNumber(tp0));
    transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
    assertEquals(98, (int) transactionManager.sequenceNumber(tp0));
}
@Test
public void testProducerIdReset() {
    // Resetting the producer id must zero out per-partition sequence numbers.
    TransactionManager transactionManager = new TransactionManager();
    // Fixed: assertEquals takes (expected, actual); the original had them reversed,
    // which produces misleading failure messages.
    assertEquals(0, (int) transactionManager.sequenceNumber(tp0));
    transactionManager.incrementSequenceNumber(tp0, 3);
    assertEquals(3, (int) transactionManager.sequenceNumber(tp0));
    transactionManager.resetProducerId();
    assertEquals(0, (int) transactionManager.sequenceNumber(tp0));
}
@Test
public void testBasicTransaction() throws InterruptedException {
// End-to-end happy path: init -> begin -> add partition -> produce ->
// sendOffsetsToTransaction (AddOffsets + FindCoordinator(GROUP) + TxnOffsetCommit)
// -> commit. Each sender.runOnce() drives exactly one prepared request/response.
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
prepareProduceResponse(Errors.NONE, pid, epoch);
assertFalse(transactionManager.transactionContainsPartition(tp0));
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
sender.runOnce(); // send addPartitions.
// Check that only addPartitions was sent.
assertTrue(transactionManager.transactionContainsPartition(tp0));
assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
assertFalse(responseFuture.isDone());
sender.runOnce(); // send produce request.
assertTrue(responseFuture.isDone());
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(tp1, new OffsetAndMetadata(1));
final String consumerGroupId = "myconsumergroup";
TransactionalRequestResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
assertFalse(transactionManager.hasPendingOffsetCommits());
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
sender.runOnce(); // Send AddOffsetsRequest
assertTrue(transactionManager.hasPendingOffsetCommits()); // We should now have created and queued the offset commit request.
assertFalse(addOffsetsResult.isCompleted()); // the result doesn't complete until TxnOffsetCommit returns
Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
txnOffsetCommitResponse.put(tp1, Errors.NONE);
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
sender.runOnce(); // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
sender.runOnce(); // send find coordinator for group request
assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
assertTrue(transactionManager.hasPendingOffsetCommits());
sender.runOnce(); // send TxnOffsetCommitRequest commit.
assertFalse(transactionManager.hasPendingOffsetCommits());
assertTrue(addOffsetsResult.isCompleted()); // We should only be done after both RPCs complete.
transactionManager.beginCommit();
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
sender.runOnce(); // commit.
// Transaction fully resolved: no ongoing txn, no completion pending, partition set cleared.
assertFalse(transactionManager.hasOngoingTransaction());
assertFalse(transactionManager.isCompleting());
assertFalse(transactionManager.transactionContainsPartition(tp0));
}
/**
 * Verifies that a dropped connection during the coordinator lookup phase of
 * initTransactions() is retried transparently: after the disconnect, the next
 * sender loop issues a fresh FindCoordinator and the coordinator is learned.
 */
@Test
public void testDisconnectAndRetry() {
    // initTransactions() first locates the transaction coordinator, then asks it for a PID.
    transactionManager.initializeTransactions();

    // First attempt is flagged as disconnected, so no coordinator is discovered.
    prepareFindCoordinatorResponse(Errors.NONE, true, CoordinatorType.TRANSACTION, transactionalId);
    sender.runOnce(); // find coordinator, connection lost.

    // The retry succeeds on the following loops.
    prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
    sender.runOnce(); // find coordinator
    sender.runOnce();

    assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
}
/**
 * If the broker does not support the FindCoordinator version we need, the
 * transaction manager must fail fatally with UnsupportedVersionException.
 */
@Test
public void testUnsupportedFindCoordinator() {
    transactionManager.initializeTransactions();

    // The mock client answers the next FindCoordinator request with an
    // "unsupported version" error, after validating the request contents.
    client.prepareUnsupportedVersionResponse(body -> {
        FindCoordinatorRequest request = (FindCoordinatorRequest) body;
        assertEquals(CoordinatorType.forId(request.data().keyType()), CoordinatorType.TRANSACTION);
        assertEquals(request.data().key(), transactionalId);
        return true;
    });

    sender.runOnce(); // InitProducerRequest is queued
    sender.runOnce(); // FindCoordinator is queued after peeking InitProducerRequest

    assertTrue(transactionManager.hasFatalError());
    assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException);
}
/**
 * If coordinator lookup succeeds but the broker's InitProducerId version is
 * unsupported, the transaction manager must enter a fatal
 * UnsupportedVersionException state.
 */
@Test
public void testUnsupportedInitTransactions() {
transactionManager.initializeTransactions();
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.runOnce(); // InitProducerRequest is queued
sender.runOnce(); // FindCoordinator is queued after peeking InitProducerRequest
// Coordinator discovery itself succeeded; no error yet.
assertFalse(transactionManager.hasError());
assertNotNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
// The mock client answers the next InitProducerId request with an
// "unsupported version" error, after validating the request contents.
client.prepareUnsupportedVersionResponse(body -> {
InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId);
assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs);
return true;
});
sender.runOnce(); // InitProducerRequest is dequeued
assertTrue(transactionManager.hasFatalError());
assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException);
}
/**
 * An UNSUPPORTED_FOR_MESSAGE_FORMAT partition error in the TxnOffsetCommit
 * response must fail the sendOffsets result and put the transaction manager
 * into a fatal error state.
 */
@Test
public void testUnsupportedForMessageFormatInTxnOffsetCommit() {
final String consumerGroupId = "consumer";
final long pid = 13131L;
final short epoch = 1;
final TopicPartition tp = new TopicPartition("foo", 0);
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
sender.runOnce(); // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
sender.runOnce(); // FindCoordinator Enqueued
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
sender.runOnce(); // FindCoordinator Returned
// The group coordinator rejects the offset commit for this partition.
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));
sender.runOnce(); // TxnOffsetCommit Handled
assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof UnsupportedForMessageFormatException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
assertTrue(sendOffsetsResult.error() instanceof UnsupportedForMessageFormatException);
// This error is not recoverable within the transaction: fatal, not abortable.
assertFatalError(UnsupportedForMessageFormatException.class);
}
/**
 * If the connection drops after the InitProducerId request is sent but before
 * its response arrives, the manager must rediscover the coordinator and retry
 * InitProducerId; the caller's future completes only after the retry succeeds.
 */
@Test
public void testLookupCoordinatorOnDisconnectAfterSend() {
// This is called from the initTransactions method in the producer as the first order of business.
// It finds the coordinator and then gets a PID.
final long pid = 13131L;
final short epoch = 1;
TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.runOnce(); // find coordinator
sender.runOnce();
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
// The second argument "true" marks the response as a disconnect.
prepareInitPidResponse(Errors.NONE, true, pid, epoch);
// send pid to coordinator, should get disconnected before receiving the response, and resend the
// FindCoordinator and InitPid requests.
sender.runOnce();
// Disconnect clears the known coordinator; nothing has completed yet.
assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertFalse(initPidResult.isCompleted());
assertFalse(transactionManager.hasProducerId());
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.runOnce();
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertFalse(initPidResult.isCompleted());
prepareInitPidResponse(Errors.NONE, false, pid, epoch);
sender.runOnce(); // get pid and epoch
assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
assertTrue(transactionManager.hasProducerId());
assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
}
/**
 * If the coordinator connection drops before the InitProducerId request can
 * even be sent, the manager must rediscover the coordinator and retry; the
 * caller's future completes only after the retry succeeds.
 */
@Test
public void testLookupCoordinatorOnDisconnectBeforeSend() {
// This is called from the initTransactions method in the producer as the first order of business.
// It finds the coordinator and then gets a PID.
final long pid = 13131L;
final short epoch = 1;
TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.runOnce(); // one loop to realize we need a coordinator.
sender.runOnce(); // next loop to find coordinator.
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
// Simulate the broker connection dropping before anything is sent, and keep
// the node unavailable for 100ms so the send cannot happen immediately.
client.disconnect(brokerNode.idString());
client.blackout(brokerNode, 100);
// send pid to coordinator. Should get disconnected before the send and resend the FindCoordinator
// and InitPid requests.
sender.runOnce();
time.sleep(110); // waiting for the blackout period for the node to expire.
// Disconnect clears the known coordinator; nothing has completed yet.
assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertFalse(initPidResult.isCompleted());
assertFalse(transactionManager.hasProducerId());
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.runOnce();
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertFalse(initPidResult.isCompleted());
prepareInitPidResponse(Errors.NONE, false, pid, epoch);
sender.runOnce(); // get pid and epoch
assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
assertTrue(transactionManager.hasProducerId());
assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
}
/**
 * A NOT_COORDINATOR error from InitProducerId must invalidate the cached
 * coordinator, trigger a fresh FindCoordinator lookup, and retry the
 * InitProducerId request before completing the caller's future.
 */
@Test
public void testLookupCoordinatorOnNotCoordinatorError() {
// This is called from the initTransactions method in the producer as the first order of business.
// It finds the coordinator and then gets a PID.
final long pid = 13131L;
final short epoch = 1;
TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.runOnce(); // find coordinator
sender.runOnce();
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, epoch);
sender.runOnce(); // send pid, get not coordinator. Should resend the FindCoordinator and InitPid requests
// NOT_COORDINATOR clears the cached coordinator; nothing has completed yet.
assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertFalse(initPidResult.isCompleted());
assertFalse(transactionManager.hasProducerId());
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.runOnce();
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertFalse(initPidResult.isCompleted());
prepareInitPidResponse(Errors.NONE, false, pid, epoch);
sender.runOnce(); // get pid and epoch
assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
assertTrue(transactionManager.hasProducerId());
assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
}
/**
 * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED error during coordinator lookup is
 * fatal: the InitProducerId future fails with
 * TransactionalIdAuthorizationException and the manager cannot recover.
 */
@Test
public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
    TransactionalRequestResult result = transactionManager.initializeTransactions();

    prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false,
            CoordinatorType.TRANSACTION, transactionalId);
    sender.runOnce(); // find coordinator
    sender.runOnce();

    assertTrue(transactionManager.hasError());
    assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);

    sender.runOnce(); // one more run to fail the InitProducerId future
    assertTrue(result.isCompleted());
    assertFalse(result.isSuccessful());
    assertTrue(result.error() instanceof TransactionalIdAuthorizationException);

    assertFatalError(TransactionalIdAuthorizationException.class);
}
/**
 * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED error from InitProducerId is fatal:
 * the caller's future fails with TransactionalIdAuthorizationException.
 */
@Test
public void testTransactionalIdAuthorizationFailureInInitProducerId() {
    final long producerId = 13131L;
    TransactionalRequestResult result = transactionManager.initializeTransactions();

    prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
    sender.runOnce(); // find coordinator
    sender.runOnce();
    assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));

    prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, producerId, RecordBatch.NO_PRODUCER_EPOCH);
    sender.runOnce();

    assertTrue(transactionManager.hasError());
    assertTrue(result.isCompleted());
    assertFalse(result.isSuccessful());
    assertTrue(result.error() instanceof TransactionalIdAuthorizationException);

    assertFatalError(TransactionalIdAuthorizationException.class);
}
/**
 * A GROUP_AUTHORIZATION_FAILED error while locating the group coordinator for
 * a TxnOffsetCommit must abort the pending offset commit with a
 * GroupAuthorizationException carrying the group id; the error is abortable.
 */
@Test
public void testGroupAuthorizationFailureInFindCoordinator() {
final String consumerGroupId = "consumer";
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), consumerGroupId);
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
sender.runOnce(); // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
sender.runOnce(); // FindCoordinator Enqueued
prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId);
sender.runOnce(); // FindCoordinator Failed
sender.runOnce(); // TxnOffsetCommit Aborted
assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);
// The exception must identify the consumer group that failed authorization.
GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
assertEquals(consumerGroupId, exception.groupId());
// Unlike transactional-id authorization failures, this error is abortable.
assertAbortableError(GroupAuthorizationException.class);
}
/**
 * A GROUP_AUTHORIZATION_FAILED partition error in the TxnOffsetCommit response
 * must fail the sendOffsets result with a GroupAuthorizationException carrying
 * the group id, clear pending offset commits, and leave the error abortable.
 */
@Test
public void testGroupAuthorizationFailureInTxnOffsetCommit() {
final String consumerGroupId = "consumer";
final long pid = 13131L;
final short epoch = 1;
final TopicPartition tp1 = new TopicPartition("foo", 0);
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
singletonMap(tp1, new OffsetAndMetadata(39L)), consumerGroupId);
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
sender.runOnce(); // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
sender.runOnce(); // FindCoordinator Enqueued
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
sender.runOnce(); // FindCoordinator Returned
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp1, Errors.GROUP_AUTHORIZATION_FAILED));
sender.runOnce(); // TxnOffsetCommit Handled
assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);
// The failed commit must not remain queued for retry.
assertFalse(transactionManager.hasPendingOffsetCommits());
GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
assertEquals(consumerGroupId, exception.groupId());
assertAbortableError(GroupAuthorizationException.class);
}
/**
 * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED error from AddOffsetsToTxn is fatal:
 * the sendOffsets result fails with TransactionalIdAuthorizationException.
 */
@Test
public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() {
    final String groupId = "consumer";
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition partition = new TopicPartition("foo", 0);

    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();

    TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(
            singletonMap(partition, new OffsetAndMetadata(39L)), groupId);

    prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, groupId, producerId, producerEpoch);
    sender.runOnce(); // AddOffsetsToTxn Handled

    assertTrue(transactionManager.hasError());
    assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
    assertTrue(result.isCompleted());
    assertFalse(result.isSuccessful());
    assertTrue(result.error() instanceof TransactionalIdAuthorizationException);

    assertFatalError(TransactionalIdAuthorizationException.class);
}
/**
 * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED partition error in the
 * TxnOffsetCommit response is fatal: the sendOffsets result fails with
 * TransactionalIdAuthorizationException.
 */
@Test
public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() {
final String consumerGroupId = "consumer";
final long pid = 13131L;
final short epoch = 1;
final TopicPartition tp = new TopicPartition("foo", 0);
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
sender.runOnce(); // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
sender.runOnce(); // FindCoordinator Enqueued
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
sender.runOnce(); // FindCoordinator Returned
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
sender.runOnce(); // TxnOffsetCommit Handled
assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException);
// Authorization failure on the transactional id is unrecoverable.
assertFatalError(TransactionalIdAuthorizationException.class);
}
/**
 * When AddPartitionsToTxn fails with TOPIC_AUTHORIZATION_FAILED for one
 * partition (and OPERATION_NOT_ATTEMPTED for the other), neither partition may
 * be added, the pending set must be cleared, and the resulting
 * TopicAuthorizationException must name only the unauthorized topic.
 */
@Test
public void testTopicAuthorizationFailureInAddPartitions() {
final long pid = 13131L;
final short epoch = 1;
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("bar", 0);
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
transactionManager.maybeAddPartitionToTransaction(tp1);
// tp0 is denied; tp1 is reported as not attempted because the batch failed.
Map<TopicPartition, Errors> errors = new HashMap<>();
errors.put(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
prepareAddPartitionsToTxn(errors);
sender.runOnce();
assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
// Neither partition should remain pending or be considered added.
assertFalse(transactionManager.isPartitionPendingAdd(tp0));
assertFalse(transactionManager.isPartitionPendingAdd(tp1));
assertFalse(transactionManager.isPartitionAdded(tp0));
assertFalse(transactionManager.isPartitionAdded(tp1));
assertFalse(transactionManager.hasPartitionsToAdd());
// Only the actually-unauthorized topic appears in the exception.
TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError();
assertEquals(singleton(tp0.topic()), exception.unauthorizedTopics());
assertAbortableError(TopicAuthorizationException.class);
}
/**
 * After a topic-authorization failure on AddPartitionsToTxn (before any
 * partition was actually added), aborting must fail the buffered record,
 * return the manager to READY, and allow a brand-new transaction to commit
 * successfully.
 */
@Test
public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
final long pid = 13131L;
final short epoch = 1;
final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
Future<RecordMetadata> responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
sender.runOnce();
assertTrue(transactionManager.hasAbortableError());
transactionManager.beginAbort();
sender.runOnce();
// The buffered record never got a chance to be produced, so it fails.
assertTrue(responseFuture.isDone());
assertFutureFailed(responseFuture);
// No partitions added, so no need to prepare EndTxn response
sender.runOnce();
assertTrue(transactionManager.isReady());
assertFalse(transactionManager.hasPartitionsToAdd());
assertFalse(accumulator.hasIncomplete());
// ensure we can now start a new transaction
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
sender.runOnce();
assertTrue(transactionManager.isPartitionAdded(tp0));
assertFalse(transactionManager.hasPartitionsToAdd());
transactionManager.beginCommit();
prepareProduceResponse(Errors.NONE, pid, epoch);
sender.runOnce();
assertTrue(responseFuture.isDone());
assertNotNull(responseFuture.get());
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
sender.runOnce();
// The second transaction completed cleanly end-to-end.
assertTrue(transactionManager.isReady());
}
/**
 * After the transaction has actually started (tp0 added successfully), a
 * topic-authorization failure on a second partition must leave the manager in
 * an abortable state; aborting fails both in-flight produce futures, returns
 * the manager to READY, and a fresh transaction can then commit normally.
 */
@Test
public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
    final long pid = 13131L;
    final short epoch = 1;
    final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);

    doInitTransactions(pid, epoch);

    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    prepareAddPartitionsToTxn(tp0, Errors.NONE);
    // Fix: the "authorized" record must be appended to the authorized partition
    // tp0 (the original appended it to unauthorizedPartition, contradicting the
    // variable name and the parallel ...ProduceRequestInRetry test below).
    Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
            "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    sender.runOnce();
    assertTrue(transactionManager.isPartitionAdded(tp0));

    transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
    Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
            "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
    sender.runOnce();
    assertTrue(transactionManager.hasAbortableError());
    assertTrue(transactionManager.isPartitionAdded(tp0));
    assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
    assertFalse(authorizedTopicProduceFuture.isDone());
    assertFalse(unauthorizedTopicProduceFuture.isDone());

    // tp0 was added, so the abort requires an EndTxn round trip.
    prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
    transactionManager.beginAbort();
    sender.runOnce();
    // neither produce request has been sent, so they should both be failed immediately
    assertFutureFailed(authorizedTopicProduceFuture);
    assertFutureFailed(unauthorizedTopicProduceFuture);
    assertTrue(transactionManager.isReady());
    assertFalse(transactionManager.hasPartitionsToAdd());
    assertFalse(accumulator.hasIncomplete());

    // ensure we can now start a new transaction
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
    sender.runOnce();
    assertTrue(transactionManager.isPartitionAdded(tp0));
    assertFalse(transactionManager.hasPartitionsToAdd());

    transactionManager.beginCommit();
    prepareProduceResponse(Errors.NONE, pid, epoch);
    sender.runOnce();
    assertTrue(nextTransactionFuture.isDone());
    assertNotNull(nextTransactionFuture.get());

    prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
    sender.runOnce();
    assertTrue(transactionManager.isReady());
}
/**
 * A produce request that is in retry (after REQUEST_TIMED_OUT) when an
 * abortable topic-authorization error occurs must still be allowed to
 * complete successfully; only the never-sent unauthorized record fails.
 * After aborting, the manager returns to READY and a new transaction commits.
 */
@Test
public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
    final long pid = 13131L;
    final short epoch = 1;
    final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);

    doInitTransactions(pid, epoch);

    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    prepareAddPartitionsToTxn(tp0, Errors.NONE);
    Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
            "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    sender.runOnce();
    assertTrue(transactionManager.isPartitionAdded(tp0));

    // First produce attempt times out, putting the batch into retry.
    accumulator.beginFlush();
    prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch);
    sender.runOnce();
    assertFalse(authorizedTopicProduceFuture.isDone());
    assertTrue(accumulator.hasIncomplete());

    transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
    Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
            "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
    sender.runOnce();
    assertTrue(transactionManager.hasAbortableError());
    assertTrue(transactionManager.isPartitionAdded(tp0));
    assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
    assertFalse(authorizedTopicProduceFuture.isDone());

    // The retried produce to the authorized partition completes despite the
    // abortable error; the unauthorized record (never sent) fails.
    prepareProduceResponse(Errors.NONE, pid, epoch);
    sender.runOnce();

    assertFutureFailed(unauthorizedTopicProduceFuture);
    assertTrue(authorizedTopicProduceFuture.isDone());
    assertNotNull(authorizedTopicProduceFuture.get());

    prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
    transactionManager.beginAbort();
    sender.runOnce();
    // After the abort completes, the manager is ready for a new transaction.
    assertTrue(transactionManager.isReady());
    assertFalse(transactionManager.hasPartitionsToAdd());
    assertFalse(accumulator.hasIncomplete());

    // ensure we can now start a new transaction
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
    sender.runOnce();
    assertTrue(transactionManager.isPartitionAdded(tp0));
    assertFalse(transactionManager.hasPartitionsToAdd());

    transactionManager.beginCommit();
    prepareProduceResponse(Errors.NONE, pid, epoch);
    sender.runOnce();
    assertTrue(nextTransactionFuture.isDone());
    assertNotNull(nextTransactionFuture.get());

    prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
    sender.runOnce();
    assertTrue(transactionManager.isReady());
}
/**
 * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED error from AddPartitionsToTxn is
 * fatal: the transaction manager cannot continue or abort.
 */
@Test
public void testTransactionalIdAuthorizationFailureInAddPartitions() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    final TopicPartition partition = new TopicPartition("foo", 0);

    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(partition);

    prepareAddPartitionsToTxn(partition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
    sender.runOnce();

    assertTrue(transactionManager.hasError());
    assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);

    assertFatalError(TransactionalIdAuthorizationException.class);
}
/**
 * Beginning a commit while an append and an AddPartitions request are still
 * pending must drain everything in order — AddPartitions, then Produce, then
 * EndTxn — before the commit result completes.
 */
@Test
public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
TransactionalRequestResult commitResult = transactionManager.beginCommit();
// we have an append, an add partitions request, and now also an endtxn.
// The order should be:
// 1. Add Partitions
// 2. Produce
// 3. EndTxn.
assertFalse(transactionManager.transactionContainsPartition(tp0));
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
sender.runOnce(); // AddPartitions.
assertTrue(transactionManager.transactionContainsPartition(tp0));
// Neither the produce nor the commit has completed yet.
assertFalse(responseFuture.isDone());
assertFalse(commitResult.isCompleted());
prepareProduceResponse(Errors.NONE, pid, epoch);
sender.runOnce(); // Produce.
assertTrue(responseFuture.isDone());
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
// Commit is still in flight until the EndTxn response is handled.
assertFalse(commitResult.isCompleted());
assertTrue(transactionManager.hasOngoingTransaction());
assertTrue(transactionManager.isCompleting());
sender.runOnce();
assertTrue(commitResult.isCompleted());
assertFalse(transactionManager.hasOngoingTransaction());
}
/**
 * When the user produces to a second partition while the first AddPartitions
 * round trip is in flight, the manager must send a second AddPartitions
 * request before any produce request goes out.
 */
@Test
public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
// User does one producer.send
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
assertFalse(transactionManager.transactionContainsPartition(tp0));
// Sender flushes one add partitions. The produce goes next.
sender.runOnce(); // send addPartitions.
// Check that only addPartitions was sent.
assertTrue(transactionManager.transactionContainsPartition(tp0));
// In the meantime, the user does a second produce to a different partition
transactionManager.maybeAddPartitionToTransaction(tp1);
// NOTE(review): tp1 was just added to the transaction, but this append goes
// to tp0 — confirm whether the second record was intended for tp1.
Future<RecordMetadata> secondResponseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
prepareProduceResponse(Errors.NONE, pid, epoch);
assertFalse(transactionManager.transactionContainsPartition(tp1));
assertFalse(responseFuture.isDone());
assertFalse(secondResponseFuture.isDone());
// The second add partitions should go out here.
sender.runOnce(); // send second add partitions request
assertTrue(transactionManager.transactionContainsPartition(tp1));
assertFalse(responseFuture.isDone());
assertFalse(secondResponseFuture.isDone());
// Finally we get to the produce.
sender.runOnce(); // send produce request
assertTrue(responseFuture.isDone());
assertTrue(secondResponseFuture.isDone());
}
/**
 * An INVALID_PRODUCER_EPOCH produce error surfaces as ProducerFencedException
 * on the record future, and every subsequent transactional API call must
 * rethrow the fenced error directly.
 */
@Test
public void testProducerFencedException() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
// The broker reports our epoch as stale — another producer has been fenced in.
prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, pid, epoch);
sender.runOnce(); // Add partitions.
sender.runOnce(); // send produce.
assertTrue(responseFuture.isDone());
assertTrue(transactionManager.hasError());
try {
// make sure the produce was expired.
responseFuture.get();
fail("Expected to get a ExecutionException from the response");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof ProducerFencedException);
}
// make sure the exception was thrown directly from the follow-up calls.
assertThrows(ProducerFencedException.class, () -> transactionManager.beginTransaction());
assertThrows(ProducerFencedException.class, () -> transactionManager.beginCommit());
assertThrows(ProducerFencedException.class, () -> transactionManager.beginAbort());
assertThrows(ProducerFencedException.class, () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), "dummyId"));
}
/**
 * If a produce fails with OUT_OF_ORDER_SEQUENCE_NUMBER while a commit is
 * pending, the commit must be cancelled with an exception rather than sent;
 * the transaction can then be aborted and the manager returns to READY.
 */
@Test
public void testDisallowCommitOnProduceFailure() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
TransactionalRequestResult commitResult = transactionManager.beginCommit();
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
sender.runOnce(); // Send AddPartitionsRequest
assertFalse(commitResult.isCompleted());
sender.runOnce(); // Send Produce Request, returns OutOfOrderSequenceException.
sender.runOnce(); // try to commit.
assertTrue(commitResult.isCompleted()); // commit should be cancelled with exception without being sent.
try {
commitResult.await();
fail(); // the get() must throw an exception.
} catch (KafkaException e) {
// Expected
}
// The record future carries the underlying OutOfOrderSequenceException.
try {
responseFuture.get();
fail("Expected produce future to raise an exception");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
}
// Commit is not allowed, so let's abort and try again.
TransactionalRequestResult abortResult = transactionManager.beginAbort();
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
sender.runOnce(); // Send abort request. It is valid to transition from ERROR to ABORT
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
}
/**
 * After a produce fails with OUT_OF_ORDER_SEQUENCE_NUMBER, aborting the
 * transaction is allowed and succeeds, leaving the manager READY again.
 */
@Test
public void testAllowAbortOnProduceFailure() throws InterruptedException {
    final long producerId = 13131L;
    final short producerEpoch = 1;

    doInitTransactions(producerId, producerEpoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    Future<RecordMetadata> produceFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    assertFalse(produceFuture.isDone());

    // Queue up the full response sequence: add-partitions, failed produce, abort.
    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
    prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, producerId, producerEpoch);
    prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);

    sender.runOnce(); // Send AddPartitionsRequest
    sender.runOnce(); // Send Produce Request, returns OutOfOrderSequenceException.

    TransactionalRequestResult abortResult = transactionManager.beginAbort();
    sender.runOnce(); // try to abort

    assertTrue(abortResult.isCompleted());
    assertTrue(abortResult.isSuccessful());
    assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
}
@Test
// Verifies that a produce failure (OUT_OF_ORDER_SEQUENCE_NUMBER) received while an
// abort is already in progress does not move the manager into ABORTABLE_ERROR; the
// in-flight abort simply proceeds and completes successfully.
public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
sender.runOnce(); // Send AddPartitionsRequest
sender.runOnce(); // Send Produce Request
// Begin the abort while the produce request is still in flight.
TransactionalRequestResult abortResult = transactionManager.beginAbort();
assertTrue(transactionManager.isAborting());
assertFalse(transactionManager.hasError());
sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
sender.runOnce(); // receive the produce response
// we do not transition to ABORTABLE_ERROR since we were already aborting
assertTrue(transactionManager.isAborting());
assertFalse(transactionManager.hasError());
sender.runOnce(); // handle the abort
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
}
@Test
// Verifies that beginCommit() flushes a batch still sitting undrained in the
// accumulator, and that the EndTxn(COMMIT) request is deferred until the produce
// response has been received.
public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxn(tp0, Errors.NONE);
sender.runOnce();
assertTrue(accumulator.hasUndrained());
// committing the transaction should cause the unsent batch to be flushed
transactionManager.beginCommit();
sender.runOnce();
assertFalse(accumulator.hasUndrained());
assertTrue(accumulator.hasIncomplete());
assertFalse(transactionManager.hasInFlightTransactionalRequest());
assertFalse(responseFuture.isDone());
// until the produce future returns, we will not send EndTxn
sender.runOnce();
assertFalse(accumulator.hasUndrained());
assertTrue(accumulator.hasIncomplete());
assertFalse(transactionManager.hasInFlightTransactionalRequest());
assertFalse(responseFuture.isDone());
// now the produce response returns
sendProduceResponse(Errors.NONE, pid, epoch);
sender.runOnce();
assertTrue(responseFuture.isDone());
assertFalse(accumulator.hasUndrained());
assertFalse(accumulator.hasIncomplete());
assertFalse(transactionManager.hasInFlightTransactionalRequest());
// now we send EndTxn
sender.runOnce();
assertTrue(transactionManager.hasInFlightTransactionalRequest());
sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
sender.runOnce();
assertFalse(transactionManager.hasInFlightTransactionalRequest());
assertTrue(transactionManager.isReady());
}
@Test
// Same scenario as the unsent-produce variant, but the batch is already in flight
// (via an explicit accumulator flush) when the commit begins; EndTxn must still be
// deferred until the produce response arrives.
public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxn(tp0, Errors.NONE);
sender.runOnce();
assertTrue(accumulator.hasUndrained());
accumulator.beginFlush();
sender.runOnce();
assertFalse(accumulator.hasUndrained());
assertTrue(accumulator.hasIncomplete());
assertFalse(transactionManager.hasInFlightTransactionalRequest());
// now we begin the commit with the produce request still pending
transactionManager.beginCommit();
sender.runOnce();
assertFalse(accumulator.hasUndrained());
assertTrue(accumulator.hasIncomplete());
assertFalse(transactionManager.hasInFlightTransactionalRequest());
assertFalse(responseFuture.isDone());
// until the produce future returns, we will not send EndTxn
sender.runOnce();
assertFalse(accumulator.hasUndrained());
assertTrue(accumulator.hasIncomplete());
assertFalse(transactionManager.hasInFlightTransactionalRequest());
assertFalse(responseFuture.isDone());
// now the produce response returns
sendProduceResponse(Errors.NONE, pid, epoch);
sender.runOnce();
assertTrue(responseFuture.isDone());
assertFalse(accumulator.hasUndrained());
assertFalse(accumulator.hasIncomplete());
assertFalse(transactionManager.hasInFlightTransactionalRequest());
// now we send EndTxn
sender.runOnce();
assertTrue(transactionManager.hasInFlightTransactionalRequest());
sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
sender.runOnce();
assertFalse(transactionManager.hasInFlightTransactionalRequest());
assertTrue(transactionManager.isReady());
}
@Test
// While in ABORTABLE_ERROR, a NOT_COORDINATOR response clears the transaction
// coordinator; the manager must still be allowed to issue FindCoordinator so a later
// abort can be routed, and the abortable-error state itself must be preserved.
public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
sender.runOnce(); // Send AddPartitionsRequest
transactionManager.transitionToAbortableError(new KafkaException());
sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, pid);
sender.runOnce(); // AddPartitions returns
assertTrue(transactionManager.hasAbortableError());
assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION)); // coordinator cleared by NOT_COORDINATOR
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.runOnce(); // FindCoordinator handled
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertTrue(transactionManager.hasAbortableError());
}
@Test
public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);

    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    // Append a record but never run the sender first, so neither the AddPartitions
    // request nor the produce request is ever actually sent.
    Future<RecordMetadata> produceFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    assertFalse(produceFuture.isDone());

    TransactionalRequestResult abortResult = transactionManager.beginAbort();
    // note since no partitions were added to the transaction, no EndTxn will be sent
    sender.runOnce(); // try to abort

    assertTrue(abortResult.isCompleted());
    assertTrue(abortResult.isSuccessful());
    assertTrue(transactionManager.isReady()); // ready for the next transaction

    // The cancelled, never-sent produce must be failed with a KafkaException.
    try {
        produceFuture.get();
        fail("Expected produce future to raise an exception");
    } catch (ExecutionException e) {
        assertTrue(e.getCause() instanceof KafkaException);
    }
}
@Test
// An AddPartitions request that failed with retriable UNKNOWN_TOPIC_OR_PARTITION is
// resent when an abort begins, so the partition's pending state is resolved before
// EndTxn(ABORT) is sent; the unsent produce is then failed with a KafkaException.
public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
final long producerId = 13131L;
final short producerEpoch = 1;
doInitTransactions(producerId, producerEpoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
sender.runOnce(); // Send AddPartitions and let it fail
assertFalse(responseFuture.isDone());
TransactionalRequestResult abortResult = transactionManager.beginAbort();
// we should resend the AddPartitions
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
sender.runOnce(); // Resend AddPartitions
sender.runOnce(); // Send EndTxn
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
try {
responseFuture.get();
fail("Expected produce future to raise an exception");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof KafkaException);
}
}
@Test
// A produce request that failed with retriable REQUEST_TIMED_OUT is resent before the
// abort's EndTxn, so the record is actually written (and its metadata returned) even
// though the transaction is ultimately aborted.
public void testAbortResendsProduceRequestIfRetried() throws Exception {
final long producerId = 13131L;
final short producerEpoch = 1;
doInitTransactions(producerId, producerEpoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
sender.runOnce(); // Send AddPartitions
sender.runOnce(); // Send ProduceRequest and let it fail
assertFalse(responseFuture.isDone());
TransactionalRequestResult abortResult = transactionManager.beginAbort();
// we should resend the ProduceRequest before aborting
prepareProduceResponse(Errors.NONE, producerId, producerEpoch);
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
sender.runOnce(); // Resend ProduceRequest
sender.runOnce(); // Send EndTxn
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
RecordMetadata recordMetadata = responseFuture.get();
assertEquals(tp0.topic(), recordMetadata.topic());
}
@Test
public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);

    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    Future<RecordMetadata> produceFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    assertFalse(produceFuture.isDone());

    // The first AddPartitions attempt fails with a retriable UNKNOWN_TOPIC_OR_PARTITION.
    prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
    sender.runOnce(); // Send AddPartitionsRequest
    assertFalse(transactionManager.transactionContainsPartition(tp0)); // The partition should not yet be added.

    // The retry succeeds, after which the produce request can proceed.
    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
    prepareProduceResponse(Errors.NONE, producerId, producerEpoch);
    sender.runOnce(); // Resend AddPartitionsRequest successfully.
    assertTrue(transactionManager.transactionContainsPartition(tp0));
    sender.runOnce(); // Send ProduceRequest.
    assertTrue(produceFuture.isDone());
}
@Test
// UNKNOWN_TOPIC_OR_PARTITION is retriable for TxnOffsetCommit; see the shared helper.
public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() {
testRetriableErrorInTxnOffsetCommit(Errors.UNKNOWN_TOPIC_OR_PARTITION);
}
@Test
// COORDINATOR_LOAD_IN_PROGRESS is retriable for TxnOffsetCommit; see the shared helper.
public void testHandlingOfCoordinatorLoadingErrorOnTxnOffsetCommit() {
testRetriableErrorInTxnOffsetCommit(Errors.COORDINATOR_LOAD_IN_PROGRESS);
}
// Shared helper: drives a TxnOffsetCommit where tp1 initially fails with the given
// retriable error. Verifies the commit is retried (after the group coordinator is
// discovered) until all partitions succeed, at which point the AddOffsets result
// completes successfully.
private void testRetriableErrorInTxnOffsetCommit(Errors error) {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(tp0, new OffsetAndMetadata(1));
offsets.put(tp1, new OffsetAndMetadata(1));
final String consumerGroupId = "myconsumergroup";
TransactionalRequestResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
sender.runOnce(); // send AddOffsetsToTxnResult
assertFalse(addOffsetsResult.isCompleted()); // The request should complete only after the TxnOffsetCommit completes.
Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
txnOffsetCommitResponse.put(tp0, Errors.NONE);
txnOffsetCommitResponse.put(tp1, error);
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
sender.runOnce(); // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator.
sender.runOnce(); // send find coordinator for group request
assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
assertTrue(transactionManager.hasPendingOffsetCommits());
sender.runOnce(); // send TxnOffsetCommitRequest request.
assertTrue(transactionManager.hasPendingOffsetCommits()); // The TxnOffsetCommit failed.
assertFalse(addOffsetsResult.isCompleted()); // We should only be done after both RPCs complete successfully.
txnOffsetCommitResponse.put(tp1, Errors.NONE);
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
sender.runOnce(); // Send TxnOffsetCommitRequest again.
assertTrue(addOffsetsResult.isCompleted());
assertTrue(addOffsetsResult.isSuccessful());
}
@Test
// TOPIC_AUTHORIZATION_FAILED is a partition-level AddPartitions error; see the shared
// verification helper for the expected behavior.
public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
}
@Test
public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);

    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    // The AddPartitions request fails authorization, so the partition is never
    // registered with the transaction on the broker side.
    prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, producerEpoch, producerId);
    sender.runOnce(); // Send AddPartitionsRequest

    TransactionalRequestResult abortResult = transactionManager.beginAbort();
    assertFalse(abortResult.isCompleted());

    // No EndTxn response was prepared: the abort completes without sending EndTxn
    // because nothing was ever added to the transaction.
    sender.runOnce();
    assertTrue(abortResult.isCompleted());
    assertTrue(abortResult.isSuccessful());
}
@Test
// GROUP_AUTHORIZATION_FAILED on AddOffsetsToTxn is an abortable error; since no
// partitions or offsets ever made it into the transaction, the abort completes
// without sending an EndTxn request and the manager returns to READY.
public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(tp1, new OffsetAndMetadata(1));
final String consumerGroupId = "myconsumergroup";
transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
TransactionalRequestResult abortResult = transactionManager.beginAbort();
prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
sender.runOnce(); // Send AddOffsetsToTxnRequest
assertFalse(abortResult.isCompleted());
sender.runOnce(); // complete the abort; no EndTxn response was prepared, so none is expected
assertTrue(transactionManager.isReady());
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
}
@Test
public void shouldFailAbortIfAddOffsetsFailsWithFatalError() {
    final long producerId = 13131L;
    final short producerEpoch = 1;
    doInitTransactions(producerId, producerEpoch);

    transactionManager.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(tp1, new OffsetAndMetadata(1));
    final String consumerGroupId = "myconsumergroup";
    transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);

    TransactionalRequestResult abortResult = transactionManager.beginAbort();
    // AddOffsetsToTxn comes back with an unknown (fatal) server error.
    prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, consumerGroupId, producerId, producerEpoch);
    sender.runOnce(); // Send AddOffsetsToTxnRequest
    assertFalse(abortResult.isCompleted());

    // After a fatal error the abort cannot succeed and the manager stays in fatal state.
    sender.runOnce();
    assertTrue(abortResult.isCompleted());
    assertFalse(abortResult.isSuccessful());
    assertTrue(transactionManager.hasFatalError());
}
@Test
// Batches for partitions whose AddPartitionsToTxn request has not yet completed must
// not be drained from the accumulator (isSendToPartitionAllowed is false for both).
public void testNoDrainWhenPartitionsPending() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
transactionManager.maybeAddPartitionToTransaction(tp1);
accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
assertFalse(transactionManager.isSendToPartitionAllowed(tp1));
// Build a two-node cluster hosting one partition each so drain() has targets.
Node node1 = new Node(0, "localhost", 1111);
Node node2 = new Node(1, "localhost", 1112);
PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
PartitionInfo part2 = new PartitionInfo(topic, 1, node2, null, null);
Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2),
Collections.emptySet(), Collections.emptySet());
Set<Node> nodes = new HashSet<>();
nodes.add(node1);
nodes.add(node2);
Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
time.milliseconds());
// We shouldn't drain batches which haven't been added to the transaction yet.
assertTrue(drainedBatches.containsKey(node1.id()));
assertTrue(drainedBatches.get(node1.id()).isEmpty());
assertTrue(drainedBatches.containsKey(node2.id()));
assertTrue(drainedBatches.get(node2.id()).isEmpty());
assertFalse(transactionManager.hasError());
}
@Test
// An abortable error caused by one partition (tp0 fails authorization) must not block
// draining batches for a partition that was already added to the transaction (tp1),
// so in-flight records can still be flushed before the abort.
public void testAllowDrainInAbortableErrorState() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp1);
prepareAddPartitionsToTxn(tp1, Errors.NONE);
sender.runOnce(); // Send AddPartitions, tp1 should be in the transaction now.
assertTrue(transactionManager.transactionContainsPartition(tp1));
transactionManager.maybeAddPartitionToTransaction(tp0);
prepareAddPartitionsToTxn(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
sender.runOnce(); // Send AddPartitions, should be in abortable state.
assertTrue(transactionManager.hasAbortableError());
assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
// Try to drain a message destined for tp1, it should get drained.
Node node1 = new Node(1, "localhost", 1112);
PartitionInfo part1 = new PartitionInfo(topic, 1, node1, null, null);
Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1),
Collections.emptySet(), Collections.emptySet());
accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, Collections.singleton(node1),
Integer.MAX_VALUE,
time.milliseconds());
// We should drain the appended record since we are in abortable state and the partition has already been
// added to the transaction.
assertTrue(drainedBatches.containsKey(node1.id()));
assertEquals(1, drainedBatches.get(node1.id()).size());
assertTrue(transactionManager.hasAbortableError());
}
@Test
// NOTE(review): despite the name, this test only verifies that a batch for a partition
// that was never added to the transaction is not drained; no error state is asserted
// here — confirm whether an explicit error check was intended.
public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
// Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
Node node1 = new Node(0, "localhost", 1111);
PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1),
Collections.emptySet(), Collections.emptySet());
Set<Node> nodes = new HashSet<>();
nodes.add(node1);
Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
time.milliseconds());
// We shouldn't drain batches which haven't been added to the transaction yet.
assertTrue(drainedBatches.containsKey(node1.id()));
assertTrue(drainedBatches.get(node1.id()).isEmpty());
}
@Test
// A produce request that failed with a retriable NOT_LEADER_FOR_PARTITION error is
// still retried after the transaction transitions to ABORTABLE_ERROR, so the record
// can be written before the transaction is eventually aborted.
public void resendFailedProduceRequestAfterAbortableError() throws Exception {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
sender.runOnce(); // Add partitions
sender.runOnce(); // Produce
assertFalse(responseFuture.isDone());
transactionManager.transitionToAbortableError(new KafkaException());
prepareProduceResponse(Errors.NONE, pid, epoch);
sender.runOnce();
assertTrue(responseFuture.isDone());
assertNotNull(responseFuture.get()); // the retried produce succeeded; get() would throw if the batch had failed.
}
@Test
// A transactional batch that cannot be drained (target node disconnected and blacked
// out) is expired once it has sat in the queue long enough; expiry must fail the
// produce future with a TimeoutException and move the manager into ABORTABLE_ERROR.
public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
assertFalse(transactionManager.transactionContainsPartition(tp0));
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
sender.runOnce(); // send addPartitions.
// Check that only addPartitions was sent.
assertTrue(transactionManager.transactionContainsPartition(tp0));
assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
assertFalse(responseFuture.isDone());
// Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
time.sleep(10000);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
sender.runOnce(); // We should try to flush the produce, but expire it instead without sending anything.
assertTrue(responseFuture.isDone());
try {
// make sure the produce was expired.
responseFuture.get();
fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
assertTrue(transactionManager.hasAbortableError());
}
@Test
// Same as testTransitionToAbortableErrorOnBatchExpiry, but with two partitions and
// two pending batches: expiring both must fail both produce futures with a
// TimeoutException and move the manager into ABORTABLE_ERROR.
public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException {
    final long pid = 13131L;
    final short epoch = 1;
    doInitTransactions(pid, epoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    transactionManager.maybeAddPartitionToTransaction(tp1);

    // One pending batch per partition.
    Future<RecordMetadata> firstBatchResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    Future<RecordMetadata> secondBatchResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    assertFalse(firstBatchResponse.isDone());
    assertFalse(secondBatchResponse.isDone());

    Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
    partitionErrors.put(tp0, Errors.NONE);
    partitionErrors.put(tp1, Errors.NONE);
    prepareAddPartitionsToTxn(partitionErrors);

    assertFalse(transactionManager.transactionContainsPartition(tp0));
    assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
    sender.runOnce(); // send addPartitions.

    // Check that only addPartitions was sent; both partitions are now in the transaction.
    assertTrue(transactionManager.transactionContainsPartition(tp0));
    assertTrue(transactionManager.transactionContainsPartition(tp1));
    // Fixed copy-paste bug: the original asserted isSendToPartitionAllowed(tp1) twice
    // and never checked tp0.
    assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
    assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
    assertFalse(firstBatchResponse.isDone());
    assertFalse(secondBatchResponse.isDone());

    // Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
    time.sleep(10000);
    // Disconnect the target node for the pending produce requests. This will ensure that sender will try to
    // expire the batches.
    Node clusterNode = metadata.fetch().nodes().get(0);
    client.disconnect(clusterNode.idString());
    client.blackout(clusterNode, 100);

    sender.runOnce(); // We should try to flush the produce, but expire it instead without sending anything.
    assertTrue(firstBatchResponse.isDone());
    assertTrue(secondBatchResponse.isDone());

    try {
        // make sure the produce was expired.
        firstBatchResponse.get();
        fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
    } catch (ExecutionException e) {
        assertTrue(e.getCause() instanceof TimeoutException);
    }
    try {
        // make sure the produce was expired.
        secondBatchResponse.get();
        fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
    } catch (ExecutionException e) {
        assertTrue(e.getCause() instanceof TimeoutException);
    }
    assertTrue(transactionManager.hasAbortableError());
}
@Test
// If the pending batch expires before the commit's EndTxn can be sent, the commit must
// be failed without being sent, and the transaction must land in ABORTABLE_ERROR while
// still ongoing so the application can abort it cleanly afterwards.
public void testDropCommitOnBatchExpiry() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
assertFalse(transactionManager.transactionContainsPartition(tp0));
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
sender.runOnce(); // send addPartitions.
// Check that only addPartitions was sent.
assertTrue(transactionManager.transactionContainsPartition(tp0));
assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
assertFalse(responseFuture.isDone());
TransactionalRequestResult commitResult = transactionManager.beginCommit();
// Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
time.sleep(10000);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
sender.runOnce(); // We should try to flush the produce, but expire it instead without sending anything.
assertTrue(responseFuture.isDone());
try {
// make sure the produce was expired.
responseFuture.get();
fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
sender.runOnce(); // the commit shouldn't be completed without being sent since the produce request failed.
assertTrue(commitResult.isCompleted());
assertFalse(commitResult.isSuccessful()); // the commit shouldn't succeed since the produce request failed.
assertTrue(transactionManager.hasAbortableError());
assertTrue(transactionManager.hasOngoingTransaction());
assertFalse(transactionManager.isCompleting());
assertTrue(transactionManager.transactionContainsPartition(tp0));
// The failed commit leaves the transaction ongoing; an abort must still succeed.
TransactionalRequestResult abortResult = transactionManager.beginAbort();
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
sender.runOnce(); // send the abort.
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
assertFalse(transactionManager.hasOngoingTransaction());
assertFalse(transactionManager.transactionContainsPartition(tp0));
}
@Test
// If a batch that was already sent and failed with a retriable error (so it was being
// retried) is expired, its sequence remains unresolved; per the inline comments below,
// the manager then transitions to a fatal error and the queued commit is failed
// without being sent.
public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
assertFalse(transactionManager.transactionContainsPartition(tp0));
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
sender.runOnce(); // send addPartitions.
// Check that only addPartitions was sent.
assertTrue(transactionManager.transactionContainsPartition(tp0));
assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
sender.runOnce(); // send the produce request.
assertFalse(responseFuture.isDone());
TransactionalRequestResult commitResult = transactionManager.beginCommit();
// Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
time.sleep(10000);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
sender.runOnce(); // We should try to flush the produce, but expire it instead without sending anything.
assertTrue(responseFuture.isDone());
try {
// make sure the produce was expired.
responseFuture.get();
fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
sender.runOnce(); // Transition to fatal error since we have unresolved batches.
sender.runOnce(); // Fail the queued transactional requests
assertTrue(commitResult.isCompleted());
assertFalse(commitResult.isSuccessful()); // the commit should have been dropped.
assertTrue(transactionManager.hasFatalError());
assertFalse(transactionManager.hasOngoingTransaction());
}
@Test
// Exercises resetProducerIdIfNeeded() on a manager built with a null transactionalId
// (presumably idempotent, non-transactional mode — confirm): marking a sequence
// unresolved with no inflight batches is a no-op, but once the sole inflight batch
// fails with a timeout, the unresolved state clears and the producerId is reset.
public void testResetProducerIdAfterWithoutPendingInflightRequests() {
TransactionManager manager = new TransactionManager(logContext, null, transactionTimeoutMs,
DEFAULT_RETRY_BACKOFF_MS);
long producerId = 15L;
short epoch = 5;
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
manager.setProducerIdAndEpoch(producerIdAndEpoch);
// Nothing to resolve, so no reset is needed
manager.resetProducerIdIfNeeded();
assertEquals(producerIdAndEpoch, manager.producerIdAndEpoch());
TopicPartition tp0 = new TopicPartition("foo", 0);
assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp0));
ProducerBatch b1 = writeIdempotentBatchWithValue(manager, tp0, "1");
assertEquals(Integer.valueOf(1), manager.sequenceNumber(tp0));
manager.handleCompletedBatch(b1, new ProduceResponse.PartitionResponse(
Errors.NONE, 500L, time.milliseconds(), 0L));
assertEquals(OptionalInt.of(0), manager.lastAckedSequence(tp0));
// Marking sequence numbers unresolved without inflight requests is basically a no-op.
manager.markSequenceUnresolved(tp0);
manager.resetProducerIdIfNeeded();
assertEquals(producerIdAndEpoch, manager.producerIdAndEpoch());
assertFalse(manager.hasUnresolvedSequences());
// We have a new batch which fails with a timeout
ProducerBatch b2 = writeIdempotentBatchWithValue(manager, tp0, "2");
assertEquals(Integer.valueOf(2), manager.sequenceNumber(tp0));
manager.markSequenceUnresolved(tp0);
manager.handleFailedBatch(b2, new TimeoutException(), false);
assertTrue(manager.hasUnresolvedSequences());
// We only had one inflight batch, so we should be able to clear the unresolved status
// and reset the producerId
manager.resetProducerIdIfNeeded();
assertFalse(manager.hasUnresolvedSequences());
assertFalse(manager.hasProducerId());
}
@Test
public void testNoProducerIdResetAfterLastInFlightBatchSucceeds() {
    // Stand-alone (idempotent) manager with a known producerId/epoch and three inflight batches.
    TransactionManager manager = new TransactionManager(logContext, null, transactionTimeoutMs,
            DEFAULT_RETRY_BACKOFF_MS);
    final long producerId = 15L;
    final short epoch = 5;
    ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
    manager.setProducerIdAndEpoch(initialIdAndEpoch);

    TopicPartition partition = new TopicPartition("foo", 0);
    ProducerBatch firstBatch = writeIdempotentBatchWithValue(manager, partition, "1");
    ProducerBatch secondBatch = writeIdempotentBatchWithValue(manager, partition, "2");
    ProducerBatch thirdBatch = writeIdempotentBatchWithValue(manager, partition, "3");
    assertEquals(3, manager.sequenceNumber(partition).intValue());

    // The first inflight batch fails with a timeout.
    manager.markSequenceUnresolved(partition);
    manager.handleFailedBatch(firstBatch, new TimeoutException(), false);
    assertTrue(manager.hasUnresolvedSequences());

    // No reset may occur until the sequence numbers have been resolved.
    manager.resetProducerIdIfNeeded();
    assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
    assertTrue(manager.hasUnresolvedSequences());

    // The second inflight batch fails with a timeout as well.
    manager.handleFailedBatch(secondBatch, new TimeoutException(), false);
    manager.resetProducerIdIfNeeded();
    assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
    assertTrue(manager.hasUnresolvedSequences());

    // The last inflight batch succeeds, which resolves the sequence numbers without
    // requiring a producerId reset.
    manager.handleCompletedBatch(thirdBatch, new ProduceResponse.PartitionResponse(
            Errors.NONE, 500L, time.milliseconds(), 0L));
    manager.resetProducerIdIfNeeded();
    assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
    assertFalse(manager.hasUnresolvedSequences());
    assertEquals(3, manager.sequenceNumber(partition).intValue());
}
@Test
public void testProducerIdResetAfterLastInFlightBatchFails() {
    // Stand-alone (idempotent) manager with a known producerId/epoch and three inflight batches.
    TransactionManager manager = new TransactionManager(logContext, null, transactionTimeoutMs,
            DEFAULT_RETRY_BACKOFF_MS);
    final long producerId = 15L;
    final short epoch = 5;
    ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
    manager.setProducerIdAndEpoch(initialIdAndEpoch);

    TopicPartition partition = new TopicPartition("foo", 0);
    ProducerBatch firstBatch = writeIdempotentBatchWithValue(manager, partition, "1");
    ProducerBatch secondBatch = writeIdempotentBatchWithValue(manager, partition, "2");
    ProducerBatch thirdBatch = writeIdempotentBatchWithValue(manager, partition, "3");
    assertEquals(3, manager.sequenceNumber(partition).intValue());

    // The first inflight batch fails with a timeout.
    manager.markSequenceUnresolved(partition);
    manager.handleFailedBatch(firstBatch, new TimeoutException(), false);
    assertTrue(manager.hasUnresolvedSequences());

    // The second batch succeeds, but the sequence numbers are still unresolved.
    manager.handleCompletedBatch(secondBatch, new ProduceResponse.PartitionResponse(
            Errors.NONE, 500L, time.milliseconds(), 0L));
    manager.resetProducerIdIfNeeded();
    assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
    assertTrue(manager.hasUnresolvedSequences());

    // Once the last inflight batch fails, the producerId must be reset and the
    // sequence number for the partition returns to zero.
    manager.handleFailedBatch(thirdBatch, new TimeoutException(), false);
    manager.resetProducerIdIfNeeded();
    assertFalse(manager.hasProducerId());
    assertFalse(manager.hasUnresolvedSequences());
    assertEquals(0, manager.sequenceNumber(partition).intValue());
}
@Test
public void testRetryAbortTransaction() throws InterruptedException {
    // An abort that times out can be retried with another abort and reuses the cached result.
    verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.ABORT);
}
@Test
public void testRetryCommitTransaction() throws InterruptedException {
    // A commit that times out can be retried with another commit and reuses the cached result.
    verifyCommitOrAbortTranscationRetriable(TransactionResult.COMMIT, TransactionResult.COMMIT);
}
@Test(expected = KafkaException.class)
public void testRetryAbortTransactionAfterCommitTimeout() throws InterruptedException {
    // Retrying a timed-out COMMIT with an ABORT is illegal and must raise KafkaException.
    verifyCommitOrAbortTranscationRetriable(TransactionResult.COMMIT, TransactionResult.ABORT);
}
@Test(expected = KafkaException.class)
public void testRetryCommitTransactionAfterAbortTimeout() throws InterruptedException {
    // Retrying a timed-out ABORT with a COMMIT is illegal and must raise KafkaException.
    verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.COMMIT);
}
/**
 * Drives a transaction through a first commit/abort whose EndTxn response arrives over a
 * disconnected connection (so the first attempt times out), then retries the end-transaction
 * call with {@code retryTransactionResult} and expects the retry to complete.
 *
 * <p>When the retry result differs from the first attempt, the retry call itself is expected
 * to throw {@link KafkaException} (asserted via the callers' {@code @Test(expected = ...)}).
 *
 * <p>NOTE: the prepare*/runOnce ordering below mirrors the mock client's FIFO response
 * queue — do not reorder these statements.
 */
private void verifyCommitOrAbortTranscationRetriable(TransactionResult firstTransactionResult,
                                                     TransactionResult retryTransactionResult)
        throws InterruptedException {
    final long pid = 13131L;
    final short epoch = 1;
    doInitTransactions(pid, epoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);
    accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);

    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
    prepareProduceResponse(Errors.NONE, pid, epoch);
    sender.runOnce();  // send addPartitions.
    sender.runOnce();  // send produce request.

    TransactionalRequestResult result = firstTransactionResult == TransactionResult.COMMIT ?
            transactionManager.beginCommit() : transactionManager.beginAbort();
    // EndTxn response delivered with shouldDisconnect=true: the first attempt cannot complete.
    prepareEndTxnResponse(Errors.NONE, firstTransactionResult, pid, epoch, true);
    sender.runOnce();
    assertFalse(result.isCompleted());
    try {
        result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS);
        fail("Should have raised TimeoutException");
    } catch (TimeoutException e) {
        // expected: the EndTxn request was lost to the disconnect.
    }

    // The disconnect forces coordinator rediscovery before the retry can be sent.
    prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
    sender.runOnce();
    TransactionalRequestResult retryResult = retryTransactionResult == TransactionResult.COMMIT ?
            transactionManager.beginCommit() : transactionManager.beginAbort();
    assertEquals(retryResult, result); // check if cached result is reused.

    prepareEndTxnResponse(Errors.NONE, retryTransactionResult, pid, epoch, false);
    sender.runOnce();
    assertTrue(retryResult.isCompleted());
    assertFalse(transactionManager.hasOngoingTransaction());
}
/**
 * Verifies that a partition-level error in the AddPartitionsToTxn response puts the
 * transaction manager into an error state and leaves the partition out of the transaction.
 *
 * @param error the partition-level error to inject into the AddPartitionsToTxn response
 */
private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
    final long pid = 1L;
    final short epoch = 1;
    doInitTransactions(pid, epoch);
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartitionToTransaction(tp0);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
    assertFalse(responseFuture.isDone());

    prepareAddPartitionsToTxn(tp0, error);
    sender.runOnce();  // attempt send addPartitions.
    assertTrue(transactionManager.hasError());
    assertFalse(transactionManager.transactionContainsPartition(tp0));
}
// Queue an AddPartitionsToTxn response carrying the given per-partition errors,
// first asserting the request names exactly the expected set of partitions.
private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> errors) {
    MockClient.RequestMatcher matcher = body -> {
        AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) body;
        assertEquals(new HashSet<>(request.partitions()), new HashSet<>(errors.keySet()));
        return true;
    };
    client.prepareResponse(matcher, new AddPartitionsToTxnResponse(0, errors));
}
// Convenience overload for queuing a single-partition AddPartitionsToTxn response.
private void prepareAddPartitionsToTxn(final TopicPartition tp, final Errors error) {
    prepareAddPartitionsToTxn(Collections.singletonMap(tp, error));
}
// Queue a FindCoordinator response (pointing at brokerNode), first asserting the
// request targets the expected coordinator type and key. Optionally simulates a
// disconnect when delivering the response.
private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconnect,
                                            final CoordinatorType coordinatorType,
                                            final String coordinatorKey) {
    MockClient.RequestMatcher matcher = body -> {
        FindCoordinatorRequest request = (FindCoordinatorRequest) body;
        assertEquals(CoordinatorType.forId(request.data().keyType()), coordinatorType);
        assertEquals(request.data().key(), coordinatorKey);
        return true;
    };
    client.prepareResponse(matcher, FindCoordinatorResponse.prepareResponse(error, brokerNode), shouldDisconnect);
}
// Queue an InitProducerId response with the given error/pid/epoch, first asserting the
// request carries this test's transactionalId and transaction timeout. Optionally
// simulates a disconnect when delivering the response.
private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
    InitProducerIdResponseData responseData = new InitProducerIdResponseData()
            .setErrorCode(error.code())
            .setProducerEpoch(producerEpoch)
            .setProducerId(producerId)
            .setThrottleTimeMs(0);
    MockClient.RequestMatcher matcher = body -> {
        InitProducerIdRequest request = (InitProducerIdRequest) body;
        assertEquals(request.data.transactionalId(), transactionalId);
        assertEquals(request.data.transactionTimeoutMs(), transactionTimeoutMs);
        return true;
    };
    client.prepareResponse(matcher, new InitProducerIdResponse(responseData), shouldDisconnect);
}
// Respond immediately to the already-inflight produce request for tp0 (offset 0).
private void sendProduceResponse(Errors error, final long producerId, final short producerEpoch) {
    client.respond(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0));
}
// Queue a produce response for tp0 (offset 0) to be delivered when the request is sent.
private void prepareProduceResponse(Errors error, final long producerId, final short producerEpoch) {
    client.prepareResponse(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0));
}
// Matcher for a transactional ProduceRequest addressed to tp0 that contains exactly one
// record batch stamped with the expected producerId/epoch and this test's transactionalId.
private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
    return body -> {
        ProduceRequest request = (ProduceRequest) body;
        MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
        assertNotNull(records);

        Iterator<MutableRecordBatch> batches = records.batches().iterator();
        assertTrue(batches.hasNext());
        MutableRecordBatch onlyBatch = batches.next();
        assertFalse(batches.hasNext());

        assertTrue(onlyBatch.isTransactional());
        assertEquals(pid, onlyBatch.producerId());
        assertEquals(epoch, onlyBatch.producerEpoch());
        assertEquals(transactionalId, request.transactionalId());
        return true;
    };
}
// Queue an AddPartitionsToTxn response for a single partition, delivered when a matching
// request (same partition, epoch, and pid) is sent.
private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
                                               final short epoch, final long pid) {
    client.prepareResponse(addPartitionsRequestMatcher(topicPartition, epoch, pid),
            new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
}
// Respond immediately to an already-inflight AddPartitionsToTxn request for a single partition.
private void sendAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
                                            final short epoch, final long pid) {
    client.respond(addPartitionsRequestMatcher(topicPartition, epoch, pid),
            new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
}
// Matcher for an AddPartitionsToTxn request naming exactly the expected partition,
// producer id/epoch, and this test's transactionalId.
private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition,
                                                              final short epoch, final long pid) {
    return body -> {
        AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) body;
        assertEquals(pid, request.producerId());
        assertEquals(epoch, request.producerEpoch());
        assertEquals(singletonList(topicPartition), request.partitions());
        assertEquals(transactionalId, request.transactionalId());
        return true;
    };
}
// Convenience overload: queue an EndTxn response with no simulated disconnect.
private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
    this.prepareEndTxnResponse(error, result, pid, epoch, false);
}
// Queue an EndTxn response delivered when a matching request (same result/pid/epoch) is
// sent; optionally simulates a disconnect when delivering the response.
private void prepareEndTxnResponse(Errors error,
                                   final TransactionResult result,
                                   final long pid,
                                   final short epoch,
                                   final boolean shouldDisconnect) {
    client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error), shouldDisconnect);
}
// Respond immediately to an already-inflight EndTxn request.
private void sendEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
    client.respond(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error));
}
// Matcher for an EndTxn request carrying the expected transactionalId, producer id/epoch,
// and commit-or-abort command.
private MockClient.RequestMatcher endTxnMatcher(final TransactionResult result, final long pid, final short epoch) {
    return body -> {
        EndTxnRequest request = (EndTxnRequest) body;
        assertEquals(transactionalId, request.transactionalId());
        assertEquals(pid, request.producerId());
        assertEquals(epoch, request.producerEpoch());
        assertEquals(result, request.command());
        return true;
    };
}
// Queue an AddOffsetsToTxn response, first asserting the request carries the expected
// consumer group, transactionalId, and producer id/epoch.
private void prepareAddOffsetsToTxnResponse(final Errors error,
                                            final String consumerGroupId,
                                            final long producerId,
                                            final short producerEpoch) {
    MockClient.RequestMatcher matcher = body -> {
        AddOffsetsToTxnRequest request = (AddOffsetsToTxnRequest) body;
        assertEquals(consumerGroupId, request.consumerGroupId());
        assertEquals(transactionalId, request.transactionalId());
        assertEquals(producerId, request.producerId());
        assertEquals(producerEpoch, request.producerEpoch());
        return true;
    };
    client.prepareResponse(matcher, new AddOffsetsToTxnResponse(0, error));
}
// Queue a TxnOffsetCommit response carrying the given per-partition errors, first
// asserting the request's consumer group and producer id/epoch.
private void prepareTxnOffsetCommitResponse(final String consumerGroupId,
                                            final long producerId,
                                            final short producerEpoch,
                                            Map<TopicPartition, Errors> txnOffsetCommitResponse) {
    MockClient.RequestMatcher matcher = request -> {
        TxnOffsetCommitRequest txnRequest = (TxnOffsetCommitRequest) request;
        assertEquals(consumerGroupId, txnRequest.consumerGroupId());
        assertEquals(producerId, txnRequest.producerId());
        assertEquals(producerEpoch, txnRequest.producerEpoch());
        return true;
    };
    client.prepareResponse(matcher, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
}
// Build a single-partition ProduceResponse with the given error, base offset,
// no timestamp, and a fixed log start offset of 10.
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
    ProduceResponse.PartitionResponse partitionResponse =
            new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, 10);
    return new ProduceResponse(singletonMap(tp, partitionResponse), throttleTimeMs);
}
/**
 * Runs the coordinator discovery and InitProducerId round trips so the transaction
 * manager ends up initialized with the given pid/epoch. The runOnce ordering is
 * behavior-critical — do not reorder.
 */
private void doInitTransactions(long pid, short epoch) {
    transactionManager.initializeTransactions();
    prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
    sender.runOnce(); // find coordinator
    sender.runOnce(); // NOTE(review): second poll appears to handle the coordinator response — confirm
    assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));

    prepareInitPidResponse(Errors.NONE, false, pid, epoch);
    sender.runOnce(); // get pid.

    assertTrue(transactionManager.hasProducerId());
}
// Asserts the manager is in an abortable error state: beginCommit must raise a
// KafkaException wrapping `cause`, and a subsequent abort must clear the error.
private void assertAbortableError(Class<? extends RuntimeException> cause) {
    try {
        transactionManager.beginCommit();
        fail("Should have raised " + cause.getSimpleName());
    } catch (KafkaException kafkaException) {
        assertTrue(cause.isAssignableFrom(kafkaException.getCause().getClass()));
        assertTrue(transactionManager.hasError());
    }
    assertTrue(transactionManager.hasError());

    // An abortable error is cleared by aborting the transaction.
    transactionManager.beginAbort();
    assertFalse(transactionManager.hasError());
}
// Asserts the manager is in a fatal error state: beginAbort must raise a KafkaException
// wrapping `cause` both on the first attempt and on a retry, since aborting cannot clear
// a fatal error.
private void assertFatalError(Class<? extends RuntimeException> cause) {
    assertTrue(transactionManager.hasError());

    for (int attempt = 0; attempt < 2; attempt++) {
        try {
            transactionManager.beginAbort();
            fail("Should have raised " + cause.getSimpleName());
        } catch (KafkaException kafkaException) {
            assertTrue(cause.isAssignableFrom(kafkaException.getCause().getClass()));
            assertTrue(transactionManager.hasError());
        }
    }
}
// Asserts the produce future has already completed exceptionally: it must be done,
// and get() must throw ExecutionException.
private void assertFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
    assertTrue(future.isDone());
    try {
        future.get();
        fail("Expected produce future to throw");
    } catch (ExecutionException ignored) {
        // expected: the produce failed
    }
}
}