support jwt and trusted cert for pulsar perf client (#428)
* support toke and trust cert for pulsar perf client
* remove mandatory check for certificate in command line validation
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 582c06d..7fb8aab 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -65,9 +65,7 @@
b, _ = json.MarshalIndent(consumeArgs, "", " ")
log.Info("Consumer config: ", string(b))
- client, err := pulsar.NewClient(pulsar.ClientOptions{
- URL: clientArgs.ServiceURL,
- })
+ client, err := NewClient()
if err != nil {
log.Fatal(err)
@@ -92,6 +90,7 @@
// Print stats of the consume rate
tick := time.NewTicker(10 * time.Second)
+ defer tick.Stop()
for {
select {
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index ba6e197..3ffa7c0 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -136,6 +136,7 @@
// Print stats of the publish rate and latencies
tick := time.NewTicker(10 * time.Second)
+ defer tick.Stop()
q := quantile.NewTargeted(0.50, 0.95, 0.99, 0.999, 1.0)
messagesPublished := 0
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 8fc0e09..488a74f 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -20,6 +20,7 @@
import (
"context"
"fmt"
+ "io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
@@ -40,7 +41,9 @@
var PrometheusPort int
type ClientArgs struct {
- ServiceURL string
+ ServiceURL string
+ TokenFile string
+ TLSTrustCertFile string
}
var clientArgs ClientArgs
@@ -49,6 +52,20 @@
clientOpts := pulsar.ClientOptions{
URL: clientArgs.ServiceURL,
}
+
+ if clientArgs.TokenFile != "" {
+ // read JWT from the file
+ tokenBytes, err := ioutil.ReadFile(clientArgs.TokenFile)
+ if err != nil {
+ log.WithError(err).Errorf("failed to read Pulsar JWT from a file %s", clientArgs.TokenFile)
+ os.Exit(1)
+ }
+ clientOpts.Authentication = pulsar.NewAuthenticationToken(string(tokenBytes))
+ }
+
+ if clientArgs.TLSTrustCertFile != "" {
+ clientOpts.TLSTrustCertsFilePath = clientArgs.TLSTrustCertFile
+ }
return pulsar.NewClient(clientOpts)
}
@@ -78,6 +95,8 @@
flags.BoolVar(&flagDebug, "debug", false, "enable debug output")
flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
"pulsar://localhost:6650", "The Pulsar service URL")
+ flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to the Pulsar JWT file")
+ flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", "file path to the trusted certificate file")
rootCmd.AddCommand(newProducerCommand())
rootCmd.AddCommand(newConsumerCommand())