| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <cstdint> |
| #include <string> |
| #include <vector> |
| |
| #include <rapidjson/document.h> |
| |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/util/condition_variable.h" |
| #include "kudu/util/mutex.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/status_callback.h" |
| |
| namespace hive { |
| class NotificationEvent; |
| } |
| |
| namespace kudu { |
| |
| class MonoTime; |
| class Thread; |
| |
| namespace master { |
| |
| class CatalogManager; |
| |
| // A CatalogManager background task which listens for events occurring in the |
| // Hive Metastore, and synchronizes the Kudu catalog accordingly. |
| // |
| // As a background task, the lifetime of an instance of this class must be less |
| // than the catalog manager it belongs to. |
| // |
| // The notification log listener task continuously wakes up according to its |
| // configured poll period, however it performs no work when the master is a |
| // follower. |
| // |
| // When a change to the Kudu catalog is performed in response to a notification |
| // log event, the corresponding event ID is recorded in the sys catalog as the |
| // latest handled event. This ensures that masters do not double-apply |
| // notification events as leadership changes. |
| // |
| // The notification log listener listens for two types of events on Kudu tables: |
| // |
| // - ALTER TABLE RENAME |
| // Table rename is a special case of ALTER TABLE. The notification log |
| // listener listens for rename event notifications for Kudu tables, and |
| // renames the corresponding Kudu table. See below for why renames can be |
| // applied back to Kudu, but not other types of alterations. |
| // |
| // - DROP TABLE |
| // The notification log listener listens for drop table events for Kudu |
| // tables, and drops the corresponding Kudu table. This allows the catalogs |
| // to stay synchronized when DROP TABLE and DROP DATABASE CASCADE Hive |
| // commands are executed. |
| // |
| // The notification log listener can support renaming and dropping tables in a |
| // safe manner because the Kudu table ID is stored in the HMS table entry. Using |
| // the Kudu table ID, the exact table which the event applies to can always be |
| // identified. For other changes made in ALTER TABLE statements, such as ALTER |
| // TABLE DROP COLUMN, there is no way to identify with certainty which column |
| // has been dropped, since we do not store column IDs in the HMS table entries. |
| class HmsNotificationLogListenerTask { |
| public: |
| |
| explicit HmsNotificationLogListenerTask(CatalogManager* catalog_manager); |
| ~HmsNotificationLogListenerTask(); |
| |
| // Initializes the HMS notification log listener. When invoking this method, |
| // the catalog manager must be in the process of initializing. |
| Status Init() WARN_UNUSED_RESULT; |
| |
| // Shuts down the HMS notification log listener. This must be called before |
| // shutting down the catalog manager. |
| void Shutdown(); |
| |
| // Waits for the notification log listener to process the latest notification |
| // log event. |
| // |
| // Note: an error will be returned if the listener is unable to retrieve the |
| // latest notifications from the HMS. If individual notifications are unable |
| // to be processed, no error will be returned. |
| Status WaitForCatchUp(const MonoTime& deadline) WARN_UNUSED_RESULT; |
| |
| private: |
| friend class HmsNotificationLogListenerTest; |
| |
| // Runs the main loop of the listening thread. |
| void RunLoop(); |
| |
| // Polls the Hive Metastore for notification events, and handle them. |
| Status Poll(); |
| |
| // Handles an ALTER TABLE event. Must only be called on the listening thread. |
| // |
| // The event is parsed, and if it is a rename table event for a Kudu table, |
| // the table is renamed in the local catalog. All other events are ignored. |
| Status HandleAlterTableEvent(const hive::NotificationEvent& event, |
| int64_t* durable_event_id) WARN_UNUSED_RESULT; |
| |
| // Handles a DROP TABLE event. Must only be called on the listening thread. |
| // |
| // The event is parsed, and if it is a drop table event for a Kudu table, the |
| // table is deleted in the local catalog. All other events are ignored. |
| Status HandleDropTableEvent(const hive::NotificationEvent& event, |
| int64_t* durable_event_id) WARN_UNUSED_RESULT; |
| |
| // Parses the event message from a notification event. See |
| // org.apache.hadoop.hive.metastore.messaging.MessageFactory and |
| // org.apache.hadoop.hive.metastore.messaging.MessageEncoder for more info. |
| // |
| // Since JSON formats are currently the only concrete implementation, |
| // this method is specialized to return the Document type. |
| // If another MessageFactory instance becomes used in the future this |
| // method should be updated to handle it accordingly. Also because |
| // 'messageFormat' is an optional field introduced in HIVE-10562, |
| // we consider messages without this field to be `json-0.2` to be |
| // compatible with Hive distributions that do not include HIVE-10562 |
| // but still have the proper JSON message. |
| static Status ParseMessage(const hive::NotificationEvent& event, |
| rapidjson::Document* message) WARN_UNUSED_RESULT; |
| |
| // Decodes an event that was Gzip encoded by Hive. |
| // Hive also Base64 encodes the content after compressing. |
| static Status DecodeGzipMessage(const std::string& encoded, |
| std::string* decoded) WARN_UNUSED_RESULT; |
| |
| // The associated catalog manager. |
| // |
| // May be initialized to nullptr in the constructor to facilitate unit |
| // testing. In this case all interactions with the catalog manager and HMS |
| // are skipped. |
| CatalogManager* catalog_manager_; |
| |
| // The listening thread. |
| scoped_refptr<kudu::Thread> thread_; |
| |
| // Protects access to fields below. |
| mutable Mutex lock_; |
| |
| // Set to true if the task is in the process of shutting down. |
| // |
| // Protected by lock_. |
| bool closing_; |
| |
| // Manages waking the notification log listener thread when the catalog |
| // manager needs to ensure that all recent notification log events have been |
| // handled. |
| // |
| // Protected by lock_. |
| ConditionVariable wake_up_cv_; |
| |
| // Queue of callbacks to execute when the notification log listener is caught |
| // up. These callbacks enable the catalog manager to wait for the notification |
| // log listener to have processed the latest events before proceeding with |
| // metadata ops involving the HMS table namespace. |
| // |
| // Protected by lock_. |
| std::vector<StatusCallback> catch_up_callbacks_; |
| }; |
| |
| } // namespace master |
| } // namespace kudu |