[Issue:53] Fix concurrent map write (#54)

* [Issue:53]Fix concurrent map write

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 7a6e414..d05c687 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -21,23 +21,15 @@
 
 # How to contribute
 
-If you would like to contribute code to this project you can do so through GitHub by forking the repository and sending a pull request.
+If you would like to contribute code to this project, fork the repository and send a pull request.
 
-This document outlines some of the conventions on development workflow, commit message formatting, contact points and other resources to make it easier to get your contribution accepted.
+## Prerequisite
 
-## Steps to Contribute
+If you have not installed Go, install it according to the [installation instruction](http://golang.org/doc/install).
 
-Since the `go mod` package management tool is used in this project, your go version is required at **Go1.11+**.
+Since the `go mod` package management tool is used in this project, **Go 1.11 or higher** version is required.
 
-### Fork
-
-Before you start contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository.
-
-### Installation
-
-If you don't currently have a go environment installed,install Go according to the installation instructions here: http://golang.org/doc/install
-
-##### mac os && linux
+### Install Go on Mac OS and Linux
 
 ```bash
 $ mkdir -p $HOME/github.com/apache/
@@ -47,19 +39,21 @@
 $ go mod download
 ```
 
-When you execute `go mod download`, there may be some libs that cannot be downloaded. You can download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/).
+If some libs cannot be downloaded when you enter the `go mod download` command, download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/).
 
-### Contribution flow
+## Fork
+
+Before contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository.
+
+## Contribution flow
 
 ```bash
 $ git remote add apache git@github.com:apache/pulsar-client-go.git
-
 // sync with remote master
 $ git checkout master
 $ git fetch apache
 $ git rebase apache/master
 $ git push origin master
-
 // create PR branch
 $ git checkout -b your_branch   
 # do your work, and then
@@ -68,19 +62,16 @@
 $ git push origin your_branch
 ```
 
-Thanks for your contributions!
+## Code style
 
-#### Code style
+The coding style suggested by the Golang community is used in Apache pulsar-client-go. For details, refer to [style doc](https://github.com/golang/go/wiki/CodeReviewComments).
+Follow the style, make your pull request easy to review, maintain and develop.
 
-The coding style suggested by the Golang community is used in Apache pulsar-client-go. See the [style doc](https://github.com/golang/go/wiki/CodeReviewComments) for details.
+## Create new files
 
-Please follow this style to make your pull request easy to review, maintain and develop.
+The project uses the open source protocol of Apache License 2.0. If you need to create a new file when developing new features, 
+add the license at the beginning of each file. The location of the header file: [header file](.header).
 
-#### Create new file
+## Update dependencies
 
-The project uses the open source protocol of Apache License 2.0. When you need to create a new file when developing new features, 
-please add it at the beginning of the file. The location of the header file: [header file](.header).
-
-#### Updating dependencies
-
-Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency: use the `go mod edit` command to change the dependency.
+Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency, use the `go mod edit` command to change the dependency.
\ No newline at end of file
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6fe86cd..7cc32ed 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -20,6 +20,7 @@
 import (
 	"context"
 	"fmt"
+	"github.com/apache/pulsar-client-go/util"
 	"github.com/stretchr/testify/assert"
 	"log"
 	"net/http"
@@ -361,7 +362,7 @@
 	topic := "persistent://public/default/testGetPartitions"
 	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
 
-	makeHTTPCall(t, http.MethodPut, testURL, "5")
+	makeHTTPCall(t, http.MethodPut, testURL, "64")
 
 	// create producer
 	producer, err := client.CreateProducer(ProducerOptions{
@@ -635,3 +636,82 @@
 		})
 	}
 }
+
+func TestConsumer_Shared(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/testMultiPartitionConsumerShared"
+	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
+
+	makeHTTPCall(t, http.MethodPut, testURL, "3")
+
+	sub := "sub-shared-1"
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: sub,
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: sub,
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer consumer2.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		if err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	msgList := make([]string, 0, 5)
+	for i := 0; i < 5; i++ {
+		msg, err := consumer1.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+		fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
+		msgList = append(msgList, string(msg.Payload()))
+		if err := consumer1.Ack(msg); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	assert.Equal(t, 5, len(msgList))
+
+	for i := 0; i < 5; i++ {
+		msg, err := consumer2.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+		if err := consumer2.Ack(msg); err != nil {
+			log.Fatal(err)
+		}
+		fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
+		msgList = append(msgList, string(msg.Payload()))
+	}
+
+	assert.Equal(t, 10, len(msgList))
+	res := util.RemoveDuplicateElement(msgList)
+	assert.Equal(t, 10, len(res))
+}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 87cf68b..3a729cd 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -318,6 +318,7 @@
 				if err != nil {
 					return err
 				}
+				receivedSinceFlow = 0
 				continue
 			}
 			break
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2c707ea..0d7d3d8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -382,13 +382,13 @@
 func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
 	c.mapMutex.RLock()
 	request, ok := c.pendingReqs[requestID]
-	c.mapMutex.RUnlock()
 	if !ok {
 		c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
 		return
 	}
 
 	delete(c.pendingReqs, requestID)
+	c.mapMutex.RUnlock()
 	request.callback(response)
 }