Fix panic when scale down partitions (#601)

Signed-off-by: xiaolongran <xiaolongran@tencent.com>



### Motivation

When the program is running, if the business is forced to delete certain sub partitions, the following error message will be caused, that is, old_partitions is greater than new_partitions, it looks like it is doing scale down partitions, and the current code logic only deals with the scenario of scale up partitions , So if the user is forced to delete some sub partitions, the following error will be encountered:

```
level=info msg="[Changed number of partitions in topic]" new_partitions=1 old_partitions=20 topic="persistent://pulsar-xxxxxxx/xxxx/gxxxxxxxx"
```

```
panic: runtime error: index out of range [1] with length 1

goroutine 166288 [running]:
github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers(0xc0070aa6e0, 0x0, 0x0)
        github.com/apache/pulsar-client-go/pulsar/producer_impl.go:194 +0x785
github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery.func1(0xc004167cd0, 0xc00559f5c0, 0xc006af6dc0, 0xc0070aa6e0)
       github.com/apache/pulsar-client-go/pulsar/producer_impl.go:152 +0xce
created by github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery
       github.com/apache/pulsar-client-go/pulsar/producer_impl.go:144 +0xcd
```

### Modifications

Increase the processing logic of scale down partition

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ec7ad7d..78ae0d7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,7 +28,6 @@
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -258,22 +257,16 @@
 
 	c.Lock()
 	defer c.Unlock()
+
 	oldConsumers := c.consumers
+	oldNumPartitions = len(oldConsumers)
 
 	if oldConsumers != nil {
-		oldNumPartitions = len(oldConsumers)
 		if oldNumPartitions == newNumPartitions {
 			c.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
 
-		if oldNumPartitions > newNumPartitions {
-			c.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		c.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -281,7 +274,9 @@
 
 	c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-	if oldConsumers != nil {
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+	if oldConsumers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			c.consumers[i] = oldConsumers[i]
@@ -297,12 +292,19 @@
 	receiverQueueSize := c.options.ReceiverQueueSize
 	metadata := c.options.Properties
 
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
+
 	var wg sync.WaitGroup
 	ch := make(chan ConsumerError, partitionsToAdd)
 	wg.Add(partitionsToAdd)
 
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partitionTopic := partitions[partitionIdx]
 
 		go func(idx int, pt string) {
@@ -366,7 +368,11 @@
 		return err
 	}
 
-	c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
+	} else {
+		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	}
 	return nil
 }
 
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index adf9b14..20e8d3d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,7 +26,6 @@
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -175,21 +174,14 @@
 	defer p.Unlock()
 
 	oldProducers := p.producers
+	oldNumPartitions = len(oldProducers)
 
 	if oldProducers != nil {
-		oldNumPartitions = len(oldProducers)
 		if oldNumPartitions == newNumPartitions {
 			p.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
 
-		if oldNumPartitions > newNumPartitions {
-			p.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		p.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -198,7 +190,9 @@
 
 	p.producers = make([]Producer, newNumPartitions)
 
-	if oldProducers != nil {
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new producers, otherwise the array will be out of bounds.
+	if oldProducers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			p.producers[i] = oldProducers[i]
@@ -211,10 +205,15 @@
 		err       error
 	}
 
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
 	c := make(chan ProducerError, partitionsToAdd)
 
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partition := partitions[partitionIdx]
 
 		go func(partitionIdx int, partition string) {
@@ -248,7 +247,11 @@
 		return err
 	}
 
-	p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
+	} else {
+		p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	}
 	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
 	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
 	return nil