Fix flaky testConsumerEventWithoutPartition cause by the change of Pulsar 3.0 (#281)

### Motivation

https://github.com/apache/pulsar/pull/19502 brings a change to the
consumer selection of the Failover subscription.

Before that PR, if a new consumer joins a Failover subscription when
there is a consumer that subscribes the topic, the new consumer will
receive the "inactive" event.

After that PR, a random consumer will receive the "inactive" event.

### Modifications

Change the assertion when `consumer` subscribes the topic with the same
subscription.

In addition, use a thread safe queue to avoid race conditions.
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 7867a57..b6956fe 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -32,6 +32,8 @@
 #include "HttpHelper.h"
 #include "NoOpsCryptoKeyReader.h"
 #include "PulsarFriend.h"
+#include "SynchronizedQueue.h"
+#include "WaitUtils.h"
 #include "lib/ClientConnection.h"
 #include "lib/Future.h"
 #include "lib/LogUtils.h"
@@ -64,34 +66,13 @@
         inActiveQueue_.push(partitionId);
     }
 
-    std::queue<int> activeQueue_;
-    std::queue<int> inActiveQueue_;
+    SynchronizedQueue<int> activeQueue_;
+    SynchronizedQueue<int> inActiveQueue_;
     std::string name_;
 };
 
 typedef std::shared_ptr<ConsumerStateEventListener> ConsumerStateEventListenerPtr;
 
-void verifyConsumerNotReceiveAnyStateChanges(ConsumerStateEventListenerPtr listener) {
-    ASSERT_EQ(0, listener->activeQueue_.size());
-    ASSERT_EQ(0, listener->inActiveQueue_.size());
-}
-
-void verifyConsumerActive(ConsumerStateEventListenerPtr listener, int partitionId) {
-    ASSERT_NE(0, listener->activeQueue_.size());
-    int pid = listener->activeQueue_.front();
-    listener->activeQueue_.pop();
-    ASSERT_EQ(partitionId, pid);
-    ASSERT_EQ(0, listener->inActiveQueue_.size());
-}
-
-void verifyConsumerInactive(ConsumerStateEventListenerPtr listener, int partitionId) {
-    ASSERT_NE(0, listener->inActiveQueue_.size());
-    int pid = listener->inActiveQueue_.front();
-    listener->inActiveQueue_.pop();
-    ASSERT_EQ(partitionId, pid);
-    ASSERT_EQ(0, listener->activeQueue_.size());
-}
-
 class ActiveInactiveListenerEvent : public ConsumerEventListener {
    public:
     void becameActive(Consumer consumer, int partitionId) override {
@@ -119,9 +100,7 @@
 
     const std::string topicName = "testConsumerEventWithoutPartition-topic-" + std::to_string(time(nullptr));
     const std::string subName = "sub";
-    const int waitTimeInMs = 1000;
-    // constexpr int unAckedMessagesTimeoutMs = 10000;
-    // constexpr int tickDurationInMs = 1000;
+    const auto waitTime = std::chrono::seconds(3);
 
     // 1. two consumers on the same subscription
     Consumer consumer1;
@@ -132,7 +111,9 @@
     config1.setConsumerType(ConsumerType::ConsumerFailover);
 
     ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config1, consumer1));
-    std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
+    waitUntil(waitTime, [&listener1]() -> bool { return listener1->activeQueue_.size() == 1; });
+    ASSERT_EQ(listener1->activeQueue_.size(), 1);
+    ASSERT_EQ(listener1->activeQueue_.pop(), -1);
 
     Consumer consumer2;
     ConsumerConfiguration config2;
@@ -142,18 +123,22 @@
     config2.setConsumerType(ConsumerType::ConsumerFailover);
 
     ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config2, consumer2));
-    std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
-
-    verifyConsumerActive(listener1, -1);
-    verifyConsumerInactive(listener2, -1);
-
-    // clear inActiveQueue_
-    std::queue<int>().swap(listener2->inActiveQueue_);
+    // Since https://github.com/apache/pulsar/pull/19502, both consumer and consumer2 could receive the
+    // inactive event
+    waitUntil(waitTime, [&listener1, &listener2]() -> bool {
+        return listener1->inActiveQueue_.size() == 1 || listener2->inActiveQueue_.size() == 1;
+    });
+    if (listener1->inActiveQueue_.size() == 1) {
+        ASSERT_EQ(listener1->inActiveQueue_.pop(), -1);
+    } else {
+        ASSERT_EQ(listener2->inActiveQueue_.size(), 1);
+        ASSERT_EQ(listener2->inActiveQueue_.pop(), -1);
+    }
 
     consumer1.close();
-    std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
-    verifyConsumerActive(listener2, -1);
-    verifyConsumerNotReceiveAnyStateChanges(listener1);
+    waitUntil(waitTime, [&listener2]() -> bool { return listener2->activeQueue_.size() == 1; });
+    ASSERT_EQ(listener2->activeQueue_.size(), 1);
+    ASSERT_EQ(listener2->activeQueue_.pop(), -1);
 }
 
 TEST(ConsumerTest, testConsumerEventWithPartition) {
diff --git a/tests/SynchronizedQueue.h b/tests/SynchronizedQueue.h
new file mode 100644
index 0000000..d8c18a8
--- /dev/null
+++ b/tests/SynchronizedQueue.h
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <mutex>
+#include <queue>
+
+template <typename T>
+class SynchronizedQueue {
+   public:
+    void push(const T& value) {
+        std::lock_guard<std::mutex> lock{mutex_};
+        queue_.push(value);
+    }
+
+    T pop() {
+        std::lock_guard<std::mutex> lock{mutex_};
+        auto value = queue_.front();
+        queue_.pop();
+        return value;
+    }
+
+    size_t size() const {
+        std::lock_guard<std::mutex> lock{mutex_};
+        return queue_.size();
+    }
+
+   private:
+    mutable std::mutex mutex_;
+    std::queue<T> queue_;
+};