Scalable Topics: new typed C++ SDK (`pulsar::st`) — API definition (#598)

* Scalable Topics: typed C++ SDK public API (pulsar::st)

Header-only public API for the scalable-topics SDK under a new pulsar::st
namespace (PIP-460/468/483): client, producers, the three consumer modes,
transactions, schemas (reflect-cpp JSON/Avro and protobuf), and the
Expected<T>/Future<T> result types, plus examples under examples/st.

API definition only -- no lib/st implementation or C API yet. The new API
requires C++20; the rest of the client stays C++17.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* Fix CI: clang-format the st sources; make reflect-cpp optional

- Apply clang-format-11 to the new pulsar::st headers and examples (the
  Formatting Check uses clang-format 11; local 18 formats differently).
- examples/CMakeLists.txt: build the four dependency-free st samples
  unconditionally and add the reflect-cpp JSON sample only when reflectcpp
  is found (find_package CONFIG QUIET instead of REQUIRED), so configure no
  longer fails where reflect-cpp is absent (e.g. the CodeQL/Analyze job).
- vcpkg.json: drop the reflectcpp dependency for now; it returns with the
  lib/st implementation that actually exercises the JSON/Avro schemas.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* Fix CI: give st config-struct fields default member initializers

GCC's -Wmissing-field-initializers (-Wextra, and the build is -Werror) fires
on a partial designated-initializer such as
.deadLetterPolicy({.maxRedeliverCount = 5}) for every omitted member that
lacks a default member initializer. clang does not warn, so this was missed
locally. Give every optional field in the user-facing policy/ack/DLQ structs
an '= std::nullopt' NSDMI so designated-init of any subset is warning-clean.
Verified with gcc:13 -Wextra -Werror against all four st examples.
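
The pattern can be sketched as follows. This is a minimal illustration, not the SDK's struct: only maxRedeliverCount is named in this message, while DeadLetterPolicySketch, its other fields, and acceptPolicy are hypothetical. It requires C++20 for designated initializers.

```cpp
#include <cassert>
#include <chrono>
#include <optional>
#include <string>

// Hypothetical policy struct. Only maxRedeliverCount appears in the commit
// message; the other fields are invented for illustration.
struct DeadLetterPolicySketch {
    std::optional<int> maxRedeliverCount = std::nullopt;
    std::optional<std::string> deadLetterTopic = std::nullopt;
    std::optional<std::chrono::milliseconds> retryDelay = std::nullopt;
};

// Stand-in for a builder setter that takes the struct by value.
inline DeadLetterPolicySketch acceptPolicy(DeadLetterPolicySketch policy) { return policy; }

inline void exampleUsage() {
    // Only one member is named; the omitted ones fall back to their NSDMI,
    // which is what keeps -Wmissing-field-initializers quiet.
    DeadLetterPolicySketch policy = acceptPolicy({.maxRedeliverCount = 5});
    assert(policy.maxRedeliverCount == 5);
    assert(!policy.deadLetterTopic.has_value());
    assert(!policy.retryDelay.has_value());
}
```

The struct stays an aggregate, so any subset of fields can be named at the call site.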

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: decode returns Expected<T> and takes std::span<const char>

Addresses PR review feedback on the Schema decode signature. The SerDe seam
now takes a std::span<const char> instead of (const char*, size_t), and
returns Expected<T> instead of T -- so malformed bytes or an unset schema are
error values rather than a non-opt-in throw, consistent with the rest of the
API. Message<T>::value() returns Expected<T> accordingly.

- built-in numeric codecs report a short payload as ResultInvalidMessage;
- the reflect-cpp JSON/Avro SerDes map a parse failure to an Error instead of
  letting rfl's .value() throw;
- the protobuf SerDe now checks ParseFromArray's result;
- a custom SerDe may still return a plain T (infallible) -- it converts
  implicitly to Expected<T>.

encode keeps throwing on an unset schema (a configuration error). Examples
updated to check the decoded value. Verified with clang + gcc:13 (-Wextra
-Werror) and clang-format-11.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* Revert "st: decode returns Expected<T> and takes std::span<const char>"

This reverts commit 46d3f6d1fcbd63c031e8ce7ca1eb8ed48719c44c.

* st: byte-buffer SerDe seam + zero-copy BytesView

Per PR review on the Schema encode/decode signatures.

SerDe seam (Schema<T> + JSON/Avro/protobuf factories):
- encode writes into a caller-provided, reusable std::vector<std::byte>& (no
  per-message allocation) and returns Expected<void>;
- decode takes std::span<const std::byte> and returns Expected<T>, so malformed
  input / an unset schema are error values rather than throws;
- Bytes is now std::vector<std::byte>.

Client-facing API unchanged: Message<T>::value() still returns T (decode
failures are handled inside the SDK), Producer::send(const T&) and the examples
are as before; a rare encode error is stashed in the builder and surfaces from
send()/sendAsync().

Zero-copy bytes: new BytesView = std::span<const std::byte>. Schema<BytesView>
is the zero-copy counterpart of Schema<Bytes> -- Producer<BytesView> publishes
the caller's bytes without copying (the caller keeps them valid until the send
completes) and Message<BytesView>::value() returns a view into the message
buffer. OutgoingMessage carries an optional non-owning view.

Verified with clang + gcc:13 (-Wextra -Werror), clang-format-11, and a runtime
check that decode returns a view at the same address.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: return std::string_view from string accessors

Per PR review: the string accessors return views instead of owning
references/copies, so the lib/st impl is not forced to store a std::string
per field -- it can return a view into whatever it already holds.

- consumer/producer topic() / subscription() / consumerName() / name() and
  Message::topic() now return std::string_view (Message::topic() previously
  copied); the detail::*Core declarations they forward to return string_view too.
- Message::key() / producerName() / replicatedFrom() now return
  std::optional<std::string_view>.
- Error::message() stays const std::string& (an Error is usually a temporary, so
  auto-capturing a const ref copies safely whereas a view would dangle).

Returned views are valid while the source object (message / consumer / producer)
is alive. All within pulsar::st; the old API is untouched. Verified with clang +
gcc:13 (-Wextra -Werror), static_asserts on the return types, and clang-format-11.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: serialize MessageId/Checkpoint as bytes, not std::string

toByteArray() returns std::vector<std::byte> and fromByteArray() takes
std::span<const std::byte>, instead of std::string -- byte-correct and
consistent with Bytes/BytesView. The round-trip stays implicit: a
std::vector<std::byte> from toByteArray() converts to the span parameter.
Example updated. All within pulsar::st.

Verified with clang + gcc:13 (-Wextra -Werror) and clang-format-11.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: make property() a by-value sink (review item K1)

property(const std::string& k, const std::string& v) becomes
property(std::string k, std::string v) with insert_or_assign(std::move(k),
std::move(v)), across MessageBuilder, ProducerBuilder, and the three consumer
builders -- consistent with the other by-value-sink setters (topic /
subscriptionName / etc.).
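
A minimal sketch of the by-value sink shape, using a hypothetical BuilderSketch with a std::map store (the real builders' storage type is not stated here):

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical builder; only the property(std::string, std::string)
// signature and the insert_or_assign call mirror the commit message.
class BuilderSketch {
public:
    BuilderSketch& property(std::string k, std::string v) {
        // Arguments were copied or moved into k/v by the caller's value
        // category; from here on they are moved, never copied again.
        properties_.insert_or_assign(std::move(k), std::move(v));
        return *this;
    }
    const std::map<std::string, std::string>& properties() const { return properties_; }

private:
    std::map<std::string, std::string> properties_;
};

inline void exampleUsage() {
    BuilderSketch builder;
    std::string key = "region";
    builder.property(key, "eu").property("region", "us");  // second call overwrites
    assert(builder.properties().size() == 1);
    assert(builder.properties().at("region") == "us");
    assert(key == "region");  // an lvalue argument is copied, not stolen
}
```

An rvalue argument costs one move into the parameter and one into the map; an lvalue costs one copy and one move.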

Verified with clang + gcc:13 (-Wextra -Werror) and clang-format-11.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: fix await_suspend coroutine resume race (review B1)

await_suspend now returns bool and uses SharedState::addListenerOrReady, which
atomically registers the resume continuation or reports the result is already
available -- so the coroutine resumes via await_resume instead of being resumed
from inside await_suspend (which could run/destroy the awaiter before it
returns). Verified with a co_await runtime test on clang + gcc:13.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: receive() does not surface decode errors -- doc fix (review B2)

A message whose payload cannot be decoded is handled internally by the SDK and
never delivered, so decode is not a receive failure. Dropped it from the receive
failure lists on all three consumers and added a clarifying note.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: uppercase primitive schema names STRING/DOUBLE (review B4)

Match the existing client's canonical primitive names (lib/Schema.cc:
STRING/INT32/INT64/FLOAT/DOUBLE/BYTES); StringCodec/DoubleCodec used mixed-case
'String'/'Double'. The name is sent to the broker.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: reset encodeError_ on a successful encode (review B5)

MessageBuilder::value() now clears encodeError_ on success instead of leaving a
prior failure sticky, so a later successful value() doesn't surface a stale
error at send()/sendAsync().

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: model event time as std::optional<Timestamp> (review B3)

OutgoingMessage::eventTime and MessageCore::eventTime() are now
std::optional<Timestamp> instead of an int64 epoch-ms with a 0=unset sentinel,
so an event time of exactly the Unix epoch is no longer indistinguishable from
unset. The int64 epoch-ms is just the wire encoding (converted in lib/st);
MessageBuilder::eventTime and Message::eventTime() simplify accordingly.

Verified epoch != unset at runtime on clang + gcc:13.
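
A small sketch of why the optional removes the ambiguity. The definition of Timestamp is not shown in this message, so TimestampSketch is assumed to be a system_clock time_point; OutgoingSketch and wireEventTime are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>

// Assumption: a system_clock time_point stands in for the SDK's Timestamp.
using TimestampSketch = std::chrono::system_clock::time_point;

struct OutgoingSketch {
    std::optional<TimestampSketch> eventTime;  // nullopt means "unset"
};

// Hypothetical wire conversion: epoch milliseconds, present only when set.
inline std::optional<std::int64_t> wireEventTime(const OutgoingSketch& m) {
    if (!m.eventTime) return std::nullopt;
    auto sinceEpoch = m.eventTime->time_since_epoch();
    return std::chrono::duration_cast<std::chrono::milliseconds>(sinceEpoch).count();
}

inline void exampleUsage() {
    OutgoingSketch unset;
    OutgoingSketch atEpoch{TimestampSketch{}};  // exactly the clock's epoch
    assert(!wireEventTime(unset).has_value());
    assert(wireEventTime(atEpoch).has_value());  // distinct from "unset"
    assert(*wireEventTime(atEpoch) == 0);
}
```

With an int64 and a 0 sentinel, the two cases in exampleUsage would have been the same value.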

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: add CheckpointConsumer::consumerName() (review G2)

Parity with Stream/QueueConsumer -- the consumerName config field and builder
setter existed, but the getter did not, so the name could be set but not read.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: [[nodiscard]] on MessageId/Checkpoint serialization + sentinels (review P3)

toByteArray() / fromByteArray() / earliest() / latest() return values that must
not be silently discarded.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: document Message::properties() view lifetime (review P4)

It returns a reference into the message, like the other view-returning getters.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: model deliverAt as std::optional<Timestamp> too (review B3 follow-on)

OutgoingMessage::deliverAt is now std::optional<Timestamp> (was int64 epoch-ms
with 0=immediate), matching eventTime; deliverAfter/deliverAt set it directly,
and the now-unused toEpochMs helper is removed. Verified on clang + gcc:13.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: model publishTime as Timestamp, not int64 epoch-ms

MessageCore::publishTime() now returns Timestamp (was int64_t publishTimeMs());
Message::publishTime() forwards it directly. Consistent with the eventTime /
deliverAt Timestamp modeling; the int64 epoch-ms is just the wire encoding
(converted in lib/st).

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: producer review items P1, G3, Q1

- P1: rename Producer::name() -> producerName() (+ ProducerCore), consistent
  with Message::producerName() and the producerName builder setter.
- G3: add MessageBuilder::replicationClusters() setter for the previously
  unreachable OutgoingMessage::replicationClusters field.
- Q1: drop the 'ordering key' framing from the message-key docs -- it is a
  routing / partition key; ordering is provided by the StreamConsumer.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: add float/int8/int16 primitive codecs (review G4/Q4)

FloatCodec (FLOAT, big-endian IEEE-754), Int8Codec (INT8), Int16Codec (INT16),
wired into the default Schema<T> ctor; canonical uppercase names match the
existing client. Round-trip verified on clang + gcc:13.
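
For illustration, a big-endian IEEE-754 float round trip might look like the sketch below. encodeFloatBE and decodeFloatBE are hypothetical helpers, not the FloatCodec itself, and the sketch assumes a 32-bit IEEE-754 float (checked by static_assert).

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <optional>

static_assert(sizeof(float) == sizeof(std::uint32_t), "sketch assumes a 32-bit float");
static_assert(std::numeric_limits<float>::is_iec559, "sketch assumes IEEE-754");

// Writes the float's bit pattern most-significant byte first.
inline std::array<std::byte, 4> encodeFloatBE(float v) {
    std::uint32_t bits = 0;
    std::memcpy(&bits, &v, sizeof bits);
    return {std::byte((bits >> 24) & 0xFF), std::byte((bits >> 16) & 0xFF),
            std::byte((bits >> 8) & 0xFF), std::byte(bits & 0xFF)};
}

// Returns nullopt for a payload that is not exactly 4 bytes.
inline std::optional<float> decodeFloatBE(const std::byte* data, std::size_t size) {
    if (size != 4) return std::nullopt;
    std::uint32_t bits = 0;
    for (std::size_t i = 0; i < 4; ++i) {
        bits = (bits << 8) | std::to_integer<std::uint32_t>(data[i]);
    }
    float v = 0;
    std::memcpy(&v, &bits, sizeof v);
    return v;
}

inline void exampleUsage() {
    auto bytes = encodeFloatBE(1.0f);  // bit pattern 0x3F800000
    assert(bytes[0] == std::byte{0x3F});
    assert(bytes[1] == std::byte{0x80});
    assert(*decodeFloatBE(bytes.data(), bytes.size()) == 1.0f);
    assert(!decodeFloatBE(bytes.data(), 3).has_value());
}
```

Going through memcpy and shifts keeps the byte order independent of the host's endianness.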

bool is NOT added: the existing pulsar::SchemaType enum has no BOOLEAN value
(Java has it at 5; the C++ port skipped it), and adding it would mean touching
the old API. Deferred pending a decision on extending the old enum.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: Message::data() returns BytesView, not const char* + size()

MessageCore::data() / Message<T>::data() now return std::span<const std::byte>
(BytesView), carrying pointer and length together; the separate size() accessor
is removed (use data().size()), and Message<T>::value() simplifies accordingly.
Consistent with the Bytes/BytesView byte modeling. Verified on clang + gcc:13.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: group loose client settings into policies by scope (review Q3)

PulsarClientBuilder drops the top-level ioThreads / messageListenerThreads /
memoryLimit / listenerName setters. Grouped by scope:
- listenerName -> ConnectionPolicy
- ioThreads + messageListenerThreads -> new ThreadPolicy
- memoryLimit -> new MemoryPolicy
with threadPolicy() / memoryPolicy() builder setters. Verified on clang + gcc:13.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: MessageCore optional accessors, drop hasX() bools

MessageCore::key() / producerName() / replicatedFrom() now return
std::optional<std::string_view> directly instead of a paired hasX() bool +
string_view accessor -- the optional carries the present/absent signal.
Message<T>'s wrappers collapse to direct forwards. Verified on clang + gcc:13.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: receive cores take std::chrono::milliseconds, not int64_t timeoutMs

detail::*Core receiveAsync/receiveMultiAsync now take std::chrono::milliseconds
(matching the public receive() signatures), so the public methods forward the
typed timeout directly instead of calling .count(). <cstdint> swapped for
<chrono> in the cores (int64_t was only the timeout). Verified on clang + gcc:13.

Signed-off-by: Matteo Merli <mmerli@apache.org>
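
A brief sketch of what the typed timeout buys at the call site. coreReceiveSketch and receiveSketch are hypothetical stand-ins, not the SDK's signatures; they return the millisecond count only so the effect is observable.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <type_traits>

// Hypothetical core-level function taking the typed duration directly.
inline std::int64_t coreReceiveSketch(std::chrono::milliseconds timeout) {
    return timeout.count();
}

// Hypothetical public wrapper: forwards the timeout, no .count() needed.
inline std::int64_t receiveSketch(std::chrono::milliseconds timeout) {
    return coreReceiveSketch(timeout);
}

// A coarser unit converts implicitly; a bare integer or a finer (lossy) unit does not.
static_assert(std::is_convertible_v<std::chrono::seconds, std::chrono::milliseconds>);
static_assert(!std::is_convertible_v<int, std::chrono::milliseconds>);
static_assert(!std::is_convertible_v<std::chrono::microseconds, std::chrono::milliseconds>);

inline void exampleUsage() {
    assert(receiveSketch(std::chrono::seconds(2)) == 2000);
    assert(receiveSketch(std::chrono::milliseconds(250)) == 250);
}
```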

* st: OutgoingMessage key -> std::optional<std::string>

Replace the bool hasKey + std::string key pair on OutgoingMessage with a
single std::optional<std::string> key, mirroring the read-side
MessageCore::key() -> std::optional<std::string_view>. nullopt means no
routing key. MessageBuilder::key() now just assigns the optional.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: OutgoingMessage sequenceId -> std::optional<int64_t>

Drop the -1 sentinel on OutgoingMessage::sequenceId in favor of
std::optional<int64_t>; unset means auto-assign. Avoids a custom in-band
encoding of 'no explicit sequence id'. MessageBuilder::sequenceId() just
assigns the optional.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: Producer::lastSequenceId() -> std::optional<int64_t>

Drop the -1 sentinel on the read side too: lastSequenceId() now returns
std::nullopt when nothing has been published yet, instead of -1. Updates
detail::ProducerCore to match.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P2 - guard rfl encode() against throwing

rfl::{json,avro}::write() can throw, which would escape encode()'s
Expected<void> non-throwing contract. Wrap the body in try/catch and
report failures as unexpected(ResultInvalidMessage, ...), mirroring the
existing decode() guard. info() (schema derivation) has no error channel
and stays off the non-throwing path; document that on the factories.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P5 - warn about fire-and-forget + BytesView dangling

A zero-copy Schema<BytesView> send publishes the viewed bytes directly, so
they must outlive the send. The returned future is the only completion
signal; discarding it (fire-and-forget) leaves no safe point to free the
bytes. Document this on sendAsync().

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P6 - std::hash<MessageId> + Checkpoint operator<<

MessageId could not be used as a key in unordered_map/unordered_set. Add a
std::hash<MessageId> specialization (operator() defined in lib/st, consistent
with operator==: equal ids hash equal); befriend it so it can read the impl.

Give Checkpoint a hidden-friend operator<< mirroring MessageId's, so both
opaque position types stream the same way for logging/debugging.
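
The shape of such a specialization, sketched on a hypothetical PositionSketch type. Its two integer fields and the hash-combining formula are assumptions for illustration; the real MessageId is opaque and its hash is defined in lib/st.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <ostream>
#include <sstream>
#include <unordered_set>

class PositionSketch {
public:
    PositionSketch(std::int64_t ledger, std::int64_t entry) : ledger_(ledger), entry_(entry) {}

    friend bool operator==(const PositionSketch& a, const PositionSketch& b) {
        return a.ledger_ == b.ledger_ && a.entry_ == b.entry_;
    }
    // Hidden friend: found only through argument-dependent lookup.
    friend std::ostream& operator<<(std::ostream& os, const PositionSketch& p) {
        return os << p.ledger_ << ':' << p.entry_;
    }

private:
    friend struct std::hash<PositionSketch>;  // lets the hash read the fields
    std::int64_t ledger_;
    std::int64_t entry_;
};

namespace std {
template <>
struct hash<PositionSketch> {
    size_t operator()(const PositionSketch& p) const noexcept {
        // Uses exactly the fields operator== compares, so equal values hash equal.
        size_t h1 = hash<int64_t>{}(p.ledger_);
        size_t h2 = hash<int64_t>{}(p.entry_);
        return h1 ^ (h2 + 0x9e3779b9u + (h1 << 6) + (h1 >> 2));
    }
};
}  // namespace std

inline void exampleUsage() {
    std::unordered_set<PositionSketch> seen;
    seen.insert(PositionSketch(1, 2));
    seen.insert(PositionSketch(1, 2));  // duplicate, not inserted
    seen.insert(PositionSketch(1, 3));
    assert(seen.size() == 2);

    std::ostringstream os;
    os << PositionSketch(1, 2);
    assert(os.str() == "1:2");
}
```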

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P7 - explicitly default copy/move on handles

PulsarClient, Producer, the three consumers, and Transaction are
shared-state handles that must stay cheaply copyable and movable. They
relied on implicitly-generated special members, which a later user-declared
destructor would silently suppress (turning the move into a copy or deleting
it). Declare copy/move = default explicitly on all six to lock in handle
value semantics and make the intent visible.
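
The hazard can be sketched with two hypothetical handle classes (neither is an SDK type). In the first, a user-declared destructor suppresses the implicit move operations, so std::move quietly copies; in the second, the defaults are declared explicitly.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

// User-declared destructor only: no implicit move constructor is generated,
// so an rvalue binds to the (still implicit) copy constructor.
class HandleWithDtor {
public:
    explicit HandleWithDtor(int v) : impl_(std::make_shared<int>(v)) {}
    ~HandleWithDtor() {}
    bool valid() const { return impl_ != nullptr; }

private:
    std::shared_ptr<int> impl_;
};

// Same destructor, but copy/move are locked in explicitly.
class HandleExplicit {
public:
    explicit HandleExplicit(int v) : impl_(std::make_shared<int>(v)) {}
    ~HandleExplicit() {}
    HandleExplicit(const HandleExplicit&) = default;
    HandleExplicit(HandleExplicit&&) noexcept = default;
    HandleExplicit& operator=(const HandleExplicit&) = default;
    HandleExplicit& operator=(HandleExplicit&&) noexcept = default;
    bool valid() const { return impl_ != nullptr; }

private:
    std::shared_ptr<int> impl_;
};

static_assert(std::is_nothrow_move_constructible_v<HandleExplicit>);

inline void exampleUsage() {
    HandleWithDtor a(1);
    HandleWithDtor b(std::move(a));
    assert(a.valid());  // "move" was really a copy: the source still owns the state
    assert(b.valid());

    HandleExplicit c(1);
    HandleExplicit d(std::move(c));
    assert(!c.valid());  // a real move: the source shared_ptr is now empty
    assert(d.valid());
}
```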

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P8 - thenApply supports void-returning and move-only mappers

thenApply assumed a non-void, copyable mapper: it called setValue(f(...))
(ill-formed when f returns void) and moved f straight into the std::function
listener (ill-formed when f is move-only, since std::function requires a
copyable target).

Branch on the result type with if constexpr - a void mapper runs and then
completes the Future<void> via setSuccess() - and hold f in a shared_ptr so
the copyable listener can carry a move-only mapper. Verified at runtime
(normal, void, move-only, and error-propagation paths) on clang and gcc.
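
The two techniques can be sketched in isolation. This is not the SDK's thenApply: SinkSketch is a hypothetical stand-in for the downstream promise, makeListener is a hypothetical helper, and the input is fixed to int for brevity.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>

// Hypothetical downstream completion target.
template <typename R>
struct SinkSketch {
    std::optional<R> value;
    void setValue(R v) { value = std::move(v); }
};
template <>
struct SinkSketch<void> {
    bool done = false;
    void setSuccess() { done = true; }
};

// Builds a copyable listener from a mapper that may be move-only
// and may return void.
template <typename F, typename S>
std::function<void(int)> makeListener(F f, std::shared_ptr<S> sink) {
    using R = std::invoke_result_t<F&, int>;
    // std::function needs a copyable target; the shared_ptr is copyable
    // even when F itself is not.
    auto held = std::make_shared<F>(std::move(f));
    return [held, sink](int input) {
        if constexpr (std::is_void_v<R>) {
            (*held)(input);      // run the mapper for its side effect
            sink->setSuccess();  // then complete the void result
        } else {
            sink->setValue((*held)(input));
        }
    };
}

inline void exampleUsage() {
    auto intSink = std::make_shared<SinkSketch<int>>();
    auto owned = std::make_unique<int>(10);  // makes the lambda move-only
    auto listener = makeListener([p = std::move(owned)](int x) { return *p + x; }, intSink);
    auto copy = listener;  // the listener itself stays copyable
    copy(1);
    assert(intSink->value == 11);

    auto voidSink = std::make_shared<SinkSketch<void>>();
    int seen = 0;
    makeListener([&seen](int x) { seen = x; }, voidSink)(5);
    assert(seen == 5);
    assert(voidSink->done);
}
```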

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P9 - fail the future when a Promise is abandoned

A detail::Promise dropped without being completed left its SharedState
forever pending, so Future::get() (and listeners / co_await) blocked
indefinitely. Add a Guard shared by every copy of a Promise: when the last
copy is destroyed it completes the state with an error (ResultUnknownError,
"promise abandoned before completion") unless something already fulfilled
it. complete() is idempotent, so a normally-completed promise is unaffected,
and destroying one copy among several does not trip it. Verified at runtime
(single/copied/void abandonment, partial-copy safety, completed no-op) on
clang and gcc.
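
A sketch of the guard idea, with hypothetical StateSketch and PromiseSketch types; the outcome is a plain string here rather than the SDK's result type.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

struct StateSketch {
    std::mutex mutex;
    std::optional<std::string> outcome;
    // Idempotent: only the first completion wins.
    bool complete(std::string o) {
        std::lock_guard<std::mutex> lock(mutex);
        if (outcome) return false;
        outcome = std::move(o);
        return true;
    }
};

class PromiseSketch {
public:
    PromiseSketch()
        : state_(std::make_shared<StateSketch>()), guard_(std::make_shared<Guard>(state_)) {}
    void setValue(int v) { state_->complete("ok:" + std::to_string(v)); }
    std::shared_ptr<StateSketch> state() const { return state_; }

private:
    struct Guard {
        explicit Guard(std::shared_ptr<StateSketch> s) : state(std::move(s)) {}
        // Runs once, when the last promise copy goes away. A no-op if the
        // state was already completed.
        ~Guard() { state->complete("error:promise abandoned before completion"); }
        std::shared_ptr<StateSketch> state;
    };
    std::shared_ptr<StateSketch> state_;
    std::shared_ptr<Guard> guard_;  // shared by every copy of this promise
};

inline void exampleUsage() {
    std::shared_ptr<StateSketch> abandoned;
    {
        PromiseSketch p;
        abandoned = p.state();
        { PromiseSketch copy = p; }  // destroying one copy does not trip the guard
        assert(!abandoned->outcome.has_value());
    }
    assert(abandoned->outcome == "error:promise abandoned before completion");

    std::shared_ptr<StateSketch> fulfilled;
    {
        PromiseSketch p;
        fulfilled = p.state();
        p.setValue(3);
    }
    assert(fulfilled->outcome == "ok:3");  // the guard's late completion was ignored
}
```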

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P10 - note negativeAckRedeliveryDelay is inert on StreamConsumer

AckPolicy::negativeAckRedeliveryDelay only applies to a QueueConsumer. A
StreamConsumer acknowledges cumulatively and has no negative-ack path, so
the field is silently ignored there. Document that on the StreamConsumer
config field and the ackPolicy() setter.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P11 - document the invalid/rejected default consumer target

The topic-vs-namespace target is a bool + two strings, so the POD config can
represent invalid combinations the type system does not prevent - including
the default-constructed value (single-topic mode with an empty topic).
Document that such states (no target, or missing subscriptionName) are
rejected by create()/createAsync() with an Error, and that fields not
selected by useNamespace are ignored. (A variant target could make these
unrepresentable, but that diverges from the POD-config + designated-init
pattern used across the API.)

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P12 - rename ClientCore::createCheckpointAsync -> createCheckpointConsumerAsync

Match the create<Thing>Async naming of its siblings (createProducerAsync)
and the CheckpointConsumer type it returns. Internal detail rename; no public
API change.

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: P13 - rvalue overloads for Expected monadic ops and value_or

value_or, and_then, transform, and or_else were const&-only: they copied the
contained value into the continuation, and value_or/and_then/transform would
not even compile for a move-only T. Add &&-qualified overloads that move the
contained value (and forward the error by move), so a move-only or
expensive-to-copy T flows through the chain without a copy. value() and
operator* already had ref-qualified overloads. Verified at runtime with a
move-only payload (unique_ptr) on clang and gcc, plus an lvalue regression
pass.
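
A reduced sketch of the ref-qualified overloads, on a hypothetical ExpectedSketch built over std::variant. Only transform and a value_or equivalent (valueOr) are shown; ErrorSketch is a stand-in error type.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <variant>

struct ErrorSketch {
    std::string message;
};

template <typename T>
class ExpectedSketch {
public:
    ExpectedSketch(T v) : data_(std::in_place_index<0>, std::move(v)) {}
    ExpectedSketch(ErrorSketch e) : data_(std::in_place_index<1>, std::move(e)) {}

    bool hasValue() const { return data_.index() == 0; }

    // Lvalue path: the mapper sees a const reference, the error is copied.
    template <typename F>
    auto transform(F&& f) const& -> ExpectedSketch<std::invoke_result_t<F, const T&>> {
        if (hasValue()) return std::forward<F>(f)(std::get<0>(data_));
        return std::get<1>(data_);
    }
    // Rvalue path: the contained value (or the error) is moved out.
    template <typename F>
    auto transform(F&& f) && -> ExpectedSketch<std::invoke_result_t<F, T&&>> {
        if (hasValue()) return std::forward<F>(f)(std::get<0>(std::move(data_)));
        return std::get<1>(std::move(data_));
    }
    T valueOr(T fallback) && {
        return hasValue() ? std::get<0>(std::move(data_)) : std::move(fallback);
    }

private:
    std::variant<T, ErrorSketch> data_;
};

inline void exampleUsage() {
    // Move-only payload flows through the chain without a copy.
    ExpectedSketch<std::unique_ptr<int>> e(std::make_unique<int>(20));
    auto doubled = std::move(e).transform([](std::unique_ptr<int> p) { return *p * 2; });
    assert(doubled.hasValue());
    assert(std::move(doubled).valueOr(0) == 40);

    // Lvalue regression: the const& overload is still selected.
    ExpectedSketch<int> v(3);
    auto w = v.transform([](const int& x) { return x + 1; });
    assert(std::move(w).valueOr(0) == 4);
}
```

Because the const& overload's return type is computed with invoke_result_t, it drops out of overload resolution when the mapper cannot accept a const reference, leaving the && overload.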

Signed-off-by: Matteo Merli <mmerli@apache.org>

* st: review nits N1-N7

N1 Expected operator*/operator-> are not UB on an error: operator* is
   noexcept + std::get so it terminates; operator-> returns nullptr. Correct
   the docs to say so.
N2 Drop redundant unit prose ("in milliseconds"/"in seconds") from
   std::chrono fields/params in Policies, Consumer (AckPolicy) and the
   sendTimeout setter; the type already states the unit. (ProducerConfig's
   int64 sendTimeoutMs keeps its "milliseconds" note - it is not a chrono type.)
N3 decodeBigEndian: replace the dead `i < data.size()` guard (all codecs
   length-check first) with an assert of that precondition.
N4 ProtobufNativeSchema: guard the size_t->int narrowing in encode/decode,
   rejecting messages larger than INT_MAX instead of passing a wrapped size.
N5 OutgoingMessage: one-line note for the usesView<->payloadView invariant.
N6 Normalize config-struct field docs to the dominant /** */-before style
   (OutgoingMessage, CheckpointConsumerConfig, Stream/QueueConsumerConfig);
   enum-value ///< trailing docs are left as-is.
N7 Wrap the SerDeFor concept in clang-format off/on so clang-format-11 stops
   mangling the `{ expr } -> Concept;` compound requirements.
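
The N4 guard can be sketched as below. checkedSize is a hypothetical helper, not the schema's code; it shows rejecting an oversized length before handing it to an API that takes int, such as protobuf's ParseFromArray.

```cpp
#include <cassert>
#include <climits>
#include <cstddef>
#include <optional>

// Returns the size as int, or nullopt when it does not fit, instead of
// letting a narrowing cast wrap to a negative or truncated value.
inline std::optional<int> checkedSize(std::size_t size) {
    if (size > static_cast<std::size_t>(INT_MAX)) return std::nullopt;
    return static_cast<int>(size);
}

inline void exampleUsage() {
    assert(checkedSize(10) == 10);
    assert(checkedSize(static_cast<std::size_t>(INT_MAX)).has_value());
    assert(!checkedSize(static_cast<std::size_t>(INT_MAX) + 1).has_value());
}
```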

Verified: clang-format-11 clean; examples compile (clang); N3 runtime test
and N4 (protobuf stub) pass on clang and gcc.

Signed-off-by: Matteo Merli <mmerli@apache.org>

---------

Signed-off-by: Matteo Merli <mmerli@apache.org>
33 files changed
tree: 48d39bed15b53835e4daa719222a267010881176
README.md

Pulsar C++ client library

Pulsar C++ clients support a variety of Pulsar features to enable building applications connecting to your Pulsar cluster.

For the supported Pulsar features, see Client Feature Matrix.

For how to use APIs to publish and consume messages, see examples.

Custom logger lifetime

The C++ client supports one custom logger factory per process. A logger configured through ClientConfiguration::setLogger or the C API logger functions is shared by all clients in the same process and is not scoped to an individual client instance. Set the custom logger before creating clients, and do not expect different clients to have independent log handlers.

If an application or language binding exposes logger callbacks, the callback and its context must remain valid until all Pulsar clients are closed and no background thread can emit client logs. Avoid tying the callback lifetime to a single client when multiple clients can exist in the process.

Import the library into your project

CMake with vcpkg integration

Navigate to vcpkg-example for how to import the pulsar-client-cpp into your project via vcpkg.

Download pre-built binaries

For non-vcpkg projects, you can download pre-built binaries from the official release page.

Generate the API documents

The Pulsar C++ client uses doxygen to build its API documents. After installing doxygen, run doxygen to generate them; the main page is doxygen/html/index.html.

Build with vcpkg

The build is integrated with vcpkg; see vcpkg#README for the requirements. See LEGACY_BUILD if you want to manage dependencies yourself or cannot install vcpkg in your environment.

How to build from source

The simplest way is to clone this project with the vcpkg submodule.

git clone https://github.com/apache/pulsar-client-cpp.git
cd pulsar-client-cpp
git submodule update --init --recursive
cmake -B build -DINTEGRATE_VCPKG=ON
cmake --build build -j8
  • Before 4.0.0, C++11 is required.
  • Since 4.0.0, C++17 is required.

The first cmake command downloads vcpkg and then installs all dependencies according to the version description in vcpkg.json. The second cmake command builds the Pulsar C++ libraries under ./build/lib/, where ./build is the CMake build directory.

You can also add the CMAKE_TOOLCHAIN_FILE option if your system already has vcpkg installed.

git clone https://github.com/apache/pulsar-client-cpp.git
cd pulsar-client-cpp
# For example, you can install vcpkg in /tmp/vcpkg
cd /tmp && git clone https://github.com/microsoft/vcpkg.git && cd -
cmake -B build -DINTEGRATE_VCPKG=ON -DCMAKE_TOOLCHAIN_FILE="/tmp/vcpkg/scripts/buildsystems/vcpkg.cmake"
cmake --build build -j8

After the build, the hierarchy of the build directory will be:

build/
  include/   -- extra C++ headers
  lib/       -- libraries
  tests/     -- test executables
  examples/  -- example executables
  generated/
    lib/     -- protobuf source files for PulsarApi.proto
    tests/   -- protobuf source files for *.proto used in tests

How to install

To install the C++ headers and libraries into a specific path, e.g. /tmp/pulsar, run the following commands:

cmake -B build -DINTEGRATE_VCPKG=ON -DCMAKE_INSTALL_PREFIX=/tmp/pulsar
cmake --build build -j8 --target install

For example, on macOS you will see:

/tmp/pulsar/
  include/pulsar     -- C/C++ headers
  lib/
    libpulsar.a      -- Static library
    libpulsar.dylib  -- Dynamic library

Tests

Tests are built by default. You should execute run-unit-tests.sh to run tests locally.

If you don't want to build the tests, disable the BUILD_TESTS option:

cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF
cmake --build build -j8

Build perf tools

If you want to build the perf tools, enable the BUILD_PERF_TOOLS option:

cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_PERF_TOOLS=ON
cmake --build build -j8

Then the perf tools will be built under ./build/perf/.

Platforms

Pulsar C++ Client Library has been tested on:

  • Linux
  • Mac OS X
  • Windows x64

Wireshark Dissector

See the wireshark directory for details.

Requirements for Contributors

You must install LLVM for clang-tidy and clang-format. The Pulsar C++ client uses clang-format 11 to format files. Running make format formats the files automatically.

For Ubuntu users, you can install clang-format-11 via apt install clang-format-11. For other users, run ./build-support/docker-format.sh if you have Docker installed.

We welcome contributions from the open source community. Please make sure your changes are backward compatible with GCC 4.8 and Boost 1.53.

If your contribution adds Pulsar features for C++ clients, you need to update both the Pulsar docs and the Client Feature Matrix. See Contribution Guide for more details.