[master] KUDU-3322 / KUDU-3319 HMS event logging

It would be useful to have additional logging in cases where HMS
notification events have to be analyzed from Kudu side. The Jiras
have more context on the reasoning behind this but this patch adds
* an INFO level log which shows the last processed HMS event id during
startup and
* an ERROR log if the latest event id from HMS is less than
the last seen event id from the master. Was hesitant on crashing the
master and chose to log an ERROR as in the case an out of order event
id received from HMS might lead to unnecessary crash of the Kudu
master.

Change-Id: I242c9cedf170ed621867b905f1cb1875347d5887
Reviewed-on: http://gerrit.cloudera.org:8080/18866
Tested-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Alexey Serbin <alexey@apache.org>
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index f7eca33..39fad29 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -311,6 +311,12 @@
   });
 }
 
+Status HmsCatalog::GetCurrentNotificationEventId(int64_t* event_id) {
+  return ha_client_.Execute([&] (HmsClient* client) {
+    return client->GetCurrentNotificationEventId(event_id);
+  });
+}
+
 Status HmsCatalog::GetUuid(string* uuid) {
   std::lock_guard<simple_spinlock> l(uuid_lock_);
   if (!uuid_) {
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
index ae0f1e8..03e6943 100644
--- a/src/kudu/hms/hms_catalog.h
+++ b/src/kudu/hms/hms_catalog.h
@@ -143,6 +143,9 @@
   Status GetNotificationEvents(int64_t last_event_id, int max_events,
                                std::vector<hive::NotificationEvent>* events) WARN_UNUSED_RESULT;
 
+  // Retrieves the latest notification event id from the HMS.
+  Status GetCurrentNotificationEventId(int64_t* event_id) WARN_UNUSED_RESULT;
+
   // Get the UUID associated with the remote HMS instance. This is an identifier
   // stored in the HMS's backing database which does not change even if the
   // HMS itself changes hostnames, etc.
diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc
index 0601090..82a07f6 100644
--- a/src/kudu/hms/mini_hms.cc
+++ b/src/kudu/hms/mini_hms.cc
@@ -184,6 +184,10 @@
   return wait;
 }
 
+Status MiniHms::DeleteDatabaseDir() {
+  return Env::Default()->DeleteRecursively(JoinPathSegments(data_root_, metadb_subdir_));
+}
+
 Status MiniHms::Stop() {
   if (hms_process_) {
     VLOG(1) << "Stopping HMS";
diff --git a/src/kudu/hms/mini_hms.h b/src/kudu/hms/mini_hms.h
index 4c2be11..06d20b6 100644
--- a/src/kudu/hms/mini_hms.h
+++ b/src/kudu/hms/mini_hms.h
@@ -72,6 +72,9 @@
   // Unpause the Hive metastore process.
   Status Resume() WARN_UNUSED_RESULT;
 
+  // Delete the HMS database directory.
+  Status DeleteDatabaseDir() WARN_UNUSED_RESULT;
+
   // Returns the address of the Hive metastore. Should only be called after the
   // metastore is started.
   HostPort address() const {
diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc
index 0dd0459..62cc0d6 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <algorithm>
+#include <fstream>
 #include <functional>
 #include <initializer_list>
 #include <map>
@@ -84,8 +85,8 @@
     opts.principal = Principal();
     // Tune down the notification log poll period in order to speed up catalog convergence.
     opts.extra_master_flags.emplace_back("--hive_metastore_notification_log_poll_period_seconds=1");
+    SetFlags(&opts);
     StartClusterWithOpts(std::move(opts));
-
     thrift::ClientOptions hms_opts;
     hms_opts.enable_kerberos = EnableKerberos();
     hms_opts.service_principal = "hive";
@@ -181,6 +182,8 @@
   virtual string Principal() {
     return "kudu";
   }
+
+  virtual void SetFlags(ExternalMiniClusterOptions* opts) const {}
 };
 
 TEST_F(MasterHmsTest, TestCreateTable) {
@@ -878,4 +881,46 @@
   ASSERT_OK(harness_.hms_client()->GetTable("default", "my_table", &table));
   ASSERT_EQ("test-user", table.owner);
 }
+
+class MasterHmsDBTest : public MasterHmsTest {
+ public:
+  void SetFlags(ExternalMiniClusterOptions* opts) const override {
+    opts->extra_master_flags.emplace_back("--logtostderr=false");
+  }
+};
+
+// TODO(achennaka): Skip test until server timeouts seen in ASAN builds are resolved.
+TEST_F(MasterHmsDBTest, DISABLED_TestHMSDBErase) {
+  // Generate 2 HMS events. With just one event created the published
+  // event id is not 0 after the database drop.
+  ASSERT_OK(CreateKuduTable("default", "a"));
+  unique_ptr<KuduTableAlterer> table_alterer;
+  table_alterer.reset(client_->NewTableAlterer("default.a")->RenameTo("default.b"));
+  ASSERT_OK(table_alterer->Alter());
+
+  // Ensure the table is present in HMS database.
+  NO_FATALS(CheckTable("default", "b", /*user=*/ nullopt));
+
+  // Simulate accidental HMS database loss.
+  ASSERT_OK(StopHms());
+  ASSERT_OK(cluster_->hms()->DeleteDatabaseDir());
+  ASSERT_OK(StartHms());
+
+  // Now everytime we poll HMS for new events, we should be logging an error message.
+  constexpr const char* const str = "The event ID 2 last seen by Kudu master is greater "
+                                    "than 0 currently reported by HMS. Has the HMS database "
+                                    "been reset (backup&restore, etc.)?";
+  ASSERT_EVENTUALLY([&] {
+    string line;
+    bool log_found = false;
+    std::ifstream in(strings::Substitute("$0/kudu-master.ERROR", cluster_->master()->log_dir()));
+    while (std::getline(in, line)) {
+      if (line.find(str) != std::string::npos) {
+        log_found = true;
+        break;
+      }
+    }
+    ASSERT_TRUE(log_found);
+  });
+}
 } // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index d16447a..22c56a3 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1475,8 +1475,8 @@
           "Loading latest processed Hive Metastore notification log event ID";
       LOG(INFO) << kNotificationLogEventIdDescription << "...";
       LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + kNotificationLogEventIdDescription) {
-      if (!check([this]() { return this->InitLatestNotificationLogEventId(); },
-                 *consensus, term, kNotificationLogEventIdDescription).ok()) {
+        if (!check([this]() { return this->InitLatestNotificationLogEventId(); },
+                   *consensus, term, kNotificationLogEventIdDescription).ok()) {
           return;
         }
       }
@@ -5508,6 +5508,8 @@
   int64_t hms_notification_log_event_id;
   RETURN_NOT_OK(sys_catalog_->GetLatestNotificationLogEventId(&hms_notification_log_event_id));
   hms_notification_log_event_id_ = hms_notification_log_event_id;
+  LOG(INFO) << "Last processed Hive Metastore notification event ID: "
+            << hms_notification_log_event_id_;
   return Status::OK();
 }
 
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index 5b225de..60ac71f 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -237,6 +237,22 @@
         processed_event_id, batch_size, &events),
                           "failed to retrieve notification log events");
 
+    // If we do not receive any new events it could be because the HMS event ID in the Kudu
+    // master is higher than what is in the HMS database which causes Drop/Alter table
+    // commands to fail on Kudu side.
+    if (events.empty()) {
+      int64_t event_id;
+          RETURN_NOT_OK_PREPEND(catalog_manager_->hms_catalog()->
+          GetCurrentNotificationEventId(&event_id),
+                                "failed to retrieve latest notification log event");
+      if (event_id < processed_event_id) {
+        LOG(ERROR) << Substitute("The event ID $0 last seen by Kudu master is greater "
+                                 "than $1 currently reported by HMS. Has the HMS database "
+                                 "been reset (backup&restore, etc.)?",
+                                 processed_event_id, event_id);
+      }
+    }
+
     for (const auto& event : events) {
       VLOG(1) << "Processing notification log event: " << EventDebugString(event);