blob: 2dfafa743b936304ed349ed51fa61f0d4b6e7bf9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <thread>
#include "SchedulerImpl.h"
#include "gtest/gtest.h"
ROCKETMQ_NAMESPACE_BEGIN
class SchedulerTest : public testing::Test {
public:
void SetUp() override {
}
protected:
};
TEST_F(SchedulerTest, testSingleShot) {
SchedulerImpl scheduler;
scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
auto callback = [&]() {
absl::MutexLock lock(&mtx);
cv.Signal();
callback_fire_count++;
};
scheduler.schedule(callback, "single-shot", std::chrono::milliseconds(10), std::chrono::milliseconds(0));
// Wait till callback is executed.
{
absl::MutexLock lock(&mtx);
if (!callback_fire_count) {
cv.Wait(&mtx);
}
}
scheduler.shutdown();
}
TEST_F(SchedulerTest, testCancel) {
SchedulerImpl scheduler;
scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
auto callback = [&]() {
absl::MutexLock lock(&mtx);
cv.Signal();
callback_fire_count++;
};
std::uint32_t task_id = scheduler.schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
scheduler.cancel(task_id);
std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(0, callback_fire_count);
scheduler.shutdown();
}
TEST_F(SchedulerTest, testPeriodicShot) {
SchedulerImpl scheduler;
scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
auto callback = [&]() {
absl::MutexLock lock(&mtx);
cv.Signal();
callback_fire_count++;
};
std::uintptr_t task_id =
scheduler.schedule(callback, "periodic-task", std::chrono::milliseconds(10), std::chrono::seconds(1));
// Wait till callback is executed.
std::this_thread::sleep_for(std::chrono::seconds(5));
ASSERT_TRUE(callback_fire_count >= 4);
scheduler.cancel(task_id);
scheduler.shutdown();
}
TEST_F(SchedulerTest, testSingleShotWithZeroDelay) {
SchedulerImpl scheduler;
scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
auto callback = [&]() {
absl::MutexLock lock(&mtx);
cv.Signal();
callback_fire_count++;
};
scheduler.schedule(callback, "single-shot-with-0-delay", std::chrono::milliseconds(0), std::chrono::milliseconds(0));
// Wait till callback is executed.
{
absl::MutexLock lock(&mtx);
if (!callback_fire_count) {
cv.Wait(&mtx);
}
}
scheduler.shutdown();
}
TEST_F(SchedulerTest, testException) {
SchedulerImpl scheduler;
scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
auto callback = [&]() {
{
absl::MutexLock lock(&mtx);
cv.Signal();
callback_fire_count++;
}
std::exception e;
throw e;
};
scheduler.schedule(callback, "test-exception", std::chrono::milliseconds(100), std::chrono::milliseconds(100));
// Wait till callback is executed.
{
absl::MutexLock lock(&mtx);
if (callback_fire_count <= 5) {
cv.Wait(&mtx);
}
}
std::this_thread::sleep_for(std::chrono::seconds(3));
scheduler.shutdown();
}
ROCKETMQ_NAMESPACE_END