blob: 24e5c5aaf1e6d55e4963e9e1b7ecde3ca7d29c4e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
* Test for consuming transaction messages.
*/
@Slf4j
@Test(groups = "broker")
public class TransactionConsumeTest extends TransactionTestBase {
private static final String CONSUME_TOPIC = "persistent://public/txn/txn-consume-test";
private static final String NORMAL_MSG_CONTENT = "Normal - ";
private static final String TXN_MSG_CONTENT = "Txn - ";
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
setBrokerCount(1);
super.internalSetup();
String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace("public/txn", 10);
admin.topics().createNonPartitionedTopic(CONSUME_TOPIC);
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
}
@AfterMethod(alwaysRun = true)
protected void cleanup() {
super.internalCleanup();
}
@Test
public void noSortedTest() throws Exception {
int messageCntBeforeTxn = 10;
int transactionMessageCnt = 10;
int messageCntAfterTxn = 10;
int totalMsgCnt = messageCntBeforeTxn + transactionMessageCnt + messageCntAfterTxn;
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(CONSUME_TOPIC)
.create();
@Cleanup
Consumer<byte[]> exclusiveConsumer = pulsarClient.newConsumer()
.topic(CONSUME_TOPIC)
.subscriptionName("exclusive-test")
.subscribe();
@Cleanup
Consumer<byte[]> sharedConsumer = pulsarClient.newConsumer()
.topic(CONSUME_TOPIC)
.subscriptionName("shared-test")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Awaitility.await().until(exclusiveConsumer::isConnected);
Awaitility.await().until(sharedConsumer::isConnected);
long mostSigBits = 2L;
long leastSigBits = 5L;
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(CONSUME_TOPIC, false).get().get();
log.info("transactionBuffer init finish.");
List<String> sendMessageList = new ArrayList<>();
sendNormalMessages(producer, 0, messageCntBeforeTxn, sendMessageList);
appendTransactionMessages(txnID, persistentTopic, transactionMessageCnt, sendMessageList);
sendNormalMessages(producer, messageCntBeforeTxn, messageCntAfterTxn, sendMessageList);
Message<byte[]> message;
for (int i = 0; i < totalMsgCnt; i++) {
if (i < messageCntBeforeTxn) {
// receive normal messages successfully
message = exclusiveConsumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("Receive exclusive normal msg: {}" + new String(message.getData(), UTF_8));
message = sharedConsumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("Receive shared normal msg: {}" + new String(message.getData(), UTF_8));
} else {
// can't receive transaction messages before commit
message = exclusiveConsumer.receive(500, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
log.info("exclusive consumer can't receive message before commit.");
message = sharedConsumer.receive(500, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
log.info("shared consumer can't receive message before commit.");
}
}
persistentTopic.endTxn(txnID, TxnAction.COMMIT_VALUE, 0L).get();
log.info("Commit txn.");
// receive transaction messages successfully after commit
for (int i = 0; i < transactionMessageCnt + messageCntAfterTxn; i++) {
message = exclusiveConsumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("Receive txn exclusive id: {}, msg: {}", message.getMessageId(), new String(message.getData()));
message = sharedConsumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("Receive txn shared id: {}, msg: {}", message.getMessageId(), new String(message.getData()));
}
log.info("TransactionConsumeTest noSortedTest finish.");
}
@Test
public void sortedTest() throws Exception {
int messageCntBeforeTxn = 10;
int transactionMessageCnt = 10;
int messageCntAfterTxn = 10;
int totalMsgCnt = messageCntBeforeTxn + transactionMessageCnt + messageCntAfterTxn;
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(CONSUME_TOPIC)
.create();
@Cleanup
Consumer<byte[]> exclusiveConsumer = pulsarClient.newConsumer()
.topic(CONSUME_TOPIC)
.subscriptionName("exclusive-test")
.subscribe();
@Cleanup
Consumer<byte[]> sharedConsumer = pulsarClient.newConsumer()
.topic(CONSUME_TOPIC)
.subscriptionName("shared-test")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Awaitility.await().until(exclusiveConsumer::isConnected);
Awaitility.await().until(sharedConsumer::isConnected);
long mostSigBits = 2L;
long leastSigBits = 5L;
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(CONSUME_TOPIC, false).get().get();
List<String> sendMessageList = new ArrayList<>();
sendNormalMessages(producer, 0, messageCntBeforeTxn, sendMessageList);
// append messages to TB
appendTransactionMessages(txnID, persistentTopic, transactionMessageCnt, sendMessageList);
persistentTopic.endTxn(txnID, TxnAction.COMMIT_VALUE, 0L).get();
log.info("Commit txn.");
sendNormalMessages(producer, messageCntBeforeTxn, messageCntAfterTxn, sendMessageList);
Message<byte[]> message;
for (int i = 0; i < totalMsgCnt; i++) {
message = exclusiveConsumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertEquals(sendMessageList.get(i), new String(message.getData()));
log.info("Receive exclusive normal msg: {}, index: {}", new String(message.getData(), UTF_8), i);
message = sharedConsumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertEquals(sendMessageList.get(i), new String(message.getData()));
log.info("Receive shared normal msg: {}, index: {}", new String(message.getData(), UTF_8), i);
}
log.info("TransactionConsumeTest sortedTest finish.");
}
private void sendNormalMessages(Producer<byte[]> producer, int startMsgCnt,
int messageCnt, List<String> sendMessageList)
throws PulsarClientException {
for (int i = 0; i < messageCnt; i++) {
String msg = NORMAL_MSG_CONTENT + (startMsgCnt + i);
sendMessageList.add(msg);
producer.newMessage().value(msg.getBytes(UTF_8)).send();
}
}
private List<MessageIdData> appendTransactionMessages(
TxnID txnID, PersistentTopic topic, int transactionMsgCnt, List<String> sendMessageList)
throws ExecutionException, InterruptedException, PulsarClientException {
//Change the state of TB to Ready.
@Cleanup
Producer<String> producer = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
.enableTransaction(true).build()
.newProducer(Schema.STRING).topic(CONSUME_TOPIC).sendTimeout(0, TimeUnit.SECONDS).create();
List<MessageIdData> positionList = new ArrayList<>();
for (int i = 0; i < transactionMsgCnt; i++) {
final int j = i;
MessageMetadata metadata = new MessageMetadata()
.setProducerName("producerName")
.setSequenceId(i)
.setTxnidMostBits(txnID.getMostSigBits())
.setTxnidLeastBits(txnID.getLeastSigBits())
.setPublishTime(System.currentTimeMillis());
String msg = TXN_MSG_CONTENT + i;
sendMessageList.add(msg);
ByteBuf headerAndPayload = Commands.serializeMetadataAndPayload(
Commands.ChecksumType.Crc32c, metadata,
Unpooled.copiedBuffer(msg.getBytes(UTF_8)));
CompletableFuture<PositionImpl> completableFuture = new CompletableFuture<>();
topic.publishTxnMessage(txnID, headerAndPayload, new Topic.PublishContext() {
@Override
public String getProducerName() {
return "test";
}
public long getSequenceId() {
return j + 30;
}
/**
* Return the producer name for the original producer.
*
* For messages published locally, this will return the same local producer name, though in case of replicated
* messages, the original producer name will differ
*/
public String getOriginalProducerName() {
return "test";
}
public long getOriginalSequenceId() {
return j + 30;
}
public long getHighestSequenceId() {
return j + 30;
}
public long getOriginalHighestSequenceId() {
return j + 30;
}
public long getNumberOfMessages() {
return j + 30;
}
@Override
public void completed(Exception e, long ledgerId, long entryId) {
completableFuture.complete(PositionImpl.get(ledgerId, entryId));
}
});
positionList.add(new MessageIdData().setLedgerId(completableFuture.get()
.getLedgerId()).setEntryId(completableFuture.get().getEntryId()));
}
log.info("append messages to TB finish.");
return positionList;
}
}