IMPALA-9158: Support loading primary key/foreign key constraints
in LocalCatalog Mode.

This change add a new method 'loadConstraints()' to the MetaProvider
interface.

1. In CatalogdMetaProvider implementation, we fetch the primary key
  (PK) and foreign key(FK) information via the GetPartialCatalogObject()
  RPC to the catalogd. This is modified to include PK/FK information.
  This is because, on catalog side we eagerly load PK/FK information
  which can be sent over to local catalog in a single RPC to Catalog.
  This information is then stored in TableMetaRef object for future
  consumers.
2. In the DirectMetaProvider implementation, we make two RPCs to HMS
  to directly get PK/FK information.

Load constraints can be extended to include other constraints later
(for ex: unique constraints.)

Testing:
- Added tests in LocalCatalogTest, CatalogTest and PartialCatalogInfoTest
- This change also modifies the toSqlUtil for show create table
  statements. Added a test for the same.

Change-Id: I7ea7e1bacf6eb502c67caf310a847b32687e0d58
Reviewed-on: http://gerrit.cloudera.org:8080/14731
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 4dc8336..1b809a8 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -324,6 +324,10 @@
   // ... each partition should include the partition stats serialized as a byte[]
   // and that is deflate-compressed.
   7: bool want_partition_stats
+
+  // The response should contain table constraints like primary keys
+  // and foreign keys
+  8: bool want_table_constraints
 }
 
 // Returned information about a particular partition.
@@ -375,6 +379,14 @@
   // than duplicate the list of network address, which helps reduce memory usage.
   // Only used when partition files are fetched.
   7: optional list<Types.TNetworkAddress> network_addresses
+
+  // List of primary key constraints, small enough that we can
+  // return them wholesale.
+  8: optional list<hive_metastore.SQLPrimaryKey> primary_keys
+
+  // List of foreign key constraints, small enough that we can
+  // return them wholesale.
+  9: optional list<hive_metastore.SQLForeignKey> foreign_keys
 }
 
 // Selector for partial information about a Database.
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index e196af1..0f1afa7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -36,7 +36,6 @@
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -49,6 +48,7 @@
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.MetaStoreUtil;
+import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -547,14 +547,17 @@
         // Hive has a bug that prevents foreign keys from being added when pk column is
         // not part of primary key. This can be confusing. Till this bug is fixed, we
         // will not allow foreign keys definition on such columns.
-        if (parentTable instanceof HdfsTable) {
-          // TODO (IMPALA-9158): Modify this check to call FeFsTable.getPrimaryKeysSql()
-          // instead of HdfsTable.getPrimaryKeysSql() when we implement PK/FK feature
-          // for LocalCatalog.
-          if (!((HdfsTable) parentTable).getPrimaryKeysSql().contains(pkCol)) {
+        try {
+          if (!((FeFsTable) parentTable).getPrimaryKeyColumnNames().contains(pkCol)) {
             throw new AnalysisException(String.format("Parent column %s is not part of "
                 + "primary key.", pkCol));
           }
+        } catch (TException e) {
+          // In local catalog mode, we do not aggressively load PK/FK information, a
+          // call to getPrimaryKeyColumnNames() will try to selectively load PK/FK
+          // information. Hence, TException is thrown only in local catalog mode.
+          throw new AnalysisException("Failed to get primary key columns for "
+              + fk.pkTableName);
         }
       }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 5b2dedf..ef315b8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -30,7 +30,6 @@
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.parse.HiveLexer;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
@@ -42,7 +41,6 @@
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
@@ -374,9 +372,11 @@
       String inputFormat = msTable.getSd().getInputFormat();
       format = HdfsFileFormat.fromHdfsInputFormatClass(inputFormat);
       compression = HdfsCompression.fromHdfsInputFormatClass(inputFormat);
-      if (table instanceof HdfsTable) {
-        primaryKeySql = ((HdfsTable) table).getPrimaryKeysSql();
-        foreignKeySql = ((HdfsTable) table).getForeignKeysSql();
+      try {
+        primaryKeySql = ((FeFsTable) table).getPrimaryKeyColumnNames();
+        foreignKeySql = ((FeFsTable) table).getForeignKeysSql();
+      } catch (Exception e) {
+        throw new CatalogException("Could not get primary key/foreign keys sql.", e);
       }
     }
     HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 076c84a..b024da1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -48,8 +48,10 @@
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.TAccessLevelUtil;
 import org.apache.impala.util.TResultRowBuilder;
+import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
 /**
@@ -192,6 +194,59 @@
   List<SQLForeignKey> getForeignKeys();
 
   /**
+   * @return  List of primary keys column names, useful for toSqlUtils. In local
+   * catalog mode, this causes load of constraints.
+   */
+  default List<String> getPrimaryKeyColumnNames() throws TException {
+    List<String> primaryKeyColNames = new ArrayList<>();
+    List<SQLPrimaryKey> primaryKeys = getPrimaryKeys();
+    if (!primaryKeys.isEmpty()) {
+      primaryKeys.stream().forEach(p -> primaryKeyColNames.add(p.getColumn_name()));
+    }
+    return primaryKeyColNames;
+  }
+
+  /**
+   * Get foreign keys information as strings. Useful for toSqlUtils.
+   * @return List of strings of the form "(col1, col2,..) REFERENCES [pk_db].pk_table
+   * (colA, colB,..)". In local catalog mode, this causes load of constraints.
+   */
+  default List<String> getForeignKeysSql() throws TException{
+    List<String> foreignKeysSql = new ArrayList<>();
+    // Iterate through foreign keys list. This list may contain multiple foreign keys
+    // and each foreign key may contain multiple columns. The outerloop collects
+    // information common to a foreign key (pk table information). The inner
+    // loop collects column information.
+    List<SQLForeignKey> foreignKeys = getForeignKeys();
+    for (int i = 0; i < foreignKeys.size(); i++) {
+      String pkTableDb = foreignKeys.get(i).getPktable_db();
+      String pkTableName = foreignKeys.get(i).getPktable_name();
+      List<String> pkList = new ArrayList<>();
+      List<String> fkList = new ArrayList<>();
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      for (; i < foreignKeys.size(); i++) {
+        fkList.add(foreignKeys.get(i).getFkcolumn_name());
+        pkList.add(foreignKeys.get(i).getPkcolumn_name());
+        // Foreign keys for a table can consist of multiple columns, they are represented
+        // as different SQLForeignKey structures. A key_seq is used to stitch together
+        // the entire sequence that forms the foreign key. Hence, we bail out of inner
+        // loop if the key_seq of the next SQLForeignKey is 1.
+        if (i + 1 < foreignKeys.size() && foreignKeys.get(i + 1).getKey_seq() == 1) {
+          break;
+        }
+      }
+      Joiner.on(", ").appendTo(sb, fkList).append(") ");
+      sb.append("REFERENCES ");
+      if (pkTableDb != null) sb.append(pkTableDb + ".");
+      sb.append(pkTableName + "(");
+      Joiner.on(", ").appendTo(sb, pkList).append(")");
+      foreignKeysSql.add(sb.toString());
+    }
+    return foreignKeysSql;
+  }
+
+  /**
    * Parses and returns the value of the 'skip.header.line.count' table property. If the
    * value is not set for the table, returns 0. If parsing fails or a value < 0 is found,
    * the error parameter is updated to contain an error message.
@@ -200,7 +255,7 @@
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     if (msTbl == null ||
         !msTbl.getParameters().containsKey(
-          FeFsTable.Utils.TBL_PROP_SKIP_HEADER_LINE_COUNT)) {
+            FeFsTable.Utils.TBL_PROP_SKIP_HEADER_LINE_COUNT)) {
       return 0;
     }
     return Utils.parseSkipHeaderLineCount(msTbl.getParameters(), error);
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeTable.java b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
index 8a69d53..3d069c2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
@@ -24,7 +24,6 @@
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.impala.analysis.TableName;
-import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableStats;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 3a6d71f..ae1e3d7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -245,7 +245,8 @@
   // for setAvroSchema().
   private boolean isSchemaLoaded_ = false;
 
-  // Primary Key and Foreign Key information. Set in load() method.
+  // Primary Key and Foreign Key information. Set in load() method. An empty list could
+  // mean either the table does not have any keys or the table is not loaded.
   private final List<SQLPrimaryKey> primaryKeys_ = new ArrayList<>();
   private final List<SQLForeignKey> foreignKeys_ = new ArrayList<>();
 
@@ -992,7 +993,7 @@
             MetaStoreUtil.getNullPartitionKeyValue(client).intern();
           loadSchema(msTbl);
           loadAllColumnStats(client);
-          loadPkFkInfo(client, msTbl);
+          loadConstraintsInfo(client, msTbl);
         }
         loadValidWriteIdList(client);
         // Load partition and file metadata
@@ -1047,7 +1048,7 @@
    * Load Primary Key and Foreign Key information for table. Throws TableLoadingException
    * if the load fails.
    */
-  private void loadPkFkInfo(IMetaStoreClient client,
+  private void loadConstraintsInfo(IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException{
     try {
       // Reset and add primary keys info and foreign keys info.
@@ -1551,6 +1552,13 @@
       // of cloning of file descriptors which might increase memory pressure.
       resp.table_info.setNetwork_addresses(hostIndex_.getList());
     }
+
+    if (req.table_info_selector.want_table_constraints) {
+      List<SQLPrimaryKey> primaryKeys = new ArrayList<>(primaryKeys_);
+      List<SQLForeignKey> foreignKeys = new ArrayList<>(foreignKeys_);
+      resp.table_info.setPrimary_keys(primaryKeys);
+      resp.table_info.setForeign_keys(foreignKeys);
+    }
     return resp;
   }
 
@@ -1651,55 +1659,6 @@
   }
 
   /**
-   * Get primary keys column names, useful for toSqlUtils.
-   */
-  public List<String> getPrimaryKeysSql() {
-    List<String> primaryKeyColNames = new ArrayList<>();
-    if (getPrimaryKeys() != null && !getPrimaryKeys().isEmpty()) {
-      getPrimaryKeys().stream().forEach(p -> primaryKeyColNames.add(p.getColumn_name()));
-    }
-    return primaryKeyColNames;
-  }
-
-  /**
-   * Get foreign keys information as strings. Useful for toSqlUtils.
-   * @return List of strings of the form "(col1, col2,..) REFERENCES [pk_db].pk_table
-   * (colA, colB,..)".
-   */
-  public List<String> getForeignKeysSql() {
-    List<String> foreignKeysSql = new ArrayList<>();
-    // Iterate through foreign keys list. This list may contain multiple foreign keys
-    // and each foreign key may contain multiple columns. The outer loop collects
-    // information common to a foreign key (pk table information). The inner
-    // loop collects column information.
-    List<SQLForeignKey> foreignKeys = getForeignKeys();
-    for (int i = 0; i < foreignKeys.size(); i++) {
-      String pkTableDb = foreignKeys.get(i).getPktable_db();
-      String pkTableName = foreignKeys.get(i).getPktable_name();
-      List<String> pkList = new ArrayList<>();
-      List<String> fkList = new ArrayList<>();
-      StringBuilder sb = new StringBuilder();
-      sb.append("(");
-      for (; i<foreignKeys.size(); i++) {
-        fkList.add(foreignKeys.get(i).getFkcolumn_name());
-        pkList.add(foreignKeys.get(i).getPkcolumn_name());
-        // Bail out of inner loop if the key_seq of the next ForeignKey is 1.
-        if (i + 1 < foreignKeys.size() && foreignKeys.get(i + 1).getKey_seq() == 1) {
-          break;
-        }
-      }
-      Joiner.on(", ").appendTo(sb, fkList).append(") ");
-      sb.append("REFERENCES ");
-      if (pkTableDb != null) sb.append(pkTableDb + ".");
-      sb.append(pkTableName + "(");
-      Joiner.on(", ").appendTo(sb, pkList).append(")");
-      foreignKeysSql.add(sb.toString());
-    }
-    return foreignKeysSql;
-  }
-
-
-  /**
    * Returns the set of file formats that the partitions are stored in.
    */
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index f638408..d9c8b6c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -39,6 +39,8 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.AuthorizationChecker;
@@ -686,13 +688,17 @@
           public TableMetaRefImpl call() throws Exception {
             TGetPartialCatalogObjectRequest req = newReqForTable(dbName, tableName);
             req.table_info_selector.want_hms_table = true;
+            // To be consistent with implementation in legacy catalog mode, we eagerly
+            // load constraint information whenever a table is loaded.
+            req.table_info_selector.want_table_constraints = true;
             TGetPartialCatalogObjectResponse resp = sendRequest(req);
             checkResponse(resp.table_info != null && resp.table_info.hms_table != null,
                 req, "missing expected HMS table");
             addTableMetadatStorageLoadTimeToProfile(
                 resp.table_info.storage_metadata_load_time_ns);
             return new TableMetaRefImpl(
-                dbName, tableName, resp.table_info.hms_table, resp.object_version_number);
+                dbName, tableName, resp.table_info.hms_table, resp.object_version_number,
+                resp.table_info.primary_keys, resp.table_info.foreign_keys);
            }
       });
     return Pair.create(ref.msTable_, (TableMetaRef)ref);
@@ -791,6 +797,15 @@
   }
 
   @Override
+  public Pair<List<SQLPrimaryKey>, List<SQLForeignKey>> loadConstraints(
+      final TableMetaRef table, Table msTbl) {
+     Pair<List<SQLPrimaryKey>, List<SQLForeignKey>> pair =
+         new Pair<>(((TableMetaRefImpl) table).primaryKeys_,
+         ((TableMetaRefImpl) table).foreignKeys_);
+     return pair;
+  }
+
+  @Override
   public Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef table,
       List<String> partitionColumnNames,
       ListMap<TNetworkAddress> hostIndex,
@@ -1378,6 +1393,9 @@
   private static class TableMetaRefImpl implements TableMetaRef {
     private final String dbName_;
     private final String tableName_;
+    // SQL constraints for the table, populated during loadTable().
+    private final List<SQLPrimaryKey> primaryKeys_;
+    private final List<SQLForeignKey> foreignKeys_;
 
     /**
      * Stash the HMS Table object since we need this in order to handle some strange
@@ -1393,11 +1411,14 @@
     private final long catalogVersion_;
 
     public TableMetaRefImpl(String dbName, String tableName,
-        Table msTable, long catalogVersion) {
+        Table msTable, long catalogVersion, List<SQLPrimaryKey> primaryKeys,
+        List<SQLForeignKey> foreignKeys) {
       this.dbName_ = dbName;
       this.tableName_ = tableName;
       this.msTable_ = msTable;
       this.catalogVersion_ = catalogVersion;
+      this.primaryKeys_ = primaryKeys;
+      this.foreignKeys_ = foreignKeys;
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index a92a750..64e51f4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -19,6 +19,7 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -27,9 +28,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.AuthorizationPolicy;
@@ -159,6 +164,21 @@
   }
 
   @Override
+  public Pair<List<SQLPrimaryKey>, List<SQLForeignKey>> loadConstraints(
+      TableMetaRef table, Table msTbl) throws TException {
+    List<SQLPrimaryKey> primaryKeys = new ArrayList<>();
+    List<SQLForeignKey> foreignKeys = new ArrayList<>();
+
+    try (MetaStoreClient c = msClientPool_.getClient()) {
+      primaryKeys.addAll(c.getHiveClient().getPrimaryKeys(
+          new PrimaryKeysRequest(msTbl.getDbName(), msTbl.getTableName())));
+      foreignKeys.addAll(c.getHiveClient().getForeignKeys(new ForeignKeysRequest(null,
+          null, msTbl.getDbName(), msTbl.getTableName())));
+    }
+    return new Pair<>(primaryKeys, foreignKeys);
+  }
+
+  @Override
   public Map<String, PartitionMetadata> loadPartitionsByRefs(
       TableMetaRef table, List<String> partitionColumnNames,
       ListMap<TNetworkAddress> hostIndex,
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index f49e8af..d9cf712 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -53,6 +53,7 @@
 import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
@@ -106,6 +107,19 @@
    */
   private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<>();
 
+
+  /**
+   * List of primary keys associated with the table. An empty list could
+   * mean either the table does not have any primary keys or the table is not loaded.
+   */
+  private final List<SQLPrimaryKey> primaryKeys_ = new ArrayList<>();
+
+  /**
+   * List of foreign keys associated with the table. An empty list could
+   * mean either the table does not have any foreign keys or the table is not loaded.
+   */
+  private final List<SQLForeignKey> foreignKeys_ = new ArrayList<>();
+
   /**
    * The Avro schema for this table. Non-null if this table is an Avro table.
    * If this table is not an Avro table, this is usually null, but may be
@@ -496,6 +510,19 @@
   }
 
   /**
+   * Populate constraint information by making a request to MetaProvider.
+   */
+  private void loadConstraints() throws TException {
+    Pair<List<SQLPrimaryKey>, List<SQLForeignKey>> constraints =
+        db_.getCatalog().getMetaProvider().loadConstraints(ref_, msTable_);
+    // clear and load constraints.
+    primaryKeys_.clear();
+    foreignKeys_.clear();
+    primaryKeys_.addAll(constraints.first);
+    foreignKeys_.addAll(constraints.second);
+  }
+
+  /**
    * Override base implementation to populate column stats for
    * clustering columns based on the partition map.
    */
@@ -523,4 +550,28 @@
   public ListMap<TNetworkAddress> getHostIndex() {
     return hostIndex_;
   }
+
+  @Override
+  public List<SQLPrimaryKey> getPrimaryKeys() {
+    try {
+      loadConstraints();
+    } catch (TException e) {
+      throw new LocalCatalogException("Failed to load primary keys for table "
+          + getFullName(), e);
+    }
+    // Once loaded, clients must not be able to modify this list.
+    return ImmutableList.copyOf(primaryKeys_);
+  }
+
+  @Override
+  public List<SQLForeignKey> getForeignKeys() {
+    try {
+      loadConstraints();
+    } catch (TException e) {
+      throw new LocalCatalogException("Failed to load foreign keys for table "
+          + getFullName(), e);
+    }
+    // Once loaded, clients must not be able to modify this list.
+    return ImmutableList.copyOf(foreignKeys_);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 22c1047..ef87c25 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -25,6 +25,8 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.AuthorizationPolicy;
@@ -78,6 +80,9 @@
   List<PartitionRef> loadPartitionList(TableMetaRef table)
       throws MetaException, TException;
 
+  Pair<List<SQLPrimaryKey>, List<SQLForeignKey>> loadConstraints(TableMetaRef table,
+      Table msTbl) throws MetaException, TException;
+
   /**
    * Retrieve the list of functions in the given database.
    */
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index 3f44312..e038013 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
@@ -47,6 +48,8 @@
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NumericLiteral;
@@ -433,6 +436,53 @@
     checkAllTypesPartitioning(table, true);
   }
 
+  /**
+   * Test SQL constraints such as primary keys and foreign keys
+   */
+  @Test
+  public void testGetSqlConstraints() throws Exception {
+    FeFsTable t = (FeFsTable) catalog_.getOrLoadTable("functional", "parent_table",
+        "test");
+    assertNotNull(t);
+    assertTrue(t instanceof FeFsTable);
+    List<SQLPrimaryKey> primaryKeys = t.getPrimaryKeys();
+    List<SQLForeignKey> foreignKeys = t.getForeignKeys();
+    assertEquals(2, primaryKeys.size());
+    assertEquals(0, foreignKeys.size());
+    for (SQLPrimaryKey pk: primaryKeys) {
+      assertEquals("functional", pk.getTable_db());
+      assertEquals("parent_table", pk.getTable_name());
+    }
+    // HMS returns the columns in the reverse order of PK columns specified in the DDL.
+    // "parent_table" in our test data has primary key(id, year) specified.
+    assertEquals("year", primaryKeys.get(0).getColumn_name());
+    assertEquals("id", primaryKeys.get(1).getColumn_name());
+
+    // Force load parent_table_2. Required for fetching foreign keys from child_table.
+    catalog_.getOrLoadTable("functional", "parent_table_2", "test");
+
+    t = (FeFsTable) catalog_.getOrLoadTable("functional", "child_table", "test");
+    assertNotNull(t);
+    assertTrue(t instanceof FeFsTable);
+    primaryKeys = t.getPrimaryKeys();
+    foreignKeys = t.getForeignKeys();
+    assertEquals(1, primaryKeys.size());
+    assertEquals(3, foreignKeys.size());
+    assertEquals("functional", primaryKeys.get(0).getTable_db());
+    assertEquals("child_table",primaryKeys.get(0).getTable_name());
+    for (SQLForeignKey fk : foreignKeys) {
+      assertEquals("functional", fk.getFktable_db());
+      assertEquals("child_table", fk.getFktable_name());
+      assertEquals("functional", fk.getPktable_db());
+    }
+    assertEquals("parent_table", foreignKeys.get(0).getPktable_name());
+    assertEquals("parent_table", foreignKeys.get(1).getPktable_name());
+    assertEquals("parent_table_2", foreignKeys.get(2).getPktable_name());
+    assertEquals("id", foreignKeys.get(0).getPkcolumn_name());
+    assertEquals("year", foreignKeys.get(1).getPkcolumn_name());
+    assertEquals("a", foreignKeys.get(2).getPkcolumn_name());
+  }
+
   public static void checkAllTypesPartitioning(FeFsTable table,
       boolean checkFileDescriptors) {
     assertEquals(24, table.getPartitionIds().size());
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
index f073576..e5fc072 100644
--- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -32,6 +32,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
@@ -241,6 +243,62 @@
   }
 
   @Test
+  public void testGetSqlConstraints() throws Exception {
+    // Test constraints of parent table.
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "parent_table");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.want_table_constraints = true;
+
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    List<SQLPrimaryKey> primaryKeys = resp.table_info.primary_keys;
+    List<SQLForeignKey> foreignKeys = resp.table_info.foreign_keys;
+
+    assertEquals(2, primaryKeys.size());
+    assertEquals(0, foreignKeys.size());
+    for (SQLPrimaryKey pk: primaryKeys) {
+      assertEquals("functional", pk.getTable_db());
+      assertEquals("parent_table", pk.getTable_name());
+    }
+    // HMS returns the columns in the reverse order of PK columns specified in the DDL.
+    // "parent_table" in our test data has primary key(id, year) specified.
+    assertEquals("year", primaryKeys.get(0).getColumn_name());
+    assertEquals("id", primaryKeys.get(1).getColumn_name());
+
+    // Test constraints of child_table.
+    req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "child_table");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.want_table_constraints = true;
+
+    resp = sendRequest(req);
+    primaryKeys = resp.table_info.primary_keys;
+    foreignKeys = resp.table_info.foreign_keys;
+
+    assertEquals(1, primaryKeys.size());
+    assertEquals(3, foreignKeys.size());
+    assertEquals("functional", primaryKeys.get(0).getTable_db());
+    assertEquals("child_table",primaryKeys.get(0).getTable_name());
+    for (SQLForeignKey fk : foreignKeys) {
+      assertEquals("functional", fk.getFktable_db());
+      assertEquals("child_table", fk.getFktable_name());
+      assertEquals("functional", fk.getPktable_db());
+    }
+    assertEquals("parent_table", foreignKeys.get(0).getPktable_name());
+    assertEquals("parent_table", foreignKeys.get(1).getPktable_name());
+    assertEquals("parent_table_2", foreignKeys.get(2).getPktable_name());
+    assertEquals("id", foreignKeys.get(0).getPkcolumn_name());
+    assertEquals("year", foreignKeys.get(1).getPkcolumn_name());
+    assertEquals("a", foreignKeys.get(2).getPkcolumn_name());
+  }
+
+  @Test
   public void testConcurrentPartialObjectRequests() throws Exception {
     // Create a request.
     TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 2926fa0..48be5b7 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
 import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
@@ -163,6 +165,49 @@
   }
 
   /**
+   * Test SQL constraints such as primary keys and foreign keys
+   */
+  @Test
+  public void testGetSqlConstraints() throws Exception {
+    FeFsTable t = (FeFsTable) catalog_.getTable("functional", "parent_table");
+    assertNotNull(t);
+    assertTrue(t instanceof LocalFsTable);
+    List<SQLPrimaryKey> primaryKeys = t.getPrimaryKeys();
+    List<SQLForeignKey> foreignKeys = t.getForeignKeys();
+    assertEquals(2, primaryKeys.size());
+    assertEquals(0, foreignKeys.size());
+    for (SQLPrimaryKey pk: primaryKeys) {
+      assertEquals("functional", pk.getTable_db());
+      assertEquals("parent_table", pk.getTable_name());
+    }
+    // HMS returns the columns in the reverse order of PK columns specified in the DDL.
+    // "parent_table" in our test data has primary key(id, year) specified.
+    assertEquals("year", primaryKeys.get(0).getColumn_name());
+    assertEquals("id", primaryKeys.get(1).getColumn_name());
+
+    t = (FeFsTable) catalog_.getTable("functional", "child_table");
+    assertNotNull(t);
+    assertTrue(t instanceof LocalFsTable);
+    primaryKeys = t.getPrimaryKeys();
+    foreignKeys = t.getForeignKeys();
+    assertEquals(1, primaryKeys.size());
+    assertEquals(3, foreignKeys.size());
+    assertEquals("functional", primaryKeys.get(0).getTable_db());
+    assertEquals("child_table",primaryKeys.get(0).getTable_name());
+    for (SQLForeignKey fk : foreignKeys) {
+      assertEquals("functional", fk.getFktable_db());
+      assertEquals("child_table", fk.getFktable_name());
+      assertEquals("functional", fk.getPktable_db());
+    }
+    assertEquals("parent_table", foreignKeys.get(0).getPktable_name());
+    assertEquals("parent_table", foreignKeys.get(1).getPktable_name());
+    assertEquals("parent_table_2", foreignKeys.get(2).getPktable_name());
+    assertEquals("id", foreignKeys.get(0).getPkcolumn_name());
+    assertEquals("year", foreignKeys.get(1).getPkcolumn_name());
+    assertEquals("a", foreignKeys.get(2).getPkcolumn_name());
+  }
+
+  /**
    * Test that partitions with a NULL value can be properly loaded.
    */
   @Test
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index fae008c..db62d67 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -538,3 +538,33 @@
 TBLPROPERTIES ('hbase.table.name'='functional_hbase.alltypes',
                'storage_handler'='org.apache.hadoop.hive.hbase.HBaseStorageHandler')
 ====
+---- QUERY
+SHOW CREATE TABLE functional.parent_table
+---- RESULTS-HIVE
+CREATE EXTERNAL TABLE functional.parent_table (
+  id INT,
+  year STRING,
+  PRIMARY KEY (year, id)
+ )
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',')
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+====
+---- QUERY
+SHOW CREATE TABLE functional.child_table
+---- RESULTS-HIVE
+CREATE EXTERNAL TABLE functional.child_table (
+  seq INT,
+  id INT,
+  year STRING,
+  a INT,
+  PRIMARY KEY (seq),
+  FOREIGN KEY(id, year) REFERENCES functional.parent_table(id, year),
+  FOREIGN KEY(a) REFERENCES functional.parent_table_2(a)
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',')
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+====