KUDU-3401 Fix table creation with HMS Integration

Hive queries on Kudu tables were failing with the following stack trace:

ERROR : Failed
org.apache.hadoop.hive.metastore.api.MetaException: java.lang.ClassNotFoundException Class not found
at org.apache.hadoop.hive.metastore.HiveMetaStoreUtils.getDeserializer(HiveMetaStoreUtils.java:98)
at org.apache.hadoop.hive.metastore.HiveMetaStoreUtils.getDeserializer(HiveMetaStoreUtils.java:77)
at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:331)

The issue was caused by the Kudu HMS client not sending fields
required by Hive, namely the input/output formats and the
serialization library for the created table, when making a create
table request. As a result, Hive queries on Kudu tables failed
because these fields were missing from the HMS backend database.
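
For reference, the fix boils down to populating the storage descriptor
of the Thrift table object before the create_table call. A minimal
sketch, assuming the Thrift-generated hive types and the constants
added by this patch (the helper name and include path are illustrative;
the real change lives in the table population code in hms_catalog.cc):

  #include "kudu/hms/hive_metastore_types.h"  // Thrift-generated types (path assumed)
  #include "kudu/hms/hms_client.h"

  // Fill in the fields Hive reads when it picks a deserializer. If they
  // are left unset, the HMS stores NULLs and Hive's getDeserializer()
  // throws ClassNotFoundException.
  void PopulateKuduFormats(hive::Table* table) {
    table->sd.inputFormat = kudu::hms::HmsClient::kKuduInputFormat;
    table->sd.outputFormat = kudu::hms::HmsClient::kKuduOutputFormat;
    table->sd.serdeInfo.serializationLib = kudu::hms::HmsClient::kKuduSerDeLib;
  }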

This patch adds the missing input/output formats and serialization
library to table creation with Kudu HMS integration. It also extends
the existing test cases to cover the added fields. Manually tested on
a separate cluster by creating a Kudu table with several columns via
"stored as kudu", confirming the missing data is sent by checking the
parameters of the create_table request in the Hive log files, and
verifying the data reaches the HMS backend database by checking the
SDS table for INPUT_FORMAT and OUTPUT_FORMAT and the SERDES table for
SLIB for the newly created Kudu table. Finally, ran a few Hive queries
on the created Kudu tables and confirmed that no errors occurred.
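
The same verification can also be done programmatically against the
HMS, which is essentially what the extended tests assert. A sketch,
assuming the GetTable signature of the existing HmsClient and
illustrative database/table names:

  #include <glog/logging.h>

  #include "kudu/hms/hive_metastore_types.h"  // Thrift-generated types (path assumed)
  #include "kudu/hms/hms_client.h"
  #include "kudu/util/status.h"

  // Fetch the table back from the HMS and confirm the storage descriptor
  // carries the Kudu input/output formats and SerDe library.
  kudu::Status VerifyKuduFormats(kudu::hms::HmsClient* client) {
    hive::Table t;
    RETURN_NOT_OK(client->GetTable("default", "my_kudu_table", &t));
    CHECK_EQ(kudu::hms::HmsClient::kKuduInputFormat, t.sd.inputFormat);
    CHECK_EQ(kudu::hms::HmsClient::kKuduOutputFormat, t.sd.outputFormat);
    CHECK_EQ(kudu::hms::HmsClient::kKuduSerDeLib, t.sd.serdeInfo.serializationLib);
    return kudu::Status::OK();
  }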

Change-Id: Ia1b53b55005e2899d8575b0fb7250351d914afb4
Reviewed-on: http://gerrit.cloudera.org:8080/19026
Reviewed-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Zoltan Chovan <zchovan@cloudera.com>
Tested-by: Attila Bukor <abukor@apache.org>
Reviewed-by: Attila Bukor <abukor@apache.org>
diff --git a/src/kudu/hms/hms_catalog-test.cc b/src/kudu/hms/hms_catalog-test.cc
index eb55c42..0eae67f 100644
--- a/src/kudu/hms/hms_catalog-test.cc
+++ b/src/kudu/hms/hms_catalog-test.cc
@@ -240,6 +240,9 @@
     for (int column_idx = 0; column_idx < schema.num_columns(); column_idx++) {
       EXPECT_EQ(table.sd.cols[column_idx].name, schema.columns()[column_idx].name());
     }
+    EXPECT_EQ(table.sd.inputFormat, HmsClient::kKuduInputFormat);
+    EXPECT_EQ(table.sd.outputFormat, HmsClient::kKuduOutputFormat);
+    EXPECT_EQ(table.sd.serdeInfo.serializationLib, HmsClient::kKuduSerDeLib);
   }
 
   Status CreateLegacyTable(const string& database_name,
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index 39fad29..4570dd2 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -422,6 +422,9 @@
     fields.emplace_back(column_to_field(column));
   }
   table->sd.cols = std::move(fields);
+  table->sd.inputFormat = HmsClient::kKuduInputFormat;
+  table->sd.outputFormat = HmsClient::kKuduOutputFormat;
+  table->sd.serdeInfo.serializationLib = HmsClient::kKuduSerDeLib;
 
   return Status::OK();
 }
diff --git a/src/kudu/hms/hms_client-test.cc b/src/kudu/hms/hms_client-test.cc
index aee2a04..5afa774 100644
--- a/src/kudu/hms/hms_client-test.cc
+++ b/src/kudu/hms/hms_client-test.cc
@@ -72,6 +72,9 @@
         make_pair(HmsClient::kKuduMasterAddrsKey, string("TODO")),
         make_pair(HmsClient::kStorageHandlerKey, HmsClient::kKuduStorageHandler)
     });
+    table.sd.inputFormat = HmsClient::kKuduInputFormat;
+    table.sd.outputFormat = HmsClient::kKuduOutputFormat;
+    table.sd.serdeInfo.serializationLib = HmsClient::kKuduSerDeLib;
 
     hive::EnvironmentContext env_ctx;
     env_ctx.__set_properties({ make_pair(HmsClient::kKuduMasterEventKey, "true") });
@@ -181,6 +184,9 @@
   EXPECT_EQ(cluster_id, my_table.parameters[HmsClient::kKuduClusterIdKey]);
   EXPECT_EQ(HmsClient::kKuduStorageHandler, my_table.parameters[HmsClient::kStorageHandlerKey]);
   EXPECT_EQ(HmsClient::kManagedTable, my_table.tableType);
+  EXPECT_EQ(HmsClient::kKuduInputFormat, my_table.sd.inputFormat);
+  EXPECT_EQ(HmsClient::kKuduOutputFormat, my_table.sd.outputFormat);
+  EXPECT_EQ(HmsClient::kKuduSerDeLib, my_table.sd.serdeInfo.serializationLib);
 
   string new_table_name = "my_altered_table";
 
@@ -206,6 +212,9 @@
   EXPECT_EQ(HmsClient::kKuduStorageHandler,
             renamed_table.parameters[HmsClient::kStorageHandlerKey]);
   EXPECT_EQ(HmsClient::kManagedTable, renamed_table.tableType);
+  EXPECT_EQ(HmsClient::kKuduInputFormat, renamed_table.sd.inputFormat);
+  EXPECT_EQ(HmsClient::kKuduOutputFormat, renamed_table.sd.outputFormat);
+  EXPECT_EQ(HmsClient::kKuduSerDeLib, renamed_table.sd.serdeInfo.serializationLib);
 
   // Create a table with an uppercase name.
   string uppercase_table_name = "my_UPPERCASE_Table";
@@ -333,6 +342,9 @@
   partition_key.name = "c1";
   partition_key.type = "int";
   table.partitionKeys.emplace_back(std::move(partition_key));
+  table.sd.inputFormat = HmsClient::kKuduInputFormat;
+  table.sd.outputFormat = HmsClient::kKuduOutputFormat;
+  table.sd.serdeInfo.serializationLib = HmsClient::kKuduSerDeLib;
 
   ASSERT_OK(client.CreateTable(table));
 
diff --git a/src/kudu/hms/hms_client.cc b/src/kudu/hms/hms_client.cc
index 8c65bf5..f531a46 100644
--- a/src/kudu/hms/hms_client.cc
+++ b/src/kudu/hms/hms_client.cc
@@ -23,6 +23,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -30,9 +31,9 @@
 #include <thrift/TApplicationException.h>
 #include <thrift/Thrift.h>
 #include <thrift/protocol/TJSONProtocol.h>
-#include <thrift/protocol/TProtocol.h>
+#include <thrift/protocol/TProtocol.h> // IWYU pragma: keep
 #include <thrift/transport/TBufferTransports.h>
-#include <thrift/transport/TTransport.h>
+#include <thrift/transport/TTransport.h> // IWYU pragma: keep
 #include <thrift/transport/TTransportException.h>
 
 #include "kudu/gutil/macros.h"
@@ -143,6 +144,9 @@
 const char* const HmsClient::kHiveFilterFieldParams = "hive_filter_field_params__";
 const char* const HmsClient::kNotificationAddThriftObjects =
   "hive.metastore.notifications.add.thrift.objects";
+const char* const HmsClient::kKuduInputFormat = "org.apache.hadoop.hive.kudu.KuduInputFormat";
+const char* const HmsClient::kKuduOutputFormat = "org.apache.hadoop.hive.kudu.KuduOutputFormat";
+const char* const HmsClient::kKuduSerDeLib = "org.apache.hadoop.hive.kudu.KuduSerDe";
 
 const char* const HmsClient::kManagedTable = "MANAGED_TABLE";
 const char* const HmsClient::kExternalTable = "EXTERNAL_TABLE";
diff --git a/src/kudu/hms/hms_client.h b/src/kudu/hms/hms_client.h
index fe39f37..81e53ce 100644
--- a/src/kudu/hms/hms_client.h
+++ b/src/kudu/hms/hms_client.h
@@ -85,6 +85,10 @@
 
   static const char* const kServiceName;
 
+  static const char* const kKuduInputFormat;
+  static const char* const kKuduOutputFormat;
+  static const char* const kKuduSerDeLib;
+
   // Create an HmsClient connection to the provided HMS Thrift RPC address.
   HmsClient(const HostPort& address, const thrift::ClientOptions& options);
   ~HmsClient();
diff --git a/src/kudu/integration-tests/hms_itest-base.cc b/src/kudu/integration-tests/hms_itest-base.cc
index e7faacd..54e80af 100644
--- a/src/kudu/integration-tests/hms_itest-base.cc
+++ b/src/kudu/integration-tests/hms_itest-base.cc
@@ -20,6 +20,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -239,6 +240,9 @@
             hms_table.parameters[hms::HmsClient::kKuduMasterAddrsKey]);
   ASSERT_EQ(hms::HmsClient::kKuduStorageHandler,
             hms_table.parameters[hms::HmsClient::kStorageHandlerKey]);
+  ASSERT_EQ(hms::HmsClient::kKuduInputFormat, hms_table.sd.inputFormat);
+  ASSERT_EQ(hms::HmsClient::kKuduOutputFormat, hms_table.sd.outputFormat);
+  ASSERT_EQ(hms::HmsClient::kKuduSerDeLib, hms_table.sd.serdeInfo.serializationLib);
 }
 
 void HmsITestHarness::CheckTableDoesNotExist(const string& database_name,