| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <pulsar/c/producer.h> |
| |
| #include "c_structs.h" |
| |
| const char *pulsar_producer_get_topic(pulsar_producer_t *producer) { |
| return producer->producer.getTopic().c_str(); |
| } |
| |
| const char *pulsar_producer_get_producer_name(pulsar_producer_t *producer) { |
| return producer->producer.getProducerName().c_str(); |
| } |
| |
| void pulsar_producer_free(pulsar_producer_t *producer) { delete producer; } |
| |
| pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t *msg) { |
| msg->message = msg->builder.build(); |
| return (pulsar_result)producer->producer.send(msg->message); |
| } |
| |
| static void handle_producer_send(pulsar::Result result, pulsar::MessageId messageId, |
| pulsar_send_callback callback, void *ctx) { |
| if (result == pulsar::ResultOk) { |
| pulsar_message_id_t *c_message_id = new pulsar_message_id_t; |
| c_message_id->messageId = messageId; |
| callback(pulsar_result_Ok, c_message_id, ctx); |
| } else { |
| callback((pulsar_result)result, NULL, ctx); |
| } |
| } |
| |
| void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg, |
| pulsar_send_callback callback, void *ctx) { |
| msg->message = msg->builder.build(); |
| producer->producer.sendAsync(msg->message, std::bind(&handle_producer_send, std::placeholders::_1, |
| std::placeholders::_2, callback, ctx)); |
| } |
| |
| int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) { |
| return producer->producer.getLastSequenceId(); |
| } |
| |
| pulsar_result pulsar_producer_close(pulsar_producer_t *producer) { |
| return (pulsar_result)producer->producer.close(); |
| } |
| |
| void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) { |
| producer->producer.closeAsync(std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); |
| } |
| |
| pulsar_result pulsar_producer_flush(pulsar_producer_t *producer) { |
| return (pulsar_result)producer->producer.flush(); |
| } |
| |
| void pulsar_producer_flush_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) { |
| producer->producer.flushAsync(std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); |
| } |
| |
| int pulsar_producer_is_connected(pulsar_producer_t *producer) { return producer->producer.isConnected(); } |