blob: d7b24cd82e26059148c124107bb1904195ba8357 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/#include "rocketmq/DefaultMQPushConsumer.h"
#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
ROCKETMQ_NAMESPACE_BEGIN
class CounterMessageListener : public StandardMessageListener {
public:
explicit CounterMessageListener(std::atomic_long& counter) : counter_(counter) {
}
ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override {
counter_.fetch_add(msgs.size());
return ConsumeMessageResult::SUCCESS;
}
private:
std::atomic_long& counter_;
};
ROCKETMQ_NAMESPACE_END
int main(int argc, char* argv[]) {
using namespace ROCKETMQ_NAMESPACE;
Logger& logger = getLogger();
logger.setLevel(Level::Debug);
logger.init();
std::atomic_long counter(0);
DefaultMQPushConsumer push_consumer("TestGroup");
MessageListener* listener = new CounterMessageListener(counter);
push_consumer.setGroupName("TestGroup");
push_consumer.setInstanceName("CID_sample_member_0");
push_consumer.subscribe("TestTopic", "*");
push_consumer.setNamesrvAddr("11.167.164.105:9876");
push_consumer.registerMessageListener(listener);
push_consumer.setThrottle("TestTopic", 20);
push_consumer.start();
std::atomic_bool stopped(false);
std::thread report_thread([&counter, &stopped]() {
while (!stopped) {
std::this_thread::sleep_for(std::chrono::seconds(1));
long qps;
while (true) {
qps = counter.load(std::memory_order_relaxed);
if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) {
break;
}
}
std::cout << "QPS: " << qps << std::endl;
}
});
std::this_thread::sleep_for(std::chrono::seconds(10));
stopped.store(true);
if (report_thread.joinable()) {
report_thread.join();
}
push_consumer.shutdown();
return EXIT_SUCCESS;
}