Add test for transactionCoordinatorClient
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index c1cb3d8..7885abe 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -47,6 +47,11 @@
 	log log.Logger
 }
 
+// AckIDWithTxn is not implemented yet (TODO); it returns an error rather than panicking.
+func (c *multiTopicConsumer) AckIDWithTxn(id MessageID, transaction Transaction) error {
+	return newError(UnknownError, "AckIDWithTxn is not implemented for multi-topic consumers")
+}
+
 func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
 	messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
 	mtc := &multiTopicConsumer{
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index ed2ae1a..162438f 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -62,6 +62,11 @@
 	consumerName string
 }
 
+// AckIDWithTxn is not implemented yet (TODO); it returns an error rather than panicking.
+func (c *regexConsumer) AckIDWithTxn(id MessageID, transaction Transaction) error {
+	return newError(UnknownError, "AckIDWithTxn is not implemented for regex consumers")
+}
+
 func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
 	msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
 	rc := &regexConsumer{
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7fd1885..0617227 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -70,7 +70,6 @@
 // Batch format
 // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD]
 // [METADATA_SIZE][METADATA][PAYLOAD]
-//
 type MessageReader struct {
 	buffer Buffer
 	// true if we are parsing a batched message - set after parsing the message metadata
@@ -213,6 +212,17 @@
 		cmd.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)
 	case pb.BaseCommand_AUTH_RESPONSE:
 		cmd.AuthResponse = msg.(*pb.CommandAuthResponse)
+	case pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST:
+		cmd.TcClientConnectRequest = msg.(*pb.CommandTcClientConnectRequest)
+	case pb.BaseCommand_NEW_TXN:
+		cmd.NewTxn = msg.(*pb.CommandNewTxn)
+	case pb.BaseCommand_ADD_PARTITION_TO_TXN:
+		cmd.AddPartitionToTxn = msg.(*pb.CommandAddPartitionToTxn)
+	case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN:
+		cmd.AddSubscriptionToTxn = msg.(*pb.CommandAddSubscriptionToTxn)
+	case pb.BaseCommand_END_TXN:
+		cmd.EndTxn = msg.(*pb.CommandEndTxn)
+
 	default:
 		panic(fmt.Sprintf("Missing command type: %v", cmdType))
 	}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 11c9d49..beb0a07 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -568,7 +568,16 @@
 		c.handlePing()
 	case pb.BaseCommand_PONG:
 		c.handlePong()
-
+	case pb.BaseCommand_TC_CLIENT_CONNECT_RESPONSE:
+		c.handleResponse(cmd.TcClientConnectResponse.GetRequestId(), cmd)
+	case pb.BaseCommand_NEW_TXN_RESPONSE:
+		c.handleResponse(cmd.NewTxnResponse.GetRequestId(), cmd)
+	case pb.BaseCommand_ADD_PARTITION_TO_TXN_RESPONSE:
+		c.handleResponse(cmd.AddPartitionToTxnResponse.GetRequestId(), cmd)
+	case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
+		c.handleResponse(cmd.AddSubscriptionToTxnResponse.GetRequestId(), cmd)
+	case pb.BaseCommand_END_TXN_RESPONSE:
+		c.handleResponse(cmd.EndTxnResponse.GetRequestId(), cmd)
 	case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
 
 	default:
@@ -646,6 +655,7 @@
 		c.pendingLock.Lock()
 		if req.id != nil {
 			c.pendingReqs[*req.id] = req
+			c.log.Info("Debug: put request %d type %s", *req.id, req.cmd.Type)
 		}
 		c.pendingLock.Unlock()
 		c.writeCommand(req.cmd)
@@ -713,6 +723,7 @@
 	defer c.pendingLock.Unlock()
 	request, ok := c.pendingReqs[requestID]
 	if ok {
+		c.log.Info("Debug: delete request %d type %s", requestID, request.cmd.Type)
 		delete(c.pendingReqs, requestID)
 	}
 	return request, ok
@@ -723,6 +734,7 @@
 	defer c.pendingLock.Unlock()
 	for id, req := range c.pendingReqs {
 		req.callback(nil, err)
+		c.log.Info("Debug: fail pending request %d type %s", req.id, req.cmd.Type)
 		delete(c.pendingReqs, id)
 	}
 	return true
diff --git a/pulsar/transactionCoordinatorClient.go b/pulsar/transactionCoordinatorClient.go
index 790b1a8..734d38a 100644
--- a/pulsar/transactionCoordinatorClient.go
+++ b/pulsar/transactionCoordinatorClient.go
@@ -46,6 +46,8 @@
 		return err
 	}
 	tc.tcNum = uint64(r.Partitions)
+	tc.cons = make([]internal.Connection, tc.tcNum)
+
 	//Get connections with all transaction_impl coordinators which is synchronized
 	for i := uint64(0); i < tc.tcNum; i++ {
 		err := tc.grabConn(i)
@@ -136,7 +138,6 @@
 }
 
 /*
-*
 Register the subscription which acked messages with the transactionImpl.
 And this can be used when ending the transactionImpl.
 */
@@ -187,7 +188,7 @@
 
 func (tc *transactionCoordinatorClient) canSendRequest() (bool, error) {
 	if tc.blockIfReachMaxPendingOps {
-		if tc.semaphore.Acquire(context.Background()) {
+		if !tc.semaphore.Acquire(context.Background()) {
 			return false, newError(UnknownError, "Failed to acquire semaphore")
 		}
 	} else {
diff --git a/pulsar/transaction_coordinator_client_test.go b/pulsar/transaction_coordinator_client_test.go
new file mode 100644
index 0000000..7e9dd00
--- /dev/null
+++ b/pulsar/transaction_coordinator_client_test.go
@@ -0,0 +1,44 @@
+package pulsar
+
+import (
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+// TestNewTxn opens a transaction, registers a subscription and a partition, then aborts another.
+func TestNewTxn(t *testing.T) {
+	topic, sub := "my-topic", "my-sub"
+	tc := createTcClient(t)
+	// Check every call: reassigning err without asserting would hide earlier failures.
+	id, err := tc.newTransaction(5 * time.Minute)
+	assert.Nil(t, err)
+	assert.Nil(t, tc.addSubscriptionToTxn(id, topic, sub))
+	assert.Nil(t, tc.addPublishPartitionToTxn(id, []string{topic}))
+	id2, err := tc.newTransaction(5 * time.Minute)
+	assert.Nil(t, err)
+	assert.Nil(t, tc.endTxn(id2, pb.TxnAction_ABORT))
+}
+
+// Test verifies that both halves of a zero-value TxnID are zero.
+func Test(t *testing.T) {
+	assert.Zero(t, TxnID{}.mostSigBits)
+	assert.Zero(t, TxnID{}.leastSigBits)
+}
+
+// createTcClient returns a started coordinator client for the local broker, or aborts the test.
+func createTcClient(t *testing.T) *transactionCoordinatorClient {
+	c, err := NewClient(ClientOptions{
+		URL:                 "pulsar://localhost:6650",
+		IsEnableTransaction: true,
+	})
+	if err != nil {
+		t.Fatalf("create client: %v", err)
+	}
+	tcClient := newTransactionCoordinatorClientImpl(c.(*client))
+	if err := tcClient.start(); err != nil {
+		t.Fatalf("start transaction coordinator client: %v", err)
+	}
+	return tcClient
+}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
new file mode 100644
index 0000000..3af38b3
--- /dev/null
+++ b/pulsar/transaction_test.go
@@ -0,0 +1,15 @@
+package pulsar
+
+/*
+Test1: receive and re-publish messages
+1. Receive some messages from topic1
+2. Resend the messages to topic2 with transactions
+3. Ack the messages with transactions
+4. Commit the transactions / Abort the transactions
+5. After commit: receive messages from topic2 and cannot receive messages from topic1
+   / After abort: cannot receive messages from topic2 and can receive messages from topic1
+Test2:
+
+
+
+*/