You can configure Go producers using a ProducerOptions object. Here's an example:
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
if err != nil {
log.Fatal(err)
}
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
For all available methods of Producer interface, see here.
Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.
// Create a Pulsar client client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", }) if err != nil { log.Fatal(err) } defer client.Close() // Start a separate goroutine for Prometheus metrics // In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics go func() { prometheusPort := 2112 log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort) http.Handle("/metrics", promhttp.Handler()) err = http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil) if err != nil { log.Fatal(err) } }() // Create a producer producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: "topic-1", }) if err != nil { log.Fatal(err) } defer producer.Close() ctx := context.Background() // Write your business logic here // In this case, you build a simple Web server. You can produce messages by requesting http://localhost:8082/produce webPort := 8082 http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) { msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{ Payload: []byte(fmt.Sprintf("hello world")), }) if err != nil { log.Fatal(err) } else { log.Printf("Published message: %v", msgId) fmt.Fprintf(w, "Published message: %v", msgId) } }) err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil) if err != nil { log.Fatal(err) }
prometheus.yml).scrape_configs: - job_name: pulsar-client-go-metrics scrape_interval: 10s static_configs: - targets: - localhost:2112
Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Go consumers using a ConsumerOptions object.
Here's a basic example that uses channels:
consumer, err := client.Subscribe(pulsar.ConsumerOptions{ Topic: "topic-1", SubscriptionName: "my-sub", Type: pulsar.Shared, }) if err != nil { log.Fatal(err) } defer consumer.Close() for i := 0; i < 10; i++ { // may block here msg, err := consumer.Receive(context.Background()) if err != nil { log.Fatal(err) } fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload())) consumer.Ack(msg) } if err := consumer.Unsubscribe(); err != nil { log.Fatal(err) }
For all available methods of Consumer interface, see here.
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) if err != nil { log.Fatal(err) } defer client.Close() consumer, err := client.Subscribe(pulsar.ConsumerOptions{ // fill `Topic` field will create a single-topic consumer Topic: "topic-1", SubscriptionName: "my-sub", Type: pulsar.Shared, }) if err != nil { log.Fatal(err) } defer consumer.Close()
client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", }) defer client.Close() topicsPattern := "persistent://public/default/topic.*" opts := pulsar.ConsumerOptions{ // fill `TopicsPattern` field will create a regex consumer TopicsPattern: topicsPattern, SubscriptionName: "regex-sub", } consumer, err := client.Subscribe(opts) if err != nil { log.Fatal(err) } defer consumer.Close()
In this guide, This section demonstrates how to create a simple Pulsar consumer application that exposes Prometheus metrics via HTTP.
// Create a Pulsar client client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", }) if err != nil { log.Fatal(err) } defer client.Close() // Start a separate goroutine for Prometheus metrics // In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics go func() { prometheusPort := 2112 log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort) http.Handle("/metrics", promhttp.Handler()) err = http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil) if err != nil { log.Fatal(err) } }() // Create a consumer consumer, err := client.Subscribe(pulsar.ConsumerOptions{ Topic: "topic-1", SubscriptionName: "sub-1", Type: pulsar.Shared, }) if err != nil { log.Fatal(err) } defer consumer.Close() ctx := context.Background() // Write your business logic here // In this case, you build a simple Web server. You can consume messages by requesting http://localhost:8083/consume webPort := 8083 http.HandleFunc("/consume", func(w http.ResponseWriter, r *http.Request) { msg, err := consumer.Receive(ctx) if err != nil { log.Fatal(err) } else { log.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload())) fmt.Fprintf(w, "Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload())) consumer.Ack(msg) } }) err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil) if err != nil { log.Fatal(err) }
prometheus.yml).scrape_configs: - job_name: pulsar-client-go-metrics scrape_interval: 10s static_configs: - targets: - localhost: 2112
Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can configure Go readers using a ReaderOptions object. Here's an example:
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
For all available methods of the Reader interface, see here.