Add flag to disable forced topic creation. (#226)

This flag is needed for the regex consumer.
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ba7d24d..3a27def 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -42,11 +42,12 @@
 
 type consumer struct {
 	sync.Mutex
-	topic        string
-	client       *client
-	options      ConsumerOptions
-	consumers    []*partitionConsumer
-	consumerName string
+	topic                     string
+	client                    *client
+	options                   ConsumerOptions
+	consumers                 []*partitionConsumer
+	consumerName              string
+	disableForceTopicCreation bool
 
 	// channel used to deliver message to clients
 	messageCh chan ConsumerMessage
@@ -123,17 +124,18 @@
 }
 
 func newInternalConsumer(client *client, options ConsumerOptions, topic string,
-	messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) {
+	messageCh chan ConsumerMessage, dlq *dlqRouter, disableForceTopicCreation bool) (*consumer, error) {
 
 	consumer := &consumer{
-		topic:     topic,
-		client:    client,
-		options:   options,
-		messageCh: messageCh,
-		closeCh:   make(chan struct{}),
-		errorCh:   make(chan error),
-		dlq:       dlq,
-		log:       log.WithField("topic", topic),
+		topic:                     topic,
+		client:                    client,
+		options:                   options,
+		disableForceTopicCreation: disableForceTopicCreation,
+		messageCh:                 messageCh,
+		closeCh:                   make(chan struct{}),
+		errorCh:                   make(chan error),
+		dlq:                       dlq,
+		log:                       log.WithField("topic", topic),
 	}
 
 	if options.Name != "" {
@@ -275,7 +277,7 @@
 
 func topicSubscribe(client *client, options ConsumerOptions, topic string,
 	messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
-	return newInternalConsumer(client, options, topic, messageCh, dlqRouter)
+	return newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
 }
 
 func (c *consumer) Subscription() string {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 33fc605..498f2ae 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -76,6 +76,7 @@
 	startMessageIDInclusive    bool
 	subscriptionMode           subscriptionMode
 	readCompacted              bool
+	disableForceTopicCreation  bool
 }
 
 type partitionConsumer struct {
@@ -748,6 +749,12 @@
 		cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
 	}
 
+	// force topic creation is enabled by default so
+	// we only need to set the flag when disabling it
+	if pc.options.disableForceTopicCreation {
+		cmdSubscribe.ForceTopicCreation = proto.Bool(false)
+	}
+
 	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
 		pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
 
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 00cbb88..3d0aebe 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -361,7 +361,7 @@
 	for _, t := range topics {
 		go func(topic string) {
 			defer wg.Done()
-			c, err := newInternalConsumer(c, opts, topic, ch, dlq)
+			c, err := newInternalConsumer(c, opts, topic, ch, dlq, true)
 			consumerErrorCh <- consumerError{
 				err:      err,
 				topic:    topic,