[fix][cpp-client] Pass seek error to callback when SEEK command fails (#549)
diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index 21c9a16..fb5f539 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -99,10 +99,25 @@
key: vcpkg-${{ runner.os }}-${{ hashFiles('vcpkg.json') }}
restore-keys: vcpkg-${{ runner.os }}-
+ - name: Restore vcpkg downloads cache
+ uses: actions/cache@v4
+ with:
+ path: vcpkg/downloads
+ key: vcpkg-downloads-${{ runner.os }}-${{ hashFiles('vcpkg.json') }}
+ restore-keys: vcpkg-downloads-${{ runner.os }}-
+
- name: Build the project
run: |
- cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
- cmake --build build -j8
+ for attempt in 1 2 3; do
+ echo "Build attempt $attempt/3"
+ if cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON && cmake --build build -j8; then
+ exit 0
+ fi
+ echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..."
+ sleep 90
+ done
+ echo "All build attempts failed"
+ exit 1
- name: Tidy check
run: |
@@ -137,10 +152,25 @@
key: vcpkg-${{ runner.os }}-${{ hashFiles('vcpkg.json') }}
restore-keys: vcpkg-${{ runner.os }}-
+ - name: Restore vcpkg downloads cache
+ uses: actions/cache@v4
+ with:
+ path: vcpkg/downloads
+ key: vcpkg-downloads-${{ runner.os }}-${{ hashFiles('vcpkg.json') }}
+ restore-keys: vcpkg-downloads-${{ runner.os }}-
+
- name: Build core libraries
run: |
- cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF
- cmake --build build -j8
+ for attempt in 1 2 3; do
+ echo "Build attempt $attempt/3"
+ if cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF && cmake --build build -j8; then
+ exit 0
+ fi
+ echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..."
+ sleep 90
+ done
+ echo "All build attempts failed"
+ exit 1
- name: Check formatting
run: |
@@ -164,8 +194,16 @@
- name: Build with Boost.Asio
run: |
- cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON
- cmake --build build-boost-asio -j8
+ for attempt in 1 2 3; do
+ echo "Build Boost.Asio attempt $attempt/3"
+ if cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON && cmake --build build-boost-asio -j8; then
+ exit 0
+ fi
+ echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..."
+ sleep 90
+ done
+ echo "All build attempts failed"
+ exit 1
- name: Build perf tools
run: |
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 4a62452..d268484 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1835,7 +1835,9 @@
if (result == ResultOk) {
LockGuard lock(mutex_);
if (getCnx().expired() || reconnectionPending_) {
- // It's during reconnection, complete the seek future after connection is established
+ // Reconnection path: delay the seek callback until connectionOpened. clearReceiveQueue()
+ // and handleCreateConsumer() (which clears incomingMessages_ under the lock) run before
+ // the seek callback is invoked, so hasMessageAvailable() after seek sees cleared state.
seekStatus_ = SeekStatus::COMPLETED;
LOG_INFO(getName() << "Delay the seek future until the reconnection is done");
} else {
@@ -1860,9 +1862,8 @@
LockGuard lock{mutex_};
seekStatus_ = SeekStatus::NOT_STARTED;
lastSeekArg_ = previousLastSeekArg;
- executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
- callback(ResultOk);
- });
+ executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()},
+ result]() { callback(result); });
}
});
}
diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc
index f66c27d..d208fc6 100644
--- a/tests/ConsumerSeekTest.cc
+++ b/tests/ConsumerSeekTest.cc
@@ -258,6 +258,31 @@
client.close();
}
+TEST_F(ConsumerSeekTest, testSeekFailureIsPropagated) {
+ using namespace std::chrono_literals;
+
+ Client client(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(1));
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe("testSeekFailureIsPropagated", "sub", consumer));
+
+ auto connection = *PulsarFriend::getConnections(client).begin();
+ auto mockServer = std::make_shared<MockServer>(connection);
+ connection->attachMockServer(mockServer);
+ mockServer->setRequestDelay({{"SEEK", 5000}});
+
+ std::promise<Result> promise;
+ auto future = promise.get_future();
+ consumer.seekAsync(MessageId::earliest(), [&promise](Result result) { promise.set_value(result); });
+
+ // Cancel the mocked SEEK success so request completes with timeout.
+ ASSERT_GE(mockServer->close(), 1);
+
+ ASSERT_EQ(future.wait_for(5s), std::future_status::ready);
+ ASSERT_EQ(future.get(), ResultTimeout);
+
+ client.close();
+}
+
INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
} // namespace pulsar