blob: c47ed7dd7533ea7dbffedef52a709d93725302a8 [file] [log] [blame]
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package pulsar
/*
#include "c_go_pulsar.h"
*/
import "C"
import (
"context"
"runtime"
"time"
"unsafe"
)
type consumer struct {
schema Schema
client *client
ptr *C.pulsar_consumer_t
defaultChannel chan ConsumerMessage
}
func consumerFinalizer(c *consumer) {
if c.ptr != nil {
C.pulsar_consumer_free(c.ptr)
}
}
//export pulsarSubscribeCallbackProxy
func pulsarSubscribeCallbackProxy(res C.pulsar_result, ptr *C.pulsar_consumer_t, ctx unsafe.Pointer) {
cc := restorePointer(ctx).(*subscribeContext)
C.pulsar_consumer_configuration_free(cc.conf)
if res != C.pulsar_result_Ok {
cc.callback(nil, newError(res, "Failed to subscribe to topic"))
} else {
cc.consumer.ptr = ptr
cc.consumer.schema = cc.schema
runtime.SetFinalizer(cc.consumer, consumerFinalizer)
cc.callback(cc.consumer, nil)
}
}
type subscribeContext struct {
schema Schema
conf *C.pulsar_consumer_configuration_t
consumer *consumer
callback func(Consumer, error)
}
func subscribeAsync(client *client, options ConsumerOptions, schema Schema, callback func(Consumer, error)) {
if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
return
}
if options.SubscriptionName == "" {
go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "subscription name is required"))
return
}
conf := C.pulsar_consumer_configuration_create()
consumer := &consumer{client: client}
if options.MessageChannel == nil {
// If there is no message listener, set a default channel so that we can have receive to
// use that
consumer.defaultChannel = make(chan ConsumerMessage)
options.MessageChannel = consumer.defaultChannel
}
C._pulsar_consumer_configuration_set_message_listener(conf, savePointer(&consumerCallback{
consumer: consumer,
channel: options.MessageChannel,
}))
if options.AckTimeout != 0 {
timeoutMillis := options.AckTimeout.Nanoseconds() / int64(time.Millisecond)
C.pulsar_consumer_set_unacked_messages_timeout_ms(conf, C.uint64_t(timeoutMillis))
}
if options.NackRedeliveryDelay != nil {
delayMillis := options.NackRedeliveryDelay.Nanoseconds() / int64(time.Millisecond)
C.pulsar_configure_set_negative_ack_redelivery_delay_ms(conf, C.long(delayMillis))
}
if options.Type != Exclusive {
C.pulsar_consumer_configuration_set_consumer_type(conf, C.pulsar_consumer_type(options.Type))
}
if options.SubscriptionInitPos != Latest {
C.pulsar_consumer_set_subscription_initial_position(conf, C.initial_position(options.SubscriptionInitPos))
}
if schema != nil && schema.GetSchemaInfo() != nil {
if schema.GetSchemaInfo().Type != NONE {
cName := C.CString(schema.GetSchemaInfo().Name)
cSchema := C.CString(schema.GetSchemaInfo().Schema)
properties := C.pulsar_string_map_create()
defer C.free(unsafe.Pointer(cName))
defer C.free(unsafe.Pointer(cSchema))
defer C.pulsar_string_map_free(properties)
for key, value := range schema.GetSchemaInfo().Properties {
cKey := C.CString(key)
cValue := C.CString(value)
C.pulsar_string_map_put(properties, cKey, cValue)
C.free(unsafe.Pointer(cKey))
C.free(unsafe.Pointer(cValue))
}
C.pulsar_consumer_configuration_set_schema_info(conf, C.pulsar_schema_type(schema.GetSchemaInfo().Type),
cName, cSchema, properties)
} else {
cName := C.CString("BYTES")
cSchema := C.CString("")
properties := C.pulsar_string_map_create()
defer C.free(unsafe.Pointer(cName))
defer C.free(unsafe.Pointer(cSchema))
defer C.pulsar_string_map_free(properties)
for key, value := range schema.GetSchemaInfo().Properties {
cKey := C.CString(key)
cValue := C.CString(value)
C.pulsar_string_map_put(properties, cKey, cValue)
C.free(unsafe.Pointer(cKey))
C.free(unsafe.Pointer(cValue))
}
C.pulsar_consumer_configuration_set_schema_info(conf, C.pulsar_schema_type(BYTES),
cName, cSchema, properties)
}
}
// ReceiverQueueSize==0 means to use the default queue size
// -1 means to disable the consumer prefetching
if options.ReceiverQueueSize > 0 {
C.pulsar_consumer_configuration_set_receiver_queue_size(conf, C.int(options.ReceiverQueueSize))
} else if options.ReceiverQueueSize < 0 {
// In C++ client lib, 0 means disable prefetching
C.pulsar_consumer_configuration_set_receiver_queue_size(conf, C.int(0))
}
if options.MaxTotalReceiverQueueSizeAcrossPartitions != 0 {
C.pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(conf,
C.int(options.MaxTotalReceiverQueueSizeAcrossPartitions))
}
if options.Name != "" {
name := C.CString(options.Name)
defer C.free(unsafe.Pointer(name))
C.pulsar_consumer_set_consumer_name(conf, name)
}
if options.Properties != nil {
for key, value := range options.Properties {
cKey := C.CString(key)
cValue := C.CString(value)
C.pulsar_consumer_configuration_set_property(conf, cKey, cValue)
C.free(unsafe.Pointer(cKey))
C.free(unsafe.Pointer(cValue))
}
}
C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
subName := C.CString(options.SubscriptionName)
defer C.free(unsafe.Pointer(subName))
callbackPtr := savePointer(&subscribeContext{schema: schema, conf: conf, consumer: consumer, callback: callback})
if options.Topic != "" {
topic := C.CString(options.Topic)
defer C.free(unsafe.Pointer(topic))
C._pulsar_client_subscribe_async(client.ptr, topic, subName, conf, callbackPtr)
} else if options.Topics != nil {
cArray := C.malloc(C.size_t(len(options.Topics)) * C.size_t(unsafe.Sizeof(uintptr(0))))
// convert the C array to a Go Array so we can index it
a := (*[1<<30 - 1]*C.char)(cArray)
for idx, topic := range options.Topics {
a[idx] = C.CString(topic)
}
C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)),
subName, conf, callbackPtr)
for idx := range options.Topics {
C.free(unsafe.Pointer(a[idx]))
}
C.free(cArray)
} else if options.TopicsPattern != "" {
topicsPattern := C.CString(options.TopicsPattern)
defer C.free(unsafe.Pointer(topicsPattern))
C._pulsar_client_subscribe_pattern_async(client.ptr, topicsPattern, subName, conf, callbackPtr)
}
}
type consumerCallback struct {
consumer Consumer
channel chan ConsumerMessage
}
//export pulsarMessageListenerProxy
func pulsarMessageListenerProxy(cConsumer *C.pulsar_consumer_t, message *C.pulsar_message_t, ctx unsafe.Pointer) {
cc := restorePointerNoDelete(ctx).(*consumerCallback)
defer func() {
ex := recover()
if ex != nil {
// There was an error when sending channel (eg: already closed)
}
}()
cc.channel <- ConsumerMessage{cc.consumer, newMessageWrapper(cc.consumer.Schema(), message)}
}
//// Consumer
func (c *consumer) Schema() Schema {
return c.schema
}
func (c *consumer) Topic() string {
return C.GoString(C.pulsar_consumer_get_topic(c.ptr))
}
func (c *consumer) Subscription() string {
return C.GoString(C.pulsar_consumer_get_subscription_name(c.ptr))
}
func (c *consumer) Unsubscribe() error {
channel := make(chan error, 1)
c.UnsubscribeAsync(func(err error) {
channel <- err
close(channel)
})
return <-channel
}
func (c *consumer) UnsubscribeAsync(callback func(error)) {
C._pulsar_consumer_unsubscribe_async(c.ptr, savePointer(callback))
}
//export pulsarConsumerUnsubscribeCallbackProxy
func pulsarConsumerUnsubscribeCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
callback := restorePointer(ctx).(func(err error))
if res != C.pulsar_result_Ok {
go callback(newError(res, "Failed to unsubscribe consumer"))
} else {
go callback(nil)
}
}
func (c *consumer) Receive(ctx context.Context) (Message, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case cm := <-c.defaultChannel:
return cm.Message, nil
}
}
func (c *consumer) Ack(msg Message) error {
C.pulsar_consumer_acknowledge_async(c.ptr, msg.(*message).ptr, nil, nil)
return nil
}
func (c *consumer) AckID(msgId MessageID) error {
C.pulsar_consumer_acknowledge_async_id(c.ptr, msgId.(*messageID).ptr, nil, nil)
return nil
}
func (c *consumer) AckCumulative(msg Message) error {
C.pulsar_consumer_acknowledge_cumulative_async(c.ptr, msg.(*message).ptr, nil, nil)
return nil
}
func (c *consumer) AckCumulativeID(msgId MessageID) error {
C.pulsar_consumer_acknowledge_cumulative_async_id(c.ptr, msgId.(*messageID).ptr, nil, nil)
return nil
}
func (c *consumer) Nack(msg Message) error {
C.pulsar_consumer_negative_acknowledge(c.ptr, msg.(*message).ptr)
return nil
}
func (c *consumer) NackID(msgId MessageID) error {
C.pulsar_consumer_negative_acknowledge_id(c.ptr, msgId.(*messageID).ptr)
return nil
}
func (c *consumer) Close() error {
channel := make(chan error, 1)
c.CloseAsync(func(err error) { channel <- err; close(channel) })
return <-channel
}
func (c *consumer) CloseAsync(callback func(error)) {
if c.defaultChannel != nil {
close(c.defaultChannel)
}
C._pulsar_consumer_close_async(c.ptr, savePointer(callback))
}
//export pulsarConsumerCloseCallbackProxy
func pulsarConsumerCloseCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
callback := restorePointer(ctx).(func(err error))
if res != C.pulsar_result_Ok {
go callback(newError(res, "Failed to close Consumer"))
} else {
go callback(nil)
}
}
func (c *consumer) RedeliverUnackedMessages() {
C.pulsar_consumer_redeliver_unacknowledged_messages(c.ptr)
}
func (c *consumer) Seek(msgID MessageID) error {
channel := make(chan error, 1)
c.SeekAsync(msgID, func(err error) {
channel <- err
close(channel)
})
return <-channel
}
func (c *consumer) SeekAsync(msgID MessageID, callback func(error)) {
C._pulsar_consumer_seek_async(c.ptr, msgID.(*messageID).ptr, savePointer(callback))
}
//export pulsarConsumerSeekCallbackProxy
func pulsarConsumerSeekCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
callback := restorePointer(ctx).(func(err error))
if res != C.pulsar_result_Ok {
go callback(newError(res, "Failed to seek Consumer"))
} else {
go callback(nil)
}
}