blob: fc65f9213a0eeeff17d2978884cc3dabb83407f5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/master/hms_notification_log_listener.h"
#include <cstdint>
#include <thread>
#include <string>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_uint32(hive_metastore_notification_log_poll_period_seconds);
DECLARE_uint32(hive_metastore_notification_log_poll_inject_latency_ms);
using std::string;
namespace kudu {
namespace master {
// Test fixture for the HmsNotificationLogListenerTask tests in this file.
class HmsNotificationLogListenerTest : public KuduTest {
 public:
  // Forwards to HmsNotificationLogListenerTask::DecodeGzipMessage so that test
  // bodies can call it through the fixture. The result of decoding 'encoded'
  // is written to 'decoded'; the returned Status is passed through unchanged.
  static Status DecodeGzipMessage(const string& encoded, string* decoded) {
    return HmsNotificationLogListenerTask::DecodeGzipMessage(encoded, decoded);
  }

  // Value of --hive_metastore_notification_log_poll_period_seconds captured
  // when the fixture is constructed. Tests derive their deadlines from it.
  uint32_t poll_period_ = FLAGS_hive_metastore_notification_log_poll_period_seconds;
};
// Verifies that Shutdown() does not have to wait out the poll period: once the
// task is started, shutting it down must take less than half a poll period.
TEST_F(HmsNotificationLogListenerTest, TestImmediateShutdown) {
  HmsNotificationLogListenerTask task(nullptr);
  ASSERT_OK(task.Init());

  // Pause briefly so the task thread has a chance to enter the poll wait.
  const MonoDelta settle_time = MonoDelta::FromMilliseconds(100);
  SleepFor(settle_time);

  // Only the Shutdown() call itself is timed.
  const MonoTime start = MonoTime::Now();
  task.Shutdown();
  ASSERT_LT(MonoTime::Now() - start, MonoDelta::FromSeconds(poll_period_ / 2));
}
// Verifies that WaitForCatchUp() does not have to wait out the poll period:
// with a deadline of half a poll period, it must return OK.
TEST_F(HmsNotificationLogListenerTest, TestPoll) {
  HmsNotificationLogListenerTask task(nullptr);
  ASSERT_OK(task.Init());

  // Pause briefly so the task thread has a chance to enter the poll wait.
  SleepFor(MonoDelta::FromMilliseconds(100));

  // The deadline is computed immediately before the wait begins.
  const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(poll_period_ / 2);
  ASSERT_OK(task.WaitForCatchUp(deadline));
}
// Verifies that WaitForCatchUp() still returns OK within half a poll period
// when the wait begins while the task is in the middle of a poll.
TEST_F(HmsNotificationLogListenerTest, TestWaitWhilePolling) {
  // Injected poll latency, so that the task is still polling when the wait
  // below is issued.
  FLAGS_hive_metastore_notification_log_poll_inject_latency_ms = 100;

  HmsNotificationLogListenerTask task(nullptr);
  ASSERT_OK(task.Init());

  // No settling sleep here: the wait is issued right after Init().
  const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(poll_period_ / 2);
  ASSERT_OK(task.WaitForCatchUp(deadline));
}
// Test that shutting down with a waiter will result in the waiter receiving an
// error: a WaitForCatchUp() call that is pending when Shutdown() runs must
// return a ServiceUnavailable status.
TEST_F(HmsNotificationLogListenerTest, TestWaitAndShutdown) {
  // Inject some latency to ensure that the wait occurs when the task is
  // polling, otherwise it could immediately begin servicing the wait and not
  // actually see the shutdown.
  FLAGS_hive_metastore_notification_log_poll_inject_latency_ms = 100;
  HmsNotificationLogListenerTask listener(nullptr);
  ASSERT_OK(listener.Init());

  // The waiter runs on its own thread so that Shutdown() can be called on this
  // thread while the wait is outstanding. The thread is joined before
  // 'listener' goes out of scope, so capturing by reference is safe.
  auto waiter = std::thread([&] {
    Status s = listener.WaitForCatchUp(MonoTime::Now() +
                                       MonoDelta::FromSeconds(poll_period_ / 2));
    // Report the status that was actually returned, so an unexpected result
    // (e.g. OK or a timeout) can be diagnosed from the failure output.
    CHECK(s.IsServiceUnavailable()) << s.ToString();
  });

  // There's a race between the waiter thread checking the closed_ flag in
  // WaitForCatchUp and this thread setting the flag in Shutdown. This test is
  // trying to exercise the case where the waiter is able to enqueue the
  // callback, so to make that more likely we slow down the call to Shutdown.
  SleepFor(MonoDelta::FromMilliseconds(10));
  listener.Shutdown();
  waiter.join();
}
// Verifies that DecodeGzipMessage() turns a message produced by Hive's gzip
// JSON message encoder back into the expected JSON text.
TEST_F(HmsNotificationLogListenerTest, TestGzipEventDecoding) {
  // The JSON content of the encoded string and the expected result after decoding.
  const string json = "{\"server\":\"server\",\"servicePrincipal\":\"principal\",\"db\":\"db\","
                      "\"table\":\"table\",\"tableType\":null,\"tableObjJson\":null,\"timestamp\":0}";

  // A message encoded via the following Hive code:
  //   JSONDropTableMessage message =
  //       new JSONDropTableMessage("server", "principal", "db", "table", 0L);
  //   String serialized =
  //       GzipJSONMessageEncoder.getInstance().getSerializer().serialize(message);
  const string encoded = "H4sIAAAAAAAAAKtWKk4tKkstUrKCMXTAjMzk1ICizLzkzILEHKBcAZyto5SSBBQAE"
                         "jpKJYlJOalADoSG8kMqC4BieaU5OVAB/6Qsr+L8PLhYZm5qcUliboGSlUEtAOPIspV/AAAA";

  // Decoding must succeed and reproduce the JSON text exactly.
  string decoded;
  ASSERT_OK(DecodeGzipMessage(encoded, &decoded));
  ASSERT_EQ(json, decoded);
}
} // namespace master
} // namespace kudu