blob: babb77f7854219e7e42d1f3561203fb998e5a4d5 [file] [log] [blame]
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package pulsar
/*
#include "c_go_pulsar.h"
*/
import "C"
import (
"context"
"errors"
"runtime"
"time"
"unsafe"
)
// createProducerCtx is the state handed to the C layer (via savePointer) by
// createProducerAsync and restored in pulsarCreateProducerCallbackProxy.
// NOTE: createProducerAsync builds this struct with a positional literal, so
// the field order must not be changed without updating that call site.
type createProducerCtx struct {
// client is stored on the producer created by the callback proxy.
client *client
// schema is stored on the producer created by the callback proxy.
schema Schema
// callback receives either the new producer or the creation error.
callback func(producer Producer, err error)
// conf is the C producer configuration; the callback proxy frees it.
conf *C.pulsar_producer_configuration_t
}
// pulsarCreateProducerCallbackProxy receives the result of an asynchronous
// producer creation. It restores the createProducerCtx saved by
// createProducerAsync, frees the C producer configuration carried in it, and
// then reports either an error or a freshly wrapped producer to the user
// callback. A finalizer is attached to the wrapper so the C producer is freed
// when the Go value is garbage collected.
//
//export pulsarCreateProducerCallbackProxy
func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_producer_t, ctx unsafe.Pointer) {
	pctx := restorePointer(ctx).(createProducerCtx)

	// The configuration is no longer needed once the C call has completed,
	// whatever the outcome.
	C.pulsar_producer_configuration_free(pctx.conf)

	if res != C.pulsar_result_Ok {
		pctx.callback(nil, newError(res, "Failed to create Producer"))
		return
	}

	created := &producer{
		client: pctx.client,
		schema: pctx.schema,
		ptr:    ptr,
	}
	runtime.SetFinalizer(created, producerFinalizer)
	pctx.callback(created, nil)
}
// createProducerAsync translates options (and the optional schema) into a C
// producer configuration and asks the C client to create a producer on
// options.Topic. The result is reported through callback.
//
// An empty topic is rejected immediately: callback is invoked on a new
// goroutine with an InvalidConfiguration error and no C resources are
// allocated.
//
// The C configuration created here is stored in the createProducerCtx passed
// to the C call; pulsarCreateProducerCallbackProxy frees it when it restores
// that context. Temporary C strings and the schema property map are released
// when this function returns.
func createProducerAsync(client *client, schema Schema, options ProducerOptions, callback func(producer Producer, err error)) {
	if options.Topic == "" {
		go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required when creating producer"))
		return
	}

	conf := C.pulsar_producer_configuration_create()

	if options.Name != "" {
		cName := C.CString(options.Name)
		defer C.free(unsafe.Pointer(cName))
		C.pulsar_producer_configuration_set_producer_name(conf, cName)
	}

	// If SendTimeout is 0, we'll leave the default configured value on C library
	if options.SendTimeout > 0 {
		timeoutMillis := options.SendTimeout.Nanoseconds() / int64(time.Millisecond)
		if timeoutMillis == 0 {
			// A positive sub-millisecond timeout would truncate to 0, which
			// the C API interprets as "no timeout". Round up to the smallest
			// representable finite timeout instead.
			timeoutMillis = 1
		}
		C.pulsar_producer_configuration_set_send_timeout(conf, C.int(timeoutMillis))
	} else if options.SendTimeout < 0 {
		// Set infinite publish timeout, which is specified as 0 in C API
		C.pulsar_producer_configuration_set_send_timeout(conf, C.int(0))
	}

	if options.MaxPendingMessages != 0 {
		C.pulsar_producer_configuration_set_max_pending_messages(conf, C.int(options.MaxPendingMessages))
	}

	if options.MaxPendingMessagesAcrossPartitions != 0 {
		C.pulsar_producer_configuration_set_max_pending_messages_across_partitions(conf, C.int(options.MaxPendingMessagesAcrossPartitions))
	}

	if options.BlockIfQueueFull {
		C.pulsar_producer_configuration_set_block_if_queue_full(conf, cBool(options.BlockIfQueueFull))
	}

	// Unrecognised routing modes leave the C library default untouched.
	switch options.MessageRoutingMode {
	case RoundRobinDistribution:
		C.pulsar_producer_configuration_set_partitions_routing_mode(conf, C.pulsar_RoundRobinDistribution)
	case UseSinglePartition:
		C.pulsar_producer_configuration_set_partitions_routing_mode(conf, C.pulsar_UseSinglePartition)
	case CustomPartition:
		C.pulsar_producer_configuration_set_partitions_routing_mode(conf, C.pulsar_CustomPartition)
	}

	// Unrecognised hashing schemes leave the C library default untouched.
	switch options.HashingScheme {
	case JavaStringHash:
		C.pulsar_producer_configuration_set_hashing_scheme(conf, C.pulsar_JavaStringHash)
	case Murmur3_32Hash:
		C.pulsar_producer_configuration_set_hashing_scheme(conf, C.pulsar_Murmur3_32Hash)
	case BoostHash:
		C.pulsar_producer_configuration_set_hashing_scheme(conf, C.pulsar_BoostHash)
	}

	if options.CompressionType != NoCompression {
		C.pulsar_producer_configuration_set_compression_type(conf, C.pulsar_compression_type(options.CompressionType))
	}

	if schema != nil {
		if info := schema.GetSchemaInfo(); info != nil {
			cType := C.pulsar_schema_type(info.Type)
			name, definition := info.Name, info.Schema
			if info.Type == NONE {
				// A schema that declares no type is registered as a raw
				// BYTES schema with an empty definition.
				cType = C.pulsar_schema_type(BYTES)
				name, definition = "BYTES", ""
			}

			cName := C.CString(name)
			cSchema := C.CString(definition)
			properties := C.pulsar_string_map_create()
			defer C.free(unsafe.Pointer(cName))
			defer C.free(unsafe.Pointer(cSchema))
			defer C.pulsar_string_map_free(properties)

			for key, value := range info.Properties {
				cKey := C.CString(key)
				cValue := C.CString(value)
				C.pulsar_string_map_put(properties, cKey, cValue)
				C.free(unsafe.Pointer(cKey))
				C.free(unsafe.Pointer(cValue))
			}

			C.pulsar_producer_configuration_set_schema_info(conf, cType, cName, cSchema, properties)
		}
	}

	if options.MessageRouter != nil {
		// The router is saved by pointer and looked up (without deletion) in
		// pulsarRouterCallbackProxy on every routing decision.
		C._pulsar_producer_configuration_set_message_router(conf, savePointer(&options.MessageRouter))
	}

	C.pulsar_producer_configuration_set_batching_enabled(conf, cBool(options.Batching))

	if options.BatchingMaxPublishDelay != 0 {
		delayMillis := options.BatchingMaxPublishDelay.Nanoseconds() / int64(time.Millisecond)
		C.pulsar_producer_configuration_set_batching_max_publish_delay_ms(conf, C.ulong(delayMillis))
	}

	if options.BatchingMaxMessages != 0 {
		C.pulsar_producer_configuration_set_batching_max_messages(conf, C.uint(options.BatchingMaxMessages))
	}

	// Ranging over a nil map is a no-op, so no explicit nil check is needed.
	for key, value := range options.Properties {
		cKey := C.CString(key)
		cValue := C.CString(value)
		C.pulsar_producer_configuration_set_property(conf, cKey, cValue)
		C.free(unsafe.Pointer(cKey))
		C.free(unsafe.Pointer(cValue))
	}

	topicName := C.CString(options.Topic)
	defer C.free(unsafe.Pointer(topicName))

	C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
		savePointer(createProducerCtx{client, schema, callback, conf}))
}
// topicMetadata is the value passed to a user message router as its
// TopicMetadata argument (see pulsarRouterCallbackProxy, which builds it with
// a positional literal).
type topicMetadata struct {
// numPartitions is the partition count read from the C topic metadata.
numPartitions int
}
// NumPartitions returns the number of partitions recorded in tm.
func (tm *topicMetadata) NumPartitions() int {
return tm.numPartitions
}
// pulsarRouterCallbackProxy forwards a routing decision to the Go message
// router registered in createProducerAsync. The router is looked up without
// being removed from the pointer registry, so the same ctx can be used for
// every message. It returns the partition index chosen by the router.
//
//export pulsarRouterCallbackProxy
func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic_metadata_t, ctx unsafe.Pointer) C.int {
	routerFn := restorePointerNoDelete(ctx).(*func(msg Message, metadata TopicMetadata) int)

	partitions := int(C.pulsar_topic_metadata_get_num_partitions(metadata))
	chosen := (*routerFn)(
		&message{ptr: msg},
		&topicMetadata{numPartitions: partitions},
	)
	return C.int(chosen)
}
// producer wraps a C producer handle. Instances are created in
// pulsarCreateProducerCallbackProxy, which also registers producerFinalizer
// on them.
type producer struct {
// client is the client the producer was created from.
client *client
// ptr is the underlying C producer handle.
ptr *C.pulsar_producer_t
// schema, when non-nil, is used to encode message values before sending.
schema Schema
}
// producerFinalizer frees the C producer handle held by p. It is registered
// with runtime.SetFinalizer when the producer wrapper is created.
func producerFinalizer(p *producer) {
C.pulsar_producer_free(p.ptr)
}
// Topic returns the topic reported by the underlying C producer, copied into
// a Go string.
func (p *producer) Topic() string {
	cTopic := C.pulsar_producer_get_topic(p.ptr)
	return C.GoString(cTopic)
}
// Name returns the producer name reported by the underlying C producer,
// copied into a Go string.
func (p *producer) Name() string {
	cName := C.pulsar_producer_get_producer_name(p.ptr)
	return C.GoString(cName)
}
// Schema returns the schema the producer was created with; it may be nil.
func (p *producer) Schema() Schema {
return p.schema
}
// LastSequenceID returns the last sequence id reported by the underlying C
// producer, converted to int64.
func (p *producer) LastSequenceID() int64 {
	seq := C.pulsar_producer_get_last_sequence_id(p.ptr)
	return int64(seq)
}
// Send publishes msg and blocks until either the send callback fires or ctx
// is done, whichever happens first. It returns the send error (nil on
// success) or ctx.Err() if the context finished first.
//
// The result channel has room for one value, so the callback never blocks
// even when the caller has already returned because of ctx.
func (p *producer) Send(ctx context.Context, msg ProducerMessage) error {
	result := make(chan error, 1)

	p.SendAsync(ctx, msg, func(_ ProducerMessage, err error) {
		result <- err
		close(result)
	})

	select {
	case sendErr := <-result:
		return sendErr
	case <-ctx.Done():
		return ctx.Err()
	}
}
// msgID bundles the two results of a SendAndGetMsgIDAsync callback so they
// can be delivered over a single channel in SendAndGetMsgID.
type msgID struct {
// err is the error passed to the send callback (nil on success).
err error
// id is the message id passed to the send callback.
id MessageID
}
// SendAndGetMsgID publishes msg and blocks until either the send callback
// fires or ctx is done, whichever happens first. It returns the message id
// and error handed to the callback, or (nil, ctx.Err()) if the context
// finished first.
//
// Exactly one value is ever written to the result channel, so a buffer of
// one is enough to guarantee the callback never blocks after the caller has
// returned because of ctx.
func (p *producer) SendAndGetMsgID(ctx context.Context, msg ProducerMessage) (MessageID, error) {
	c := make(chan msgID, 1)

	p.SendAndGetMsgIDAsync(ctx, msg, func(id MessageID, err error) {
		c <- msgID{err: err, id: id}
		close(c)
	})

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case cm := <-c:
		return cm.id, cm.err
	}
}
// sendCallback is the state saved by SendAsync (via savePointer) and restored
// in pulsarProducerSendCallbackProxy when the C send completes.
type sendCallback struct {
// message is the message that was sent; it is passed back to callback.
message ProducerMessage
// callback is the user callback to invoke with the send result.
callback func(ProducerMessage, error)
}
// sendCallbackWithMsgID is the state saved by SendAndGetMsgIDAsync (via
// savePointer) and restored in pulsarProducerSendCallbackProxyWithMsgID when
// the C send completes.
type sendCallbackWithMsgID struct {
// message is the message that was sent.
message ProducerMessage
// callback is the user callback to invoke with the message id and result.
callback func(MessageID, error)
}
// pulsarProducerSendCallbackProxy receives the result of an asynchronous
// send started by SendAsync. It restores the saved sendCallback and invokes
// the user callback with the original message and either nil or an error
// built from res. On success the C message id is freed after the callback
// returns, since this variant does not expose it to the caller.
//
// NOTE(review): messageId is not freed on the failure path — confirm the C
// layer does not hand over an owned id in that case.
//
//export pulsarProducerSendCallbackProxy
func pulsarProducerSendCallbackProxy(res C.pulsar_result, messageId *C.pulsar_message_id_t, ctx unsafe.Pointer) {
	cb := restorePointer(ctx).(sendCallback)

	if res == C.pulsar_result_Ok {
		cb.callback(cb.message, nil)
		C.pulsar_message_id_free(messageId)
		return
	}

	cb.callback(cb.message, newError(res, "Failed to send message"))
}
// pulsarProducerSendCallbackProxyWithMsgID receives the result of an
// asynchronous send started by SendAndGetMsgIDAsync. It restores the saved
// sendCallbackWithMsgID and invokes the user callback with the wrapped
// message id and either nil or an error built from res.
//
// NOTE(review): the message id is wrapped with getMessageId on the failure
// path as well — confirm that helper tolerates whatever pointer the C layer
// supplies when the send fails.
//
//export pulsarProducerSendCallbackProxyWithMsgID
func pulsarProducerSendCallbackProxyWithMsgID(res C.pulsar_result, messageId *C.pulsar_message_id_t, ctx unsafe.Pointer) {
	cb := restorePointer(ctx).(sendCallbackWithMsgID)
	id := getMessageId(messageId)

	if res != C.pulsar_result_Ok {
		cb.callback(id, newError(res, "Failed to send message"))
		return
	}

	cb.callback(id, nil)
}
// SendAsync publishes msg without blocking and reports the outcome through
// callback.
//
// When the producer has a schema, msg.Value must be non-nil; it is encoded
// with the schema and the result replaces msg.Payload. When the producer has
// no schema, msg.Value must be nil. Violations of either rule, and encoding
// failures, are reported by invoking callback synchronously before returning;
// nothing is handed to the C client in that case.
//
// ctx is accepted for interface compatibility but is not consulted here.
func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, callback func(ProducerMessage, error)) {
	if p.schema != nil {
		if msg.Value == nil {
			callback(msg, errors.New("message value is nil, please check"))
			return
		}
		payLoad, err := p.schema.Encode(msg.Value)
		if err != nil {
			// Keep the underlying cause so the caller can tell why encoding
			// failed instead of receiving only a generic message.
			callback(msg, errors.New("serialize message value error, please check: "+err.Error()))
			return
		}
		msg.Payload = payLoad
	} else if msg.Value != nil {
		callback(msg, errors.New("message value is set but no schema is provided, please check"))
		return
	}

	cMsg := buildMessage(msg)
	// The C message is freed as soon as the async call has been issued.
	defer C.pulsar_message_free(cMsg)

	C._pulsar_producer_send_async(p.ptr, cMsg, savePointer(sendCallback{message: msg, callback: callback}))
}
// SendAndGetMsgIDAsync publishes msg without blocking and reports the
// resulting message id (or error) through callback.
//
// When the producer has a schema, msg.Value must be non-nil; it is encoded
// with the schema and the result replaces msg.Payload. When the producer has
// no schema, msg.Value must be nil. Violations of either rule, and encoding
// failures, are reported by invoking callback synchronously with a nil
// message id before returning; nothing is handed to the C client in that
// case.
//
// ctx is accepted for interface compatibility but is not consulted here.
func (p *producer) SendAndGetMsgIDAsync(ctx context.Context, msg ProducerMessage, callback func(MessageID, error)) {
	if p.schema != nil {
		if msg.Value == nil {
			callback(nil, errors.New("message value is nil, please check"))
			return
		}
		payLoad, err := p.schema.Encode(msg.Value)
		if err != nil {
			// Keep the underlying cause so the caller can tell why encoding
			// failed instead of receiving only a generic message.
			callback(nil, errors.New("serialize message value error, please check: "+err.Error()))
			return
		}
		msg.Payload = payLoad
	} else if msg.Value != nil {
		callback(nil, errors.New("message value is set but no schema is provided, please check"))
		return
	}

	cMsg := buildMessage(msg)
	// The C message is freed as soon as the async call has been issued.
	defer C.pulsar_message_free(cMsg)

	C._pulsar_producer_send_async_msg_id(p.ptr, cMsg, savePointer(sendCallbackWithMsgID{message: msg, callback: callback}))
}
// Close closes the producer and blocks until the close callback has been
// invoked, returning the error it reports (nil on success).
func (p *producer) Close() error {
	done := make(chan error, 1)
	p.CloseAsync(func(err error) {
		done <- err
		close(done)
	})
	return <-done
}
// CloseAsync asks the C producer to close without blocking. callback is saved
// in the pointer registry and passed to the C call as its context;
// pulsarProducerCloseCallbackProxy restores and invokes it.
func (p *producer) CloseAsync(callback func(error)) {
	handle := savePointer(callback)
	C._pulsar_producer_close_async(p.ptr, handle)
}
// pulsarProducerCloseCallbackProxy receives the result of an asynchronous
// close. It restores the callback saved by CloseAsync and invokes it with nil
// on success or with an error built from res otherwise.
//
//export pulsarProducerCloseCallbackProxy
func pulsarProducerCloseCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
	done := restorePointer(ctx).(func(error))

	if res == C.pulsar_result_Ok {
		done(nil)
		return
	}

	done(newError(res, "Failed to close Producer"))
}
// Flush flushes the producer and blocks until the flush callback has been
// invoked, returning the error it reports (nil on success).
func (p *producer) Flush() error {
	done := make(chan error, 1)
	onFlushed := func(err error) {
		done <- err
		close(done)
	}
	p.FlushAsync(onFlushed)
	return <-done
}
// FlushAsync asks the C producer to flush without blocking. callback is saved
// in the pointer registry and passed to the C call as its context;
// pulsarProducerFlushCallbackProxy restores and invokes it.
func (p *producer) FlushAsync(callback func(error)) {
	handle := savePointer(callback)
	C._pulsar_producer_flush_async(p.ptr, handle)
}
// pulsarProducerFlushCallbackProxy receives the result of an asynchronous
// flush. It restores the callback saved by FlushAsync and invokes it with nil
// on success or with an error built from res otherwise.
//
//export pulsarProducerFlushCallbackProxy
func pulsarProducerFlushCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
	done := restorePointer(ctx).(func(error))

	if res == C.pulsar_result_Ok {
		done(nil)
		return
	}

	done(newError(res, "Failed to flush Producer"))
}