blob: 6871f3835053c0646aa787cb0912f448df3884cc [file]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.txn;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.txn.impl.TransactionImpl;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TransactionImplTest {
private TransactionImpl transactionImpl;
private org.apache.pulsar.client.api.transaction.Transaction mockTransaction;
private List<Consumer<?>> mockConsumers;
private List<MessageId> messageIds;
@BeforeMethod
public void setUp() {
mockTransaction = mock(org.apache.pulsar.client.api.transaction.Transaction.class);
mockConsumers = new ArrayList<>();
messageIds = new ArrayList<>();
// Create two mock consumers and two message IDs
for (int i = 0; i < 2; i++) {
Consumer<?> mockConsumer = mock(Consumer.class);
MessageId messageId = mock(MessageId.class);
mockConsumers.add(mockConsumer);
messageIds.add(messageId);
}
transactionImpl = new TransactionImpl(mockTransaction);
}
@Test
public void testRecordMsg() {
// Record a message for a consumer
Consumer<?> consumer = mockConsumers.get(0);
MessageId messageId = messageIds.get(0);
transactionImpl.recordMsg(messageId, consumer);
// Verify the message is recorded for the consumer
assertTrue(transactionImpl.getReceivedMessages().get(consumer).contains(messageId));
}
@Test
public void testAckAllReceivedMsgsAsync() throws ExecutionException, InterruptedException {
// Record messages for different consumers
for (int i = 0; i < mockConsumers.size(); i++) {
Consumer<?> consumer = mockConsumers.get(i);
MessageId messageId = messageIds.get(i);
transactionImpl.recordMsg(messageId, consumer);
}
// Mock the acknowledgeAsync method for each consumer
for (Consumer<?> consumer : mockConsumers) {
when(consumer.acknowledgeAsync(anyList(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
}
// Call the ackAllReceivedMsgsAsync method
CompletableFuture<Void> future = transactionImpl.ackAllReceivedMsgsAsync();
future.get();
// Verify each consumer called the correct acknowledgeAsync method with the correct message
// IDs
for (int i = 0; i < mockConsumers.size(); i++) {
Consumer<?> consumer = mockConsumers.get(i);
MessageId messageId = messageIds.get(i);
verify(consumer).acknowledgeAsync(eq(List.of(messageId)), eq(mockTransaction));
}
}
@Test
public void testAckAllReceivedMsgs() throws ExecutionException, InterruptedException {
// Record messages for a consumer
Consumer<?> consumer = mockConsumers.get(0);
MessageId messageId = messageIds.get(0);
transactionImpl.recordMsg(messageId, consumer);
// Mock the acknowledgeAsync method for the consumer
when(consumer.acknowledgeAsync(anyList(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
// Call the ackAllReceivedMsgs method
transactionImpl.ackAllReceivedMsgs(consumer);
// Verify the consumer called the correct acknowledgeAsync method with the correct message
// IDs
verify(consumer).acknowledgeAsync(eq(List.of(messageId)), eq(mockTransaction));
// Verify the message Ids were removed from the transaction context after acked.
assertEquals(transactionImpl.getReceivedMessages().size(), 0);
}
@Test
public void testAckAllReceivedMsgsAll() throws ExecutionException, InterruptedException {
// Record messages for different consumers
for (int i = 0; i < mockConsumers.size(); i++) {
Consumer<?> consumer = mockConsumers.get(i);
MessageId messageId = messageIds.get(i);
transactionImpl.recordMsg(messageId, consumer);
}
// Mock the acknowledgeAsync method for each consumer
for (Consumer<?> consumer : mockConsumers) {
when(consumer.acknowledgeAsync(anyList(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
}
// Call the ackAllReceivedMsgs method
transactionImpl.ackAllReceivedMsgs();
// Verify each consumer called the correct acknowledgeAsync method with the correct message
// IDs
for (int i = 0; i < mockConsumers.size(); i++) {
Consumer<?> consumer = mockConsumers.get(i);
MessageId messageId = messageIds.get(i);
verify(consumer).acknowledgeAsync(eq(List.of(messageId)), eq(mockTransaction));
}
}
@Test
public void testAckAllReceivedMsgsAsyncAll() throws ExecutionException, InterruptedException {
// Record messages for different consumers
for (int i = 0; i < mockConsumers.size(); i++) {
Consumer<?> consumer = mockConsumers.get(i);
MessageId messageId = messageIds.get(i);
transactionImpl.recordMsg(messageId, consumer);
}
// Mock the acknowledgeAsync method for each consumer
for (Consumer<?> consumer : mockConsumers) {
when(consumer.acknowledgeAsync(anyList(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
}
// Call the ackAllReceivedMsgsAsync method
CompletableFuture<Void> future = transactionImpl.ackAllReceivedMsgsAsync();
future.get();
// Verify each consumer called the correct acknowledgeAsync method with the correct message
// IDs
for (int i = 0; i < mockConsumers.size(); i++) {
Consumer<?> consumer = mockConsumers.get(i);
MessageId messageId = messageIds.get(i);
verify(consumer).acknowledgeAsync(eq(List.of(messageId)), eq(mockTransaction));
}
}
@Test
public void testCommitAsync() throws ExecutionException, InterruptedException {
// Mock the commit method of the transaction
when(mockTransaction.commit()).thenReturn(CompletableFuture.completedFuture(null));
// Call the commitAsync method
CompletableFuture<Void> future = transactionImpl.commitAsync();
future.get();
// Verify the commit method was called
verify(mockTransaction).commit();
}
@Test
public void testAbortAsync() throws ExecutionException, InterruptedException {
// Mock the abort method of the transaction
when(mockTransaction.abort()).thenReturn(CompletableFuture.completedFuture(null));
// Call the abortAsync method
CompletableFuture<Void> future = transactionImpl.abortAsync();
future.get();
// Verify the abort method was called
verify(mockTransaction).abort();
}
@Test
public void testCommit() throws ExecutionException, InterruptedException {
// Mock the commit method of the transaction
when(mockTransaction.commit()).thenReturn(CompletableFuture.completedFuture(null));
// Call the commit method
transactionImpl.commit();
// Verify the commit method was called
verify(mockTransaction).commit();
}
@Test
public void testAbort() throws ExecutionException, InterruptedException {
// Mock the abort method of the transaction
when(mockTransaction.abort()).thenReturn(CompletableFuture.completedFuture(null));
// Call the abort method
transactionImpl.abort();
// Verify the abort method was called
verify(mockTransaction).abort();
}
@Test
public void testGetTxnID() {
// Mock the getTxnID method of the transaction
TxnID txnID = mock(TxnID.class);
when(mockTransaction.getTxnID()).thenReturn(txnID);
// Call the getTxnID method
assertEquals(txnID, transactionImpl.getTxnID());
}
@Test
public void testGetState() {
// Mock the getState method of the transaction
org.apache.pulsar.client.api.transaction.Transaction.State state =
org.apache.pulsar.client.api.transaction.Transaction.State.OPEN;
when(mockTransaction.getState()).thenReturn(state);
// Call the getState method
assertEquals(state, transactionImpl.getState());
}
}