Fix flaky testConsumerEventWithoutPartition cause by the change of Pulsar 3.0 (#281)
### Motivation
https://github.com/apache/pulsar/pull/19502 brings a change to the
consumer selection of the Failover subscription.
Before that PR, if a new consumer joins a Failover subscription when
there is a consumer that subscribes the topic, the new consumer will
receive the "inactive" event.
After that PR, a random consumer will receive the "inactive" event.
### Modifications
Change the assertion when `consumer` subscribes the topic with the same
subscription.
In addition, use a thread safe queue to avoid race conditions.
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 7867a57..b6956fe 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -32,6 +32,8 @@
#include "HttpHelper.h"
#include "NoOpsCryptoKeyReader.h"
#include "PulsarFriend.h"
+#include "SynchronizedQueue.h"
+#include "WaitUtils.h"
#include "lib/ClientConnection.h"
#include "lib/Future.h"
#include "lib/LogUtils.h"
@@ -64,34 +66,13 @@
inActiveQueue_.push(partitionId);
}
- std::queue<int> activeQueue_;
- std::queue<int> inActiveQueue_;
+ SynchronizedQueue<int> activeQueue_;
+ SynchronizedQueue<int> inActiveQueue_;
std::string name_;
};
typedef std::shared_ptr<ConsumerStateEventListener> ConsumerStateEventListenerPtr;
-void verifyConsumerNotReceiveAnyStateChanges(ConsumerStateEventListenerPtr listener) {
- ASSERT_EQ(0, listener->activeQueue_.size());
- ASSERT_EQ(0, listener->inActiveQueue_.size());
-}
-
-void verifyConsumerActive(ConsumerStateEventListenerPtr listener, int partitionId) {
- ASSERT_NE(0, listener->activeQueue_.size());
- int pid = listener->activeQueue_.front();
- listener->activeQueue_.pop();
- ASSERT_EQ(partitionId, pid);
- ASSERT_EQ(0, listener->inActiveQueue_.size());
-}
-
-void verifyConsumerInactive(ConsumerStateEventListenerPtr listener, int partitionId) {
- ASSERT_NE(0, listener->inActiveQueue_.size());
- int pid = listener->inActiveQueue_.front();
- listener->inActiveQueue_.pop();
- ASSERT_EQ(partitionId, pid);
- ASSERT_EQ(0, listener->activeQueue_.size());
-}
-
class ActiveInactiveListenerEvent : public ConsumerEventListener {
public:
void becameActive(Consumer consumer, int partitionId) override {
@@ -119,9 +100,7 @@
const std::string topicName = "testConsumerEventWithoutPartition-topic-" + std::to_string(time(nullptr));
const std::string subName = "sub";
- const int waitTimeInMs = 1000;
- // constexpr int unAckedMessagesTimeoutMs = 10000;
- // constexpr int tickDurationInMs = 1000;
+ const auto waitTime = std::chrono::seconds(3);
// 1. two consumers on the same subscription
Consumer consumer1;
@@ -132,7 +111,9 @@
config1.setConsumerType(ConsumerType::ConsumerFailover);
ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config1, consumer1));
- std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
+ waitUntil(waitTime, [&listener1]() -> bool { return listener1->activeQueue_.size() == 1; });
+ ASSERT_EQ(listener1->activeQueue_.size(), 1);
+ ASSERT_EQ(listener1->activeQueue_.pop(), -1);
Consumer consumer2;
ConsumerConfiguration config2;
@@ -142,18 +123,22 @@
config2.setConsumerType(ConsumerType::ConsumerFailover);
ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config2, consumer2));
- std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
-
- verifyConsumerActive(listener1, -1);
- verifyConsumerInactive(listener2, -1);
-
- // clear inActiveQueue_
- std::queue<int>().swap(listener2->inActiveQueue_);
+ // Since https://github.com/apache/pulsar/pull/19502, both consumer and consumer2 could receive the
+ // inactive event
+ waitUntil(waitTime, [&listener1, &listener2]() -> bool {
+ return listener1->inActiveQueue_.size() == 1 || listener2->inActiveQueue_.size() == 1;
+ });
+ if (listener1->inActiveQueue_.size() == 1) {
+ ASSERT_EQ(listener1->inActiveQueue_.pop(), -1);
+ } else {
+ ASSERT_EQ(listener2->inActiveQueue_.size(), 1);
+ ASSERT_EQ(listener2->inActiveQueue_.pop(), -1);
+ }
consumer1.close();
- std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
- verifyConsumerActive(listener2, -1);
- verifyConsumerNotReceiveAnyStateChanges(listener1);
+ waitUntil(waitTime, [&listener2]() -> bool { return listener2->activeQueue_.size() == 1; });
+ ASSERT_EQ(listener2->activeQueue_.size(), 1);
+ ASSERT_EQ(listener2->activeQueue_.pop(), -1);
}
TEST(ConsumerTest, testConsumerEventWithPartition) {
diff --git a/tests/SynchronizedQueue.h b/tests/SynchronizedQueue.h
new file mode 100644
index 0000000..d8c18a8
--- /dev/null
+++ b/tests/SynchronizedQueue.h
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <mutex>
+#include <queue>
+
+template <typename T>
+class SynchronizedQueue {
+ public:
+ void push(const T& value) {
+ std::lock_guard<std::mutex> lock{mutex_};
+ queue_.push(value);
+ }
+
+ T pop() {
+ std::lock_guard<std::mutex> lock{mutex_};
+ auto value = queue_.front();
+ queue_.pop();
+ return value;
+ }
+
+ size_t size() const {
+ std::lock_guard<std::mutex> lock{mutex_};
+ return queue_.size();
+ }
+
+ private:
+ mutable std::mutex mutex_;
+ std::queue<T> queue_;
+};