[Issue:53] Fix concurrent map write (#54)
* [Issue:53]Fix concurrent map write
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 7a6e414..d05c687 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -21,23 +21,15 @@
# How to contribute
-If you would like to contribute code to this project you can do so through GitHub by forking the repository and sending a pull request.
+If you would like to contribute code to this project, fork the repository and send a pull request.
-This document outlines some of the conventions on development workflow, commit message formatting, contact points and other resources to make it easier to get your contribution accepted.
+## Prerequisite
-## Steps to Contribute
+If you have not installed Go, install it according to the [installation instruction](http://golang.org/doc/install).
-Since the `go mod` package management tool is used in this project, your go version is required at **Go1.11+**.
+Since the `go mod` package management tool is used in this project, **Go 1.11 or higher** version is required.
-### Fork
-
-Before you start contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository.
-
-### Installation
-
-If you don't currently have a go environment installed,install Go according to the installation instructions here: http://golang.org/doc/install
-
-##### mac os && linux
+### Install Go on Mac OS and Linux
```bash
$ mkdir -p $HOME/github.com/apache/
@@ -47,19 +39,21 @@
$ go mod download
```
-When you execute `go mod download`, there may be some libs that cannot be downloaded. You can download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/).
+If some libs cannot be downloaded when you enter the `go mod download` command, download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/).
-### Contribution flow
+## Fork
+
+Before contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository.
+
+## Contribution flow
```bash
$ git remote add apache git@github.com:apache/pulsar-client-go.git
-
// sync with remote master
$ git checkout master
$ git fetch apache
$ git rebase apache/master
$ git push origin master
-
// create PR branch
$ git checkout -b your_branch
# do your work, and then
@@ -68,19 +62,16 @@
$ git push origin your_branch
```
-Thanks for your contributions!
+## Code style
-#### Code style
+The coding style suggested by the Golang community is used in Apache pulsar-client-go. For details, refer to [style doc](https://github.com/golang/go/wiki/CodeReviewComments).
+Follow the style, make your pull request easy to review, maintain and develop.
-The coding style suggested by the Golang community is used in Apache pulsar-client-go. See the [style doc](https://github.com/golang/go/wiki/CodeReviewComments) for details.
+## Create new files
-Please follow this style to make your pull request easy to review, maintain and develop.
+The project uses the open source protocol of Apache License 2.0. If you need to create a new file when developing new features,
+add the license at the beginning of each file. The location of the header file: [header file](.header).
-#### Create new file
+## Update dependencies
-The project uses the open source protocol of Apache License 2.0. When you need to create a new file when developing new features,
-please add it at the beginning of the file. The location of the header file: [header file](.header).
-
-#### Updating dependencies
-
-Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency: use the `go mod edit` command to change the dependency.
+Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency, use the `go mod edit` command to change the dependency.
\ No newline at end of file
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6fe86cd..7cc32ed 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -20,6 +20,7 @@
import (
"context"
"fmt"
+ "github.com/apache/pulsar-client-go/util"
"github.com/stretchr/testify/assert"
"log"
"net/http"
@@ -361,7 +362,7 @@
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
- makeHTTPCall(t, http.MethodPut, testURL, "5")
+ makeHTTPCall(t, http.MethodPut, testURL, "64")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
@@ -635,3 +636,82 @@
})
}
}
+
+func TestConsumer_Shared(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "persistent://public/default/testMultiPartitionConsumerShared"
+ testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
+
+ makeHTTPCall(t, http.MethodPut, testURL, "3")
+
+ sub := "sub-shared-1"
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ if err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ msgList := make([]string, 0, 5)
+ for i := 0; i < 5; i++ {
+ msg, err := consumer1.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
+ msgList = append(msgList, string(msg.Payload()))
+ if err := consumer1.Ack(msg); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ assert.Equal(t, 5, len(msgList))
+
+ for i := 0; i < 5; i++ {
+ msg, err := consumer2.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+ if err := consumer2.Ack(msg); err != nil {
+ log.Fatal(err)
+ }
+ fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
+ msgList = append(msgList, string(msg.Payload()))
+ }
+
+ assert.Equal(t, 10, len(msgList))
+ res := util.RemoveDuplicateElement(msgList)
+ assert.Equal(t, 10, len(res))
+}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 87cf68b..3a729cd 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -318,6 +318,7 @@
if err != nil {
return err
}
+ receivedSinceFlow = 0
continue
}
break
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2c707ea..0d7d3d8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -382,13 +382,13 @@
func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
c.mapMutex.RLock()
request, ok := c.pendingReqs[requestID]
- c.mapMutex.RUnlock()
if !ok {
c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
return
}
delete(c.pendingReqs, requestID)
+ c.mapMutex.RUnlock()
request.callback(response)
}