blob: a672a30271fa7eadea5ccce5a7bf7a32863d3afb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
_ "net/http/pprof"
log ""
// Package-level targets for the persistent command-line flags that main
// registers on the root command.
var (
	// FlagProfile is a global flag; main binds it to the "profile" flag.
	FlagProfile bool

	// flagDebug is bound to the "debug" flag by main.
	flagDebug bool

	// PrometheusPort is bound to the "metrics" flag by main, which starts
	// the metrics endpoint only when the value is greater than zero.
	PrometheusPort int
)
// ClientArgs holds the connection settings that NewClient uses to build a
// Pulsar client. main fills the fields from persistent command-line flags.
type ClientArgs struct {
	// ServiceURL is handed to the client as its URL ("service-url" flag).
	ServiceURL string
	// TokenFile, when non-empty, is the path of the file the JWT is read
	// from ("token-file" flag).
	TokenFile string
	// TLSTrustCertFile, when non-empty, is used as the client's trusted
	// certificates file path ("trust-cert-file" flag).
	TLSTrustCertFile string
	// MaxConnectionsPerBroker is passed through to the client options
	// ("max-connections" flag).
	MaxConnectionsPerBroker int
}

// clientArgs is the package-level instance that the flags write into and
// that NewClient reads from.
var clientArgs ClientArgs
func NewClient() (pulsar.Client, error) {
clientOpts := pulsar.ClientOptions{
URL: clientArgs.ServiceURL,
MaxConnectionsPerBroker: clientArgs.MaxConnectionsPerBroker,
if clientArgs.TokenFile != "" {
// read JWT from the file
tokenBytes, err := os.ReadFile(clientArgs.TokenFile)
if err != nil {
log.WithError(err).Errorf("failed to read Pulsar JWT from a file %s", clientArgs.TokenFile)
clientOpts.Authentication = pulsar.NewAuthenticationToken(string(tokenBytes))
if clientArgs.TLSTrustCertFile != "" {
clientOpts.TLSTrustCertsFilePath = clientArgs.TLSTrustCertFile
return pulsar.NewClient(clientOpts)
// initLogger chooses the log level from debug: log.DebugLevel when debug is
// true, log.InfoLevel otherwise.
// NOTE(review): the call that receives the two formatter fields below, and the
// statement that applies `level`, are not visible in this view — confirm
// against the full file.
func initLogger(debug bool) {
// Formatter fields: full timestamps, rendered as hour:minute:second.millisecond.
FullTimestamp: true,
TimestampFormat: "15:04:05.000",
// Info is the default level; debug raises verbosity.
level := log.InfoLevel
if debug {
level = log.DebugLevel
// main builds the "pulsar-perf-go" root command, registers the persistent
// flags shared by its subcommands, optionally starts the Prometheus metrics
// endpoint, and executes the command.
// NOTE(review): several lines of this function are not visible in this view
// (the PersistentPreRun body, closing delimiters, and anything between flag
// registration and Execute) — confirm against the full file.
func main() {
rootCmd := &cobra.Command{
// Hook that runs before any command; its body is not visible here.
PersistentPreRun: func(cmd *cobra.Command, args []string) {
Use: "pulsar-perf-go",
// Persistent flags write straight into the package-level variables and
// into clientArgs.
flags := rootCmd.PersistentFlags()
flags.BoolVar(&FlagProfile, "profile", false, "enable profiling")
flags.IntVar(&PrometheusPort, "metrics", 8000, "Port to use to export metrics for Prometheus. Use -1 to disable.")
flags.BoolVar(&flagDebug, "debug", false, "enable debug output")
flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
"pulsar://localhost:6650", "The Pulsar service URL")
flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to the Pulsar JWT file")
flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", "file path to the trusted certificate file")
flags.IntVarP(&clientArgs.MaxConnectionsPerBroker, "max-connections", "c", 1,
"Max connections to open to broker. Defaults to 1.")
// Serve /metrics on PrometheusPort in the background when the port is positive.
// NOTE(review): as laid out here this check precedes rootCmd.Execute(); if
// flags are only parsed during Execute, it always sees the default 8000 and
// never a user-supplied --metrics value — confirm.
if PrometheusPort > 0 {
go func() {
log.Info("Starting Prometheus metrics at http://localhost:", PrometheusPort, "/metrics")
// Registers the handler on the default mux, which ListenAndServe uses
// because its handler argument is nil.
http.Handle("/metrics", promhttp.Handler())
// NOTE(review): the error returned by ListenAndServe is discarded.
http.ListenAndServe(":"+strconv.Itoa(PrometheusPort), nil)
err := rootCmd.Execute()
if err != nil {
// Report command failures on stderr.
fmt.Fprintf(os.Stderr, "executing command error=%+v\n", err)
// stopCh returns a receive-only channel for callers to use as a stop signal
// and registers signalCh to receive os.Interrupt.
// NOTE(review): the body of the goroutine below is not visible in this view;
// presumably it waits on signalCh and then closes stop — confirm against the
// full file.
func stopCh() <-chan struct{} {
stop := make(chan struct{})
// Buffered by one: signal delivery does not block, so an unbuffered channel
// could miss a signal that arrives before the receiver is ready.
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
go func() {
return stop
func RunProfiling(stop <-chan struct{}) {
go func() {
if err := serveProfiling("", stop); err != nil && err != http.ErrServerClosed {
log.WithError(err).Error("Unable to start debug profiling server")
// serveProfiling serves http.DefaultServeMux on addr and returns the result
// of ListenAndServe. The file blank-imports net/http/pprof, which registers
// the /debug/pprof handlers on that mux.
//
// use `http://addr/debug/pprof` to access the browser
// use `go tool pprof http://addr/debug/pprof/profile` to get pprof file(cpu info)
// use `go tool pprof http://addr/debug/pprof/heap` to get inuse_space file
//
// NOTE(review): the body of the goroutine below is only partly visible in this
// view; presumably it waits on stop and then shuts the server down — confirm
// against the full file.
func serveProfiling(addr string, stop <-chan struct{}) error {
s := http.Server{
Addr: addr,
Handler: http.DefaultServeMux,
go func() {
log.Infof("Shutting down pprof server")
// Print usage hints before blocking in ListenAndServe.
fmt.Printf("Starting pprof server at: %s\n", addr)
fmt.Printf(" use `http://%s/debug/pprof` to access the browser\n", addr)
fmt.Printf(" use `go tool pprof http://%s/debug/pprof/profile` to get pprof file(cpu info)\n", addr)
fmt.Printf(" use `go tool pprof http://%s/debug/pprof/heap` to get inuse_space file\n", addr)
return s.ListenAndServe()