// blob: d7bdd36733d5b932d5d613cd555ea845bad80fe4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <string>
#include <vector>
#include <rapidjson/document.h>
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
// Forward declaration of the HMS notification event type. This header only
// refers to it by const reference, so the full definition is not needed here.
namespace hive {
class NotificationEvent;
} // namespace hive
namespace kudu {
// Forward declarations: within this header MonoTime is only taken by const
// reference and Thread is only held through a scoped_refptr.
class MonoTime;
class Thread;
namespace master {
// Forward declaration: this header only accepts and stores a CatalogManager
// pointer.
class CatalogManager;
// A CatalogManager background task which listens for events occurring in the
// Hive Metastore, and synchronizes the Kudu catalog accordingly.
//
// As a background task, the lifetime of an instance of this class must be
// shorter than that of the catalog manager it belongs to.
//
// The notification log listener task continuously wakes up according to its
// configured poll period, however it performs no work when the master is a
// follower.
//
// When a change to the Kudu catalog is performed in response to a notification
// log event, the corresponding event ID is recorded in the sys catalog as the
// latest handled event. This ensures that masters do not double-apply
// notification events as leadership changes.
//
// The notification log listener listens for two types of events on Kudu tables:
//
// - ALTER TABLE RENAME
// Table rename is a special case of ALTER TABLE. The notification log
// listener listens for rename event notifications for Kudu tables, and
// renames the corresponding Kudu table. See below for why renames can be
// applied back to Kudu, but not other types of alterations.
//
// - DROP TABLE
// The notification log listener listens for drop table events for Kudu
// tables, and drops the corresponding Kudu table. This allows the catalogs
// to stay synchronized when DROP TABLE and DROP DATABASE CASCADE Hive
// commands are executed.
//
// The notification log listener can support renaming and dropping tables in a
// safe manner because the Kudu table ID is stored in the HMS table entry. Using
// the Kudu table ID, the exact table which the event applies to can always be
// identified. For other changes made in ALTER TABLE statements, such as ALTER
// TABLE DROP COLUMN, there is no way to identify with certainty which column
// has been dropped, since we do not store column IDs in the HMS table entries.
class HmsNotificationLogListenerTask {
 public:
  // Creates a listener task associated with 'catalog_manager'. See the
  // comment on catalog_manager_ below regarding passing nullptr in unit tests.
  explicit HmsNotificationLogListenerTask(CatalogManager* catalog_manager);
  ~HmsNotificationLogListenerTask();

  // Sets up the HMS notification log listener. The catalog manager must itself
  // be in the middle of initializing when this method is invoked.
  Status Init() WARN_UNUSED_RESULT;

  // Stops the HMS notification log listener. Callers must invoke this before
  // the catalog manager is shut down.
  void Shutdown();

  // Blocks until the notification log listener has processed the latest
  // notification log event.
  //
  // Note: failing to retrieve the latest notifications from the HMS results in
  // an error being returned, whereas a failure to process an individual
  // notification does not.
  //
  // NOTE(review): 'deadline' presumably bounds how long the caller waits;
  // confirm against the implementation.
  Status WaitForCatchUp(const MonoTime& deadline) WARN_UNUSED_RESULT;

 private:
  friend class HmsNotificationLogListenerTest;

  // Main loop executed by the listening thread.
  void RunLoop();

  // Fetches notification events from the Hive Metastore and handles them.
  Status Poll();

  // Processes an ALTER TABLE event. Only the listening thread may call this.
  //
  // After parsing, an event that describes the rename of a Kudu table causes
  // that table to be renamed in the local catalog. Every other kind of event
  // is ignored.
  //
  // NOTE(review): the contract of 'durable_event_id' is not visible in this
  // header; presumably it tracks the latest event ID recorded in the sys
  // catalog. Confirm against the implementation.
  Status HandleAlterTableEvent(const hive::NotificationEvent& event,
                               int64_t* durable_event_id) WARN_UNUSED_RESULT;

  // Processes a DROP TABLE event. Only the listening thread may call this.
  //
  // After parsing, an event that describes the drop of a Kudu table causes
  // that table to be deleted in the local catalog. Every other kind of event
  // is ignored.
  //
  // NOTE(review): see HandleAlterTableEvent regarding 'durable_event_id'.
  Status HandleDropTableEvent(const hive::NotificationEvent& event,
                              int64_t* durable_event_id) WARN_UNUSED_RESULT;

  // Extracts the event message carried by a notification event into
  // 'message'. For background, see
  // org.apache.hadoop.hive.metastore.messaging.MessageFactory and
  // org.apache.hadoop.hive.metastore.messaging.MessageEncoder.
  //
  // JSON formats are currently the only concrete implementation, which is why
  // this method is specialized to produce a rapidjson Document. Should a
  // different MessageFactory come into use, this method needs to be updated
  // to handle it.
  //
  // 'messageFormat' is an optional field that was introduced in HIVE-10562.
  // Messages lacking the field are treated as `json-0.2`, which keeps
  // compatibility with Hive distributions that predate HIVE-10562 yet still
  // emit the proper JSON message.
  static Status ParseMessage(const hive::NotificationEvent& event,
                             rapidjson::Document* message) WARN_UNUSED_RESULT;

  // Decodes an event message that Hive Gzip-encoded. Hive additionally
  // Base64-encodes the content once it has been compressed.
  static Status DecodeGzipMessage(const std::string& encoded,
                                  std::string* decoded) WARN_UNUSED_RESULT;

  // The catalog manager this task is associated with.
  //
  // To facilitate unit testing, the constructor may be given nullptr. When
  // that is the case, every interaction with the catalog manager and the HMS
  // is skipped.
  CatalogManager* catalog_manager_;

  // The thread on which the listener runs.
  scoped_refptr<kudu::Thread> thread_;

  // Guards access to the fields declared after it.
  mutable Mutex lock_;

  // True once the task has started shutting down.
  //
  // Protected by lock_.
  bool closing_;

  // Used to wake the notification log listener thread whenever the catalog
  // manager needs assurance that all recent notification log events have been
  // handled.
  //
  // Protected by lock_.
  ConditionVariable wake_up_cv_;

  // Callbacks queued for execution once the notification log listener has
  // caught up. They let the catalog manager wait until the listener has
  // processed the latest events before it proceeds with metadata operations
  // that involve the HMS table namespace.
  //
  // Protected by lock_.
  std::vector<StatusCallback> catch_up_callbacks_;
};
} // namespace master
} // namespace kudu