Add test for transactionCoordinatorClient
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index c1cb3d8..7885abe 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -47,6 +47,11 @@
log log.Logger
}
+// AckIDWithTxn is not implemented yet: it always returns an error (TODO implement).
+func (c *multiTopicConsumer) AckIDWithTxn(id MessageID, transaction Transaction) error {
+	return newError(UnknownError, "AckIDWithTxn is not implemented for multi-topic consumers")
+}
+
func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
mtc := &multiTopicConsumer{
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index ed2ae1a..162438f 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -62,6 +62,11 @@
consumerName string
}
+// AckIDWithTxn is not implemented yet: it always returns an error (TODO implement).
+func (c *regexConsumer) AckIDWithTxn(id MessageID, transaction Transaction) error {
+	return newError(UnknownError, "AckIDWithTxn is not implemented for regex consumers")
+}
+
func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
rc := &regexConsumer{
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7fd1885..0617227 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -70,7 +70,6 @@
// Batch format
// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD]
// [METADATA_SIZE][METADATA][PAYLOAD]
-//
type MessageReader struct {
buffer Buffer
// true if we are parsing a batched message - set after parsing the message metadata
@@ -213,6 +212,17 @@
cmd.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)
case pb.BaseCommand_AUTH_RESPONSE:
cmd.AuthResponse = msg.(*pb.CommandAuthResponse)
+ case pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST:
+ cmd.TcClientConnectRequest = msg.(*pb.CommandTcClientConnectRequest)
+ case pb.BaseCommand_NEW_TXN:
+ cmd.NewTxn = msg.(*pb.CommandNewTxn)
+ case pb.BaseCommand_ADD_PARTITION_TO_TXN:
+ cmd.AddPartitionToTxn = msg.(*pb.CommandAddPartitionToTxn)
+ case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN:
+ cmd.AddSubscriptionToTxn = msg.(*pb.CommandAddSubscriptionToTxn)
+ case pb.BaseCommand_END_TXN:
+ cmd.EndTxn = msg.(*pb.CommandEndTxn)
+
default:
panic(fmt.Sprintf("Missing command type: %v", cmdType))
}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 11c9d49..beb0a07 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -568,7 +568,16 @@
c.handlePing()
case pb.BaseCommand_PONG:
c.handlePong()
-
+ case pb.BaseCommand_TC_CLIENT_CONNECT_RESPONSE:
+ c.handleResponse(cmd.TcClientConnectResponse.GetRequestId(), cmd)
+ case pb.BaseCommand_NEW_TXN_RESPONSE:
+ c.handleResponse(cmd.NewTxnResponse.GetRequestId(), cmd)
+ case pb.BaseCommand_ADD_PARTITION_TO_TXN_RESPONSE:
+ c.handleResponse(cmd.AddPartitionToTxnResponse.GetRequestId(), cmd)
+ case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
+ c.handleResponse(cmd.AddSubscriptionToTxnResponse.GetRequestId(), cmd)
+ case pb.BaseCommand_END_TXN_RESPONSE:
+ c.handleResponse(cmd.EndTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
default:
@@ -646,6 +655,7 @@
c.pendingLock.Lock()
if req.id != nil {
c.pendingReqs[*req.id] = req
+ c.log.Info("Debug: put request %d type %s", *req.id, req.cmd.Type)
}
c.pendingLock.Unlock()
c.writeCommand(req.cmd)
@@ -713,6 +723,7 @@
defer c.pendingLock.Unlock()
request, ok := c.pendingReqs[requestID]
if ok {
+ c.log.Info("Debug: delete request %d type %s", requestID, request.cmd.Type)
delete(c.pendingReqs, requestID)
}
return request, ok
@@ -723,6 +734,7 @@
defer c.pendingLock.Unlock()
for id, req := range c.pendingReqs {
req.callback(nil, err)
+ c.log.Info("Debug: fail pending request %d type %s", req.id, req.cmd.Type)
delete(c.pendingReqs, id)
}
return true
diff --git a/pulsar/transactionCoordinatorClient.go b/pulsar/transactionCoordinatorClient.go
index 790b1a8..734d38a 100644
--- a/pulsar/transactionCoordinatorClient.go
+++ b/pulsar/transactionCoordinatorClient.go
@@ -46,6 +46,8 @@
return err
}
tc.tcNum = uint64(r.Partitions)
+ tc.cons = make([]internal.Connection, tc.tcNum)
+
//Get connections with all transaction_impl coordinators which is synchronized
for i := uint64(0); i < tc.tcNum; i++ {
err := tc.grabConn(i)
@@ -136,7 +138,6 @@
}
/*
-*
Register the subscription which acked messages with the transactionImpl.
And this can be used when ending the transactionImpl.
*/
@@ -187,7 +188,7 @@
func (tc *transactionCoordinatorClient) canSendRequest() (bool, error) {
if tc.blockIfReachMaxPendingOps {
- if tc.semaphore.Acquire(context.Background()) {
+ if !tc.semaphore.Acquire(context.Background()) {
return false, newError(UnknownError, "Failed to acquire semaphore")
}
} else {
diff --git a/pulsar/transaction_coordinator_client_test.go b/pulsar/transaction_coordinator_client_test.go
new file mode 100644
index 0000000..7e9dd00
--- /dev/null
+++ b/pulsar/transaction_coordinator_client_test.go
@@ -0,0 +1,71 @@
+package pulsar
+
+import (
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+// TestNewTxn sends a full round of requests through the transaction
+// coordinator client: it opens a transaction, registers a subscription and a
+// published topic with it, then opens a second transaction and aborts it.
+// Every request is expected to succeed. The test talks to the broker that
+// createTcClient connects to.
+func TestNewTxn(t *testing.T) {
+	// 5 minutes, i.e. 300000000000 nanoseconds.
+	const txnTimeout = 5 * time.Minute
+	topic := "my-topic"
+	sub := "my-sub"
+
+	tc := createTcClient(t)
+
+	// Each error is checked before the next call so that a failure is
+	// reported against the request that caused it rather than overwritten.
+	id, err := tc.newTransaction(txnTimeout)
+	if err != nil {
+		t.Fatalf("newTransaction: %v", err)
+	}
+	err = tc.addSubscriptionToTxn(id, topic, sub)
+	assert.Nil(t, err)
+	err = tc.addPublishPartitionToTxn(id, []string{topic})
+	assert.Nil(t, err)
+
+	id2, err := tc.newTransaction(txnTimeout)
+	if err != nil {
+		t.Fatalf("newTransaction: %v", err)
+	}
+	err = tc.endTxn(id2, pb.TxnAction_ABORT)
+	assert.Nil(t, err)
+}
+
+// Test checks that a zero-value TxnID has both of its ID fields unset.
+func Test(t *testing.T) {
+	id := TxnID{}
+	assert.Zero(t, id.mostSigBits)
+	assert.Zero(t, id.leastSigBits)
+	t.Logf("[%v, %v]", id.mostSigBits, id.leastSigBits)
+}
+
+// createTcClient creates a client for the broker at pulsar://localhost:6650
+// with transactions enabled, wraps it in a transaction coordinator client and
+// starts that client. The calling test is stopped immediately if either step
+// fails, and the underlying client is closed when the test ends.
+func createTcClient(t *testing.T) *transactionCoordinatorClient {
+	t.Helper()
+	c, err := NewClient(ClientOptions{
+		URL:                 "pulsar://localhost:6650",
+		IsEnableTransaction: true,
+	})
+	if err != nil {
+		// Stop here: c cannot be trusted on error, and c.(*client) panics on a nil Client.
+		t.Fatalf("creating client: %v", err)
+	}
+	t.Cleanup(c.Close)
+
+	tcClient := newTransactionCoordinatorClientImpl(c.(*client))
+	if err := tcClient.start(); err != nil {
+		t.Fatalf("starting transaction coordinator client: %v", err)
+	}
+	return tcClient
+}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
new file mode 100644
index 0000000..3af38b3
--- /dev/null
+++ b/pulsar/transaction_test.go
@@ -0,0 +1,15 @@
+package pulsar
+
+/*
+Test1: receive and reproduce message
+1. Receive some messages from topic1
+2. resend the messages to topic2 with transactions
+3. Ack the messages with transactions
+4. Commit the transactions / Abort the transactions
+5. Receive messages from topic2 and cannot receive messages from topic1
+ / Cannot receive messages from topic2 and can receive messages from topic1
+Test2:
+
+
+
+*/