IMPALA-2112: Support primary key/foreign key constraints as part of
create table in Impala.

This is the first of several changes to use informational, unenforced
primary key(pk) and foreign key(fk) specifications in Impala.
The parent JIRA for this effort is IMPALA-3531.

This change adds support for adding pk/fk information during create
table DDLs. There is only limited SQL syntax support as of now and will
add various other SQL styles including ANSI syntax support in later
changes. Currently the only supported way of adding fk/pk information
is after the column definitions. Examples are:

CREATE TABLE pk(col1 INT, col2 STRING, PRIMARY KEY(col1, col2));

CREATE TABLE fk(id INT, col1 INT, col2 STRING, PRIMARY KEY(id),
FOREIGN KEY(col1, col2) REFERENCES pk(col1, col2));

In the current implementation, manual specification of constraint names
is not supported. Internally we use UUIDs for constraint name generation.
Additionally, three constraint states are supported to comply with
Hive's implementation; these states were taken from Oracle.
   DISABLE (default true)
   NOVALIDATE (default true)
   RELY (default true)
More info here:
https://docs.oracle.com/database/121/DWHSG/schemas.htm#DWHSG9053

These constraints can be optionally specified after each PK/FK
specification like:

CREATE TABLE pk(id INT, PRIMARY KEY(id) DISABLE, NOVALIDATE, RELY);

However, a specification like this will throw an analysis error:

CREATE TABLE pk(id INT, PRIMARY KEY(id) ENABLE, VALIDATE, RELY);

Notes:
- toSql support is not fully functional. Observability changes like showing
  PK/FK information in DESCRIBE output will be done separately.
- Retrieval of primary keys and foreign keys is currently not supported
  in Local Catalog Mode.

Tests:
Added tests to:
 - AnalyzeDDLTest#TestCreateTable
 - ParserTest#TestCreateTable
 - ToSqlTest#TestCreateTable
 - Built against both Hive-2 and Hive-3
Change-Id: Id03d8d4d41a2ac1b15e7060e2a013e334d044ee7
Reviewed-on: http://gerrit.cloudera.org:8080/14592
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index d807dc4..577a36c 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -364,7 +364,13 @@
   // Each TNetworkAddress is a datanode which contains blocks of a file in the table.
   // Used so that each THdfsFileBlock can just reference an index in this list rather
   // than duplicate the list of network address, which helps reduce memory usage.
-  7: optional list<Types.TNetworkAddress> network_addresses
+  7: optional list<Types.TNetworkAddress> network_addresses,
+
+  // Primary Keys information for HDFS Tables
+  11: optional list<hive_metastore.SQLPrimaryKey> primary_keys,
+
+  // Foreign Keys information for HDFS Tables
+  12: optional list<hive_metastore.SQLForeignKey> foreign_keys
 }
 
 struct THBaseTable {
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 8460822..81d4742 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -22,6 +22,7 @@
 include "Types.thrift"
 include "Status.thrift"
 include "TCLIService.thrift"
+include "hive_metastore.thrift"
 
 // This is a short value due to the HDFS API limits
 const i16 HDFS_DEFAULT_CACHE_REPLICATION_FACTOR = 1
@@ -535,6 +536,12 @@
 
   // The sorting order used in SORT BY clauses.
   18: required Types.TSortingOrder sorting_order
+
+  // Primary Keys Structures for Hive API
+  19: optional list<hive_metastore.SQLPrimaryKey> primary_keys;
+
+  // Foreign Keys Structure for Hive API
+  20: optional list<hive_metastore.SQLForeignKey> foreign_keys;
 }
 
 // Parameters of a CREATE VIEW or ALTER VIEW AS SELECT command
diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 16f9a9e..87e8ecf 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -127,6 +129,15 @@
     client.alter_partitions(dbName, tableName, partitions, null);
   }
 
+  /**
+   * Wrapper around IMetaStoreClient.createTableWithConstraints() to deal with added
+   * arguments.
+   */
+  public static void createTableWithConstraints(IMetaStoreClient client,
+      Table newTbl, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
+      throws InvalidOperationException, MetaException, TException {
+    client.createTableWithConstraints(newTbl, primaryKeys, foreignKeys);
+  }
 
  /**
   * Hive-3 only function
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 8fda9dc..d027a08 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -45,6 +45,8 @@
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -221,6 +223,18 @@
   }
 
   /**
+   * Wrapper around IMetaStoreClient.createTableWithConstraints() to deal with added
+   * arguments. Hive-3's four new arguments are uniqueConstraints, notNullConstraints,
+   * defaultConstraints, and checkConstraints.
+   */
+  public static void createTableWithConstraints(IMetaStoreClient client,
+      Table newTbl, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
+      throws InvalidOperationException, MetaException, TException {
+    client.createTableWithConstraints(newTbl,primaryKeys, foreignKeys, null, null,
+        null, null);
+  }
+
+  /**
    * Wrapper around IMetaStoreClient.alter_partitions with transaction information
    */
   public static void alterPartitionsWithTransaction(IMetaStoreClient client,
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 159e815..ecce9cc 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -280,28 +280,30 @@
   KW_ARRAY, KW_AS, KW_ASC, KW_AUTHORIZATION, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY,
   KW_BLOCKSIZE, KW_BOOLEAN, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE,
   KW_CHAR, KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPRESSION,
-  KW_COMPUTE, KW_COPY, KW_CREATE, KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE,
-  KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE, KW_DELIMITED,
-  KW_DESC, KW_DESCRIBE, KW_DISTINCT, KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE, KW_ENCODING,
-  KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL, KW_FALSE,
-  KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT, KW_FOLLOWING,
-  KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT,
-  KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF, KW_ILIKE, KW_IN, KW_INCREMENTAL,
-  KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL,
-  KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN, KW_KUDU, KW_LAST, KW_LEFT,
-  KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP, KW_MERGE_FN,
-  KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORC, KW_ORDER,
-  KW_OUTER, KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION,
-  KW_PARTITIONED, KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED,
-  KW_PURGE, KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME,
-  KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT, KW_RETURNS, KW_REVOKE, KW_RIGHT,
-  KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI,
-  KW_SEQUENCEFILE, KW_SERDEPROPERTIES, KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT,
-  KW_SORT, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT, KW_SYMBOL, KW_TABLE,
-  KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES, KW_TERMINATED, KW_TEXTFILE, KW_THEN,
-  KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED,
-  KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UPDATE, KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING,
-  KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH, KW_ZORDER;
+  KW_COMPUTE, KW_CONSTRAINT, KW_COPY, KW_CREATE, KW_CROSS, KW_CURRENT, KW_DATA,
+  KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE,
+  KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISABLE, KW_DISTINCT, KW_DIV, KW_DOUBLE,
+  KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN,
+  KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN,
+  KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FOREIGN, KW_FORMAT, KW_FORMATTED,
+  KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE,
+  KW_HAVING, KW_IF, KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH,
+  KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP,
+  KW_IS, KW_JOIN,  KW_KUDU, KW_LAST, KW_LEFT, KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES,
+  KW_LOAD, KW_LOCATION, KW_MAP, KW_MERGE_FN, KW_METADATA, KW_NORELY, KW_NOT,
+  KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORC, KW_ORDER, KW_OUTER,
+  KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION, KW_PARTITIONED,
+  KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED, KW_PURGE,
+  KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFERENCES, KW_REFRESH, KW_REGEXP, KW_RELY,
+  KW_RENAME, KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT, KW_RETURNS,
+  KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA,
+  KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, KW_SERDEPROPERTIES, KW_SERIALIZE_FN,
+  KW_SET, KW_SHOW, KW_SMALLINT, KW_SORT, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING,
+  KW_STRUCT, KW_SYMBOL, KW_TABLE, KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES,
+  KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS,
+  KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UPDATE,
+  KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALIDATE, KW_VALUES, KW_VARCHAR, KW_VIEW,
+  KW_WHEN, KW_WHERE, KW_WITH, KW_ZORDER;
 
 terminal UNUSED_RESERVED_WORD;
 
@@ -457,6 +459,7 @@
 nonterminal TableDef tbl_def_without_col_defs, tbl_def_with_col_defs;
 nonterminal TableDataLayout opt_tbl_data_layout, partitioned_data_layout;
 nonterminal TableDef.Options tbl_options;
+nonterminal List<TableDef.ForeignKey> foreign_keys_list;
 nonterminal CreateViewStmt create_view_stmt;
 nonterminal CreateDataSrcStmt create_data_src_stmt;
 nonterminal DropDataSrcStmt drop_data_src_stmt;
@@ -512,6 +515,8 @@
 nonterminal LiteralExpr block_size_val;
 nonterminal Pair<Option, Object> column_option;
 nonterminal Map<Option, Object> column_options_map;
+// Used for integrity constraints (DISABLE, NOVALIDATE, RELY)
+nonterminal Boolean enable_spec, validate_spec, rely_spec;
 
 // For GRANT/REVOKE/AUTH DDL statements
 nonterminal ShowRolesStmt show_roles_stmt;
@@ -1428,13 +1433,80 @@
     tbl_def.getColumnDefs().addAll(list);
     RESULT = tbl_def;
   :}
+    // Adding Primary Keys and Foreign Keys constraints
+    // PRIMARY KEY(..) DISABLE NOVALIDATE RELY
   | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA
     primary_keys:primary_keys RPAREN
   {:
     tbl_def.getColumnDefs().addAll(list);
     tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
+    TableDef.PrimaryKey pk = new TableDef.PrimaryKey(tbl_def.getTblName(),
+        primary_keys, null, true, false, false);
+    tbl_def.setPrimaryKey(pk);
     RESULT = tbl_def;
   :}
+  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA
+      primary_keys:primary_keys enable_spec:enable_spec validate_spec:validate_spec
+      rely_spec:rely_spec RPAREN
+  {:
+    tbl_def.getColumnDefs().addAll(list);
+    TableDef.PrimaryKey pk = new TableDef.PrimaryKey(tbl_def.getTblName(),
+        primary_keys, null, rely_spec, validate_spec, enable_spec);
+    tbl_def.setPrimaryKey(pk);
+    RESULT = tbl_def;
+  :}
+  // PRIMARY KEY(..) DISABLE NOVALIDATE RELY FOREIGN KEY (...) REFERENCES parent_table(..)
+  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA
+    primary_keys:primary_keys enable_spec:enable_spec validate_spec:validate_spec
+    rely_spec:rely_spec COMMA foreign_keys_list:foreign_keys_list RPAREN
+  {:
+    tbl_def.getColumnDefs().addAll(list);
+    TableDef.PrimaryKey pk = new TableDef.PrimaryKey(tbl_def.getTblName(),
+      primary_keys, null, rely_spec, validate_spec, enable_spec);
+    tbl_def.setPrimaryKey(pk);
+    tbl_def.getForeignKeysList().addAll(foreign_keys_list);
+    RESULT = tbl_def;
+  :}
+  // FOREIGN KEYS followed by PRIMARY KEYS
+  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA
+    foreign_keys_list:foreign_keys_list COMMA primary_keys:primary_keys
+    enable_spec:enable_spec validate_spec:validate_spec rely_spec:rely_spec RPAREN
+  {:
+    tbl_def.getColumnDefs().addAll(list);
+    TableDef.PrimaryKey pk = new TableDef.PrimaryKey(tbl_def.getTblName(),
+        primary_keys, null, rely_spec, validate_spec, enable_spec);
+    tbl_def.setPrimaryKey(pk);
+    tbl_def.getForeignKeysList().addAll(foreign_keys_list);
+    RESULT = tbl_def;
+  :}
+  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA
+    foreign_keys_list:foreign_keys_list RPAREN
+  {:
+      tbl_def.getColumnDefs().addAll(list);
+      tbl_def.getForeignKeysList().addAll(foreign_keys_list);
+      RESULT = tbl_def;
+  :}
+  ;
+
+foreign_keys_list ::=
+  KW_FOREIGN key_ident LPAREN ident_list:fk_col_names RPAREN KW_REFERENCES
+      table_name:parent_tbl_name LPAREN ident_list:pk_col_names RPAREN
+      enable_spec:fk_enable_spec validate_spec:fk_validate_spec rely_spec:fk_rely_spec
+  {:
+    List<TableDef.ForeignKey> fk_list = new ArrayList<TableDef.ForeignKey>();
+    fk_list.add(new TableDef.ForeignKey(parent_tbl_name, pk_col_names, fk_col_names, null,
+        fk_rely_spec, fk_validate_spec, fk_enable_spec));
+    RESULT = fk_list;
+  :}
+  | foreign_keys_list:fk_list COMMA KW_FOREIGN key_ident LPAREN ident_list:fk_col_names
+      RPAREN KW_REFERENCES table_name:parent_tbl_name LPAREN ident_list:pk_col_names
+      RPAREN enable_spec:fk_enable_spec validate_spec:fk_validate_spec
+      rely_spec:fk_rely_spec
+  {:
+    fk_list.add(new TableDef.ForeignKey(parent_tbl_name, pk_col_names, fk_col_names, null,
+        fk_rely_spec, fk_validate_spec, fk_enable_spec));
+    RESULT = fk_list;
+  :}
   ;
 
 primary_keys ::=
@@ -1442,6 +1514,33 @@
   {: RESULT = col_names; :}
   ;
 
+rely_spec ::=
+  KW_RELY
+  {: RESULT = true; :}
+  | KW_NORELY
+  {: RESULT = false; :}
+  | // Empty
+  {: RESULT = false; :}
+  ;
+
+validate_spec ::=
+  KW_VALIDATE
+  {: RESULT = true; :}
+  | KW_NOVALIDATE
+  {: RESULT = false; :}
+  | // Empty
+  {: RESULT = false; :}
+  ;
+
+enable_spec ::=
+  KW_ENABLE
+  {: RESULT = true; :}
+  | KW_DISABLE
+  {: RESULT = false; :}
+  | // Empty
+  {: RESULT = false; :}
+  ;
+
 tbl_options ::=
   opt_sort_cols:sort_cols opt_comment_val:comment opt_row_format_val:row_format
   serde_properties:serde_props file_format_create_table_val:file_format
@@ -3581,6 +3680,8 @@
   {: RESULT = r.toString(); :}
   | KW_COMPUTE:r
   {: RESULT = r.toString(); :}
+  | KW_CONSTRAINT:r
+  {: RESULT = r.toString(); :}
   | KW_COPY:r
   {: RESULT = r.toString(); :}
   | KW_CREATE:r
@@ -3611,6 +3712,8 @@
   {: RESULT = r.toString(); :}
   | KW_DESCRIBE:r
   {: RESULT = r.toString(); :}
+  | KW_DISABLE:r
+  {: RESULT = r.toString(); :}
   | KW_DISTINCT:r
   {: RESULT = r.toString(); :}
   | KW_DIV:r
@@ -3621,6 +3724,8 @@
   {: RESULT = r.toString(); :}
   | KW_ELSE:r
   {: RESULT = r.toString(); :}
+  | KW_ENABLE:r
+  {: RESULT = r.toString(); :}
   | KW_ENCODING:r
   {: RESULT = r.toString(); :}
   | KW_END:r
@@ -3653,6 +3758,8 @@
   {: RESULT = r.toString(); :}
   | KW_FOR:r
   {: RESULT = r.toString(); :}
+  | KW_FOREIGN:r
+  {: RESULT = r.toString(); :}
   | KW_FORMAT:r
   {: RESULT = r.toString(); :}
   | KW_FORMATTED:r
@@ -3731,8 +3838,12 @@
   {: RESULT = r.toString(); :}
   | KW_METADATA:r
   {: RESULT = r.toString(); :}
+  | KW_NORELY:r
+  {: RESULT = r.toString(); :}
   | KW_NOT:r
   {: RESULT = r.toString(); :}
+  | KW_NOVALIDATE:r
+  {: RESULT = r.toString(); :}
   | KW_NULL:r
   {: RESULT = r.toString(); :}
   | KW_NULLS:r
@@ -3779,10 +3890,14 @@
   {: RESULT = r.toString(); :}
   | KW_RECOVER:r
   {: RESULT = r.toString(); :}
+  | KW_REFERENCES:r
+  {: RESULT = r.toString(); :}
   | KW_REFRESH:r
   {: RESULT = r.toString(); :}
   | KW_REGEXP:r
   {: RESULT = r.toString(); :}
+  | KW_RELY:r
+  {: RESULT = r.toString(); :}
   | KW_RENAME:r
   {: RESULT = r.toString(); :}
   | KW_REPEATABLE:r
@@ -3885,6 +4000,8 @@
   {: RESULT = r.toString(); :}
   | KW_USING:r
   {: RESULT = r.toString(); :}
+  | KW_VALIDATE:r
+  {: RESULT = r.toString(); :}
   | KW_VALUES:r
   {: RESULT = r.toString(); :}
   | KW_VARCHAR:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index 10b34af..5053572 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -53,7 +53,7 @@
         schemaLocation_.toString());
     String s = ToSqlUtils.getCreateTableSql(getDb(),
         getTbl() + " __LIKE_FILEFORMAT__ ",  getComment(), colsSql, partitionColsSql,
-        null, null, new Pair<>(getSortColumns(), getSortingOrder()),
+        null, null, null, new Pair<>(getSortColumns(), getSortingOrder()),
         getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(),
         getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()), compression, null,
         getLocation());
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index be21bd4..7e4d33d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -23,6 +23,8 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
@@ -41,6 +43,7 @@
 import org.apache.impala.util.MetaStoreUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
@@ -103,6 +106,8 @@
   public List<ColumnDef> getPrimaryKeyColumnDefs() {
     return tableDef_.getPrimaryKeyColumnDefs();
   }
+  public List<SQLPrimaryKey> getPrimaryKeys() { return tableDef_.getSqlPrimaryKeys(); }
+  public List<SQLForeignKey> getForeignKeys() { return tableDef_.getSqlForeignKeys(); }
   public boolean isExternal() { return tableDef_.isExternal(); }
   public List<ColumnDef> getPartitionColumnDefs() {
     return tableDef_.getPartitionColumnDefs();
@@ -134,6 +139,27 @@
   }
 
   /**
+   * Get foreign keys information as strings. Useful for toSqlUtils.
+   * @return List of strings of the form "(col1, col2,..) REFERENCES [pk_db].pk_table
+   * (colA, colB,..)".
+   */
+  List<String> getForeignKeysSql() {
+    List<TableDef.ForeignKey> fkList = tableDef_.getForeignKeysList();
+    List<String> foreignKeysSql = new ArrayList<>();
+    if (fkList != null && !fkList.isEmpty()) {
+      for (TableDef.ForeignKey fk : fkList) {
+        StringBuilder sb = new StringBuilder("(");
+        Joiner.on(", ").appendTo(sb, fk.getForeignKeyColNames()).append(")");
+        sb.append(" REFERENCES ");
+        sb.append(fk.getFullyQualifiedPkTableName() + "(");
+        Joiner.on(", ").appendTo(sb, fk.getPrimaryKeyColNames()).append(")");
+        foreignKeysSql.add(sb.toString());
+      }
+    }
+    return foreignKeysSql;
+  }
+
+  /**
    * Can only be called after analysis, returns the owner of this table (the user from
    * the current session).
    */
@@ -184,6 +210,12 @@
     for (ColumnDef pkColDef: getPrimaryKeyColumnDefs()) {
       params.addToPrimary_key_column_names(pkColDef.getColName());
     }
+    for(SQLPrimaryKey pk: getPrimaryKeys()){
+      params.addToPrimary_keys(pk);
+    }
+    for(SQLForeignKey fk: getForeignKeys()){
+      params.addToForeign_keys(fk);
+    }
     params.setServer_name(serverName_);
     return params;
   }
@@ -191,6 +223,11 @@
   @Override
   public void collectTableRefs(List<TableRef> tblRefs) {
     tblRefs.add(new TableRef(tableDef_.getTblName().toPath(), null));
+    // When foreign keys are specified, we need to add all the tables the foreign keys are
+    // referring to.
+    for(TableDef.ForeignKey fk: tableDef_.getForeignKeysList()){
+      tblRefs.add(new TableRef(fk.getPkTableName().toPath(), null));
+    }
   }
 
   @Override
@@ -245,9 +282,6 @@
       }
       AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(),
           "Only Kudu tables can use the PARTITION BY clause.");
-      if (hasPrimaryKey()) {
-        throw new AnalysisException("Only Kudu tables can specify a PRIMARY KEY.");
-      }
       return;
     }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index a533490..0bff247 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -25,14 +25,18 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -92,6 +96,18 @@
   // Authoritative list of primary key column definitions populated during analysis.
   private final List<ColumnDef> primaryKeyColDefs_ = new ArrayList<>();
 
+  // Hive primary keys and foreign keys structures populated during analysis.
+  List<SQLPrimaryKey> sqlPrimaryKeys_ = new ArrayList<>();
+  List<SQLForeignKey> sqlForeignKeys_ = new ArrayList<>();
+
+  public List<SQLPrimaryKey> getSqlPrimaryKeys() {
+    return sqlPrimaryKeys_;
+  }
+
+  public List<SQLForeignKey> getSqlForeignKeys() {
+    return sqlForeignKeys_;
+  }
+
   // True if analyze() has been called.
   private boolean isAnalyzed_ = false;
 
@@ -165,6 +181,142 @@
 
   private Options options_;
 
+  /**
+   * Primary Key attributes grouped together to be populated by the parser.
+   * Currently only defined for HDFS tables.
+   */
+  static class PrimaryKey {
+
+    // Primary key table name
+    final TableName pkTableName;
+
+    // Primary Key columns
+    final List<String> primaryKeyColNames;
+
+    // Primary Key constraint name
+    final String pkConstraintName;
+
+    // Constraints
+    final boolean relyCstr;
+    final boolean validateCstr;
+    final boolean enableCstr;
+
+
+    public PrimaryKey(TableName pkTableName, List<String> primaryKeyColNames,
+                      String pkConstraintName, boolean relyCstr,
+                      boolean validateCstr, boolean enableCstr) {
+      this.pkTableName = pkTableName;
+      this.primaryKeyColNames = primaryKeyColNames;
+      this.pkConstraintName = pkConstraintName;
+      this.relyCstr = relyCstr;
+      this.validateCstr = validateCstr;
+      this.enableCstr = enableCstr;
+    }
+
+    public TableName getPkTableName() {
+      return pkTableName;
+    }
+
+    public List<String> getPrimaryKeyColNames() {
+      return primaryKeyColNames;
+    }
+
+    public String getPkConstraintName() {
+      return pkConstraintName;
+    }
+
+    public boolean isRelyCstr() {
+      return relyCstr;
+    }
+
+    public boolean isValidateCstr() {
+      return validateCstr;
+    }
+
+    public boolean isEnableCstr() {
+      return enableCstr;
+    }
+  }
+
+
+  /**
+   * Foreign Key attributes grouped together to be populated by the parser.
+   * Currently only defined for HDFS tables. An FK definition is of the form
+   * "foreign key(col1, col2) references pk_tbl(col3, col4)"
+   */
+  static class ForeignKey {
+    // Primary key table
+    final TableName pkTableName;
+
+    // Primary key cols
+    final List<String> primaryKeyColNames;
+
+    // Foreign key cols
+    final List<String> foreignKeyColNames;
+
+    // Name of fk
+    final String fkConstraintName;
+
+    // Fully qualified pk name. Set during analysis.
+    TableName fullyQualifiedPkTableName;
+
+    // Constraints
+    final boolean relyCstr;
+    final boolean validateCstr;
+    final boolean enableCstr;
+
+    ForeignKey(TableName pkTableName, List<String> primaryKeyColNames,
+               List<String> foreignKeyColNames, String fkName, boolean relyCstr,
+               boolean validateCstr, boolean enableCstr) {
+      this.pkTableName = pkTableName;
+      this.primaryKeyColNames = primaryKeyColNames;
+      this.foreignKeyColNames = foreignKeyColNames;
+      this.relyCstr = relyCstr;
+      this.validateCstr = validateCstr;
+      this.enableCstr = enableCstr;
+      this.fkConstraintName = fkName;
+    }
+
+    public TableName getPkTableName() {
+      return pkTableName;
+    }
+
+    public List<String> getPrimaryKeyColNames() {
+      return primaryKeyColNames;
+    }
+
+    public List<String> getForeignKeyColNames() {
+      return foreignKeyColNames;
+    }
+
+    public String getFkConstraintName() {
+      return fkConstraintName;
+    }
+
+    public TableName getFullyQualifiedPkTableName() {
+      return fullyQualifiedPkTableName;
+    }
+
+    public boolean isRelyCstr() {
+      return relyCstr;
+    }
+
+    public boolean isValidateCstr() {
+      return validateCstr;
+    }
+
+    public boolean isEnableCstr() {
+      return enableCstr;
+    }
+  }
+
+  // A TableDef will have only one primary key.
+  private PrimaryKey primaryKey_;
+
+  // There may be multiple foreign keys for a TableDef forming multiple PK-FK
+  // relationships.
+  private List<ForeignKey> foreignKeysList_ = new ArrayList<>();
+
   // Result of analysis.
   private TableName fqTableName_;
 
@@ -194,6 +346,9 @@
     return columnDefs_.stream().map(col -> col.getType()).collect(Collectors.toList());
   }
 
+  public void setPrimaryKey(TableDef.PrimaryKey primaryKey_) {
+    this.primaryKey_ = primaryKey_;
+  }
   List<String> getPartitionColumnNames() {
     return ColumnDef.toColumnNames(getPartitionColumnDefs());
   }
@@ -228,6 +383,7 @@
   THdfsFileFormat getFileFormat() { return options_.fileFormat; }
   RowFormat getRowFormat() { return options_.rowFormat; }
   TSortingOrder getSortingOrder() { return options_.sortingOrder; }
+  List<ForeignKey> getForeignKeysList() { return foreignKeysList_; }
 
   /**
    * Analyzes the parameters of a CREATE TABLE statement.
@@ -240,6 +396,7 @@
     analyzeAcidProperties(analyzer);
     analyzeColumnDefs(analyzer);
     analyzePrimaryKeys();
+    analyzeForeignKeys(analyzer);
 
     if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE)
         && !getIfNotExists()) {
@@ -297,13 +454,22 @@
           "Composite primary keys can be specified using the " +
           "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition.");
     }
-    if (primaryKeyColNames_.isEmpty()) return;
+
+    if (primaryKeyColNames_.isEmpty()) {
+      if (primaryKey_ == null || primaryKey_.getPrimaryKeyColNames().isEmpty()) {
+        return;
+      } else {
+        primaryKeyColNames_.addAll(primaryKey_.getPrimaryKeyColNames());
+      }
+    }
+
     if (!primaryKeyColDefs_.isEmpty()) {
       throw new AnalysisException("Multiple primary keys specified. " +
           "Composite primary keys can be specified using the " +
           "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition.");
     }
     Map<String, ColumnDef> colDefsByColName = ColumnDef.mapByColumnNames(columnDefs_);
+    int keySeq = 1;
     for (String colName: primaryKeyColNames_) {
       colName = colName.toLowerCase();
       ColumnDef colDef = colDefsByColName.remove(colName);
@@ -319,10 +485,111 @@
         throw new AnalysisException("Primary key columns cannot be nullable: " +
             colDef.toString());
       }
+      // HDFS Table specific analysis.
+      if (primaryKey_ != null) {
+        // We do not support enable and validate for primary keys.
+        if (primaryKey_.isEnableCstr()) {
+          throw new AnalysisException("ENABLE feature is not supported yet.");
+        }
+        if (primaryKey_.isValidateCstr()) {
+          throw new AnalysisException("VALIDATE feature is not supported yet.");
+        }
+        String constraintName = generateConstraintName();
+        // Each column of a primary key definition will be an SQLPrimaryKey.
+        sqlPrimaryKeys_.add(new SQLPrimaryKey(getTblName().getDb(), getTbl(),
+            colDef.getColName(), keySeq++, constraintName, primaryKey_.enableCstr,
+            primaryKey_.validateCstr, primaryKey_.relyCstr));
+      }
       primaryKeyColDefs_.add(colDef);
     }
   }
 
+  private void analyzeForeignKeys(Analyzer analyzer) throws AnalysisException {
+    if (foreignKeysList_ == null || foreignKeysList_.size() == 0) return;
+    for (ForeignKey fk: foreignKeysList_) {
+      // The number of foreign key columns must match the number of referenced
+      if (fk.getForeignKeyColNames().size() != fk.getPrimaryKeyColNames().size()){
+        throw new AnalysisException("The number of foreign key columns should be same" +
+            " as the number of parent key columns.");
+      }
+      String parentDb = fk.getPkTableName().getDb();
+      if (parentDb == null) {
+        parentDb = analyzer.getDefaultDb();
+      }
+      fk.fullyQualifiedPkTableName = new TableName(parentDb, fk.pkTableName.getTbl());
+      // Check if the parent table exists.
+      if (!analyzer.dbContainsTable(parentDb, fk.getPkTableName().getTbl(),
+          Privilege.VIEW_METADATA)) {
+        throw new AnalysisException("Parent table not found: "
+            + analyzer.getFqTableName(fk.getPkTableName()));
+      }
+
+      // Check for primary key columns in the parent table.
+      FeTable parentTable = analyzer.getTable(fk.getPkTableName(),
+          Privilege.VIEW_METADATA);
+
+      if (!(parentTable instanceof FeFsTable)) {
+        throw new AnalysisException("Foreign keys on non-HDFS parent tables are not "
+            + "supported.");
+      }
+
+      for (String pkCol : fk.getPrimaryKeyColNames()) {
+        // TODO: Check column types of parent table and child tables match. Currently HMS
+        //  API fails if they don't, it's good to fail early during analysis here.
+        if (!parentTable.getColumnNames().contains(pkCol.toLowerCase())) {
+          throw new AnalysisException("Parent column not found: " + pkCol.toLowerCase());
+        }
+        // Hive has a bug that prevents foreign keys from being added when the pk column
+        // is not part of a primary key. This can be confusing. Until this bug is fixed,
+        // we will not allow foreign key definitions on such columns.
+        if (!((HdfsTable) parentTable).getPrimaryKeysSql().contains(pkCol)) {
+          throw new AnalysisException(String.format("Parent column %s is not part of "
+              + "primary key.", pkCol));
+        }
+      }
+
+      // We do not support ENABLE and VALIDATE.
+      if (fk.isEnableCstr()) {
+        throw new AnalysisException("ENABLE feature is not supported yet.");
+      }
+
+      if (fk.isValidateCstr()) {
+        throw new AnalysisException("VALIDATE feature is not supported yet.");
+      }
+
+      String constraintName = null;
+      for (int i = 0; i < fk.getForeignKeyColNames().size(); i++) {
+        if (fk.getFkConstraintName() == null) {
+          if (i == 0){
+            constraintName = generateConstraintName();
+          }
+        } else {
+          constraintName = fk.getFkConstraintName();
+        }
+        SQLForeignKey sqlForeignKey = new SQLForeignKey();
+        sqlForeignKey.setPktable_db(parentDb);
+        sqlForeignKey.setPktable_name(fk.getPkTableName().getTbl());
+        sqlForeignKey.setFktable_db(getTblName().getDb());
+        sqlForeignKey.setFktable_name(getTbl());
+        sqlForeignKey.setPkcolumn_name(fk.getPrimaryKeyColNames().get(i).toLowerCase());
+        sqlForeignKey.setFk_name(constraintName);
+        sqlForeignKey.setKey_seq(i+1);
+        sqlForeignKey.setFkcolumn_name(fk.getForeignKeyColNames().get(i).toLowerCase());
+        sqlForeignKey.setRely_cstr(fk.isRelyCstr());
+        getSqlForeignKeys().add(sqlForeignKey);
+      }
+    }
+  }
+
+  /**
+   * Utility method to generate a unique constraint name when user does not specify one.
+   * TODO: Collisions possible? HMS doesn't have an API to query existing constraint
+   * names.
+   */
+  private String generateConstraintName() {
+    return UUID.randomUUID().toString();
+  }
+
   /**
    * Analyzes the list of columns in 'sortCols' against the columns of 'table' and
    * returns their matching positions in the table's columns. Each column of 'sortCols'
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 54db034..2df6eb8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -26,6 +26,7 @@
 
 import org.antlr.runtime.ANTLRStringStream;
 import org.antlr.runtime.Token;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -41,6 +42,7 @@
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
@@ -252,9 +254,9 @@
     String kuduParamsSql = getKuduPartitionByParams(stmt);
     // TODO: Pass the correct compression, if applicable.
     return getCreateTableSql(stmt.getDb(), stmt.getTbl(), stmt.getComment(), colsSql,
-        partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), kuduParamsSql,
-        new Pair<>(stmt.getSortColumns(), stmt.getSortingOrder()), properties,
-        stmt.getSerdeProperties(), stmt.isExternal(), stmt.getIfNotExists(),
+        partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), stmt.getForeignKeysSql(),
+        kuduParamsSql, new Pair<>(stmt.getSortColumns(), stmt.getSortingOrder()),
+        properties, stmt.getSerdeProperties(), stmt.isExternal(), stmt.getIfNotExists(),
         stmt.getRowFormat(), HdfsFileFormat.fromThrift(stmt.getFileFormat()),
         HdfsCompression.NONE, null, stmt.getLocation());
   }
@@ -287,10 +289,10 @@
     // TODO: Pass the correct compression, if applicable.
     String createTableSql = getCreateTableSql(innerStmt.getDb(), innerStmt.getTbl(),
         innerStmt.getComment(), null, partitionColsSql,
-        innerStmt.getTblPrimaryKeyColumnNames(), kuduParamsSql,
-        new Pair<>(innerStmt.getSortColumns(), innerStmt.getSortingOrder()),
-        properties, innerStmt.getSerdeProperties(), innerStmt.isExternal(),
-        innerStmt.getIfNotExists(), innerStmt.getRowFormat(),
+        innerStmt.getTblPrimaryKeyColumnNames(), innerStmt.getForeignKeysSql(),
+        kuduParamsSql, new Pair<>(innerStmt.getSortColumns(),
+        innerStmt.getSortingOrder()), properties, innerStmt.getSerdeProperties(),
+        innerStmt.isExternal(), innerStmt.getIfNotExists(), innerStmt.getRowFormat(),
         HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null,
         innerStmt.getLocation());
     return createTableSql + " AS " + stmt.getQueryStmt().toSql(options);
@@ -333,6 +335,7 @@
 
     String storageHandlerClassName = table.getStorageHandlerClassName();
     List<String> primaryKeySql = new ArrayList<>();
+    List<String> foreignKeySql = new ArrayList<>();
     String kuduPartitionByParams = null;
     if (table instanceof FeKuduTable) {
       FeKuduTable kuduTable = (FeKuduTable) table;
@@ -370,13 +373,17 @@
       String inputFormat = msTable.getSd().getInputFormat();
       format = HdfsFileFormat.fromHdfsInputFormatClass(inputFormat);
       compression = HdfsCompression.fromHdfsInputFormatClass(inputFormat);
+      if (table instanceof HdfsTable) {
+        primaryKeySql = ((HdfsTable) table).getPrimaryKeysSql();
+        foreignKeySql = ((HdfsTable) table).getForeignKeysSql();
+      }
     }
     HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
     return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql,
-        partitionColsSql, primaryKeySql, kuduPartitionByParams,
+        partitionColsSql, primaryKeySql, foreignKeySql, kuduPartitionByParams,
         new Pair<>(sortColsSql, sortingOrder), properties, serdeParameters,
-        isExternal, false, rowFormat, format, compression, storageHandlerClassName,
-        tableLocation);
+        isExternal, false, rowFormat, format, compression,
+        storageHandlerClassName, tableLocation);
   }
 
   /**
@@ -386,8 +393,8 @@
    */
   public static String getCreateTableSql(String dbName, String tableName,
       String tableComment, List<String> columnsSql, List<String> partitionColumnsSql,
-      List<String> primaryKeysSql, String kuduPartitionByParams,
-      Pair<List<String>, TSortingOrder> sortProperties,
+      List<String> primaryKeysSql, List<String> foreignKeysSql,
+      String kuduPartitionByParams, Pair<List<String>, TSortingOrder> sortProperties,
       Map<String, String> tblProperties, Map<String, String> serdeParameters,
       boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
       HdfsFileFormat fileFormat, HdfsCompression compression,
@@ -402,10 +409,14 @@
     if (columnsSql != null && !columnsSql.isEmpty()) {
       sb.append(" (\n  ");
       sb.append(Joiner.on(",\n  ").join(columnsSql));
-      if (primaryKeysSql != null && !primaryKeysSql.isEmpty()) {
+      if (CollectionUtils.isNotEmpty(primaryKeysSql)) {
         sb.append(",\n  PRIMARY KEY (");
         Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")");
       }
+      if (CollectionUtils.isNotEmpty(foreignKeysSql)) {
+        sb.append(",\n  FOREIGN KEY");
+        Joiner.on(",\n  FOREIGN KEY").appendTo(sb, foreignKeysSql).append("\n");
+      }
       sb.append("\n)");
     } else {
       // CTAS for Kudu tables still print the primary key
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 8a13700..076c84a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -28,6 +28,8 @@
 import java.util.TreeMap;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.PartitionKeyValue;
@@ -180,6 +182,16 @@
   List<? extends FeFsPartition> loadPartitions(Collection<Long> ids);
 
   /**
+   * @return Primary keys information.
+   */
+  List<SQLPrimaryKey> getPrimaryKeys();
+
+  /**
+   * @return Foreign keys information.
+   */
+  List<SQLForeignKey> getForeignKeys();
+
+  /**
    * Parses and returns the value of the 'skip.header.line.count' table property. If the
    * value is not set for the table, returns 0. If parsing fails or a value < 0 is found,
    * the error parameter is updated to contain an error message.
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index ff0f1b7..2b8e0d8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -44,7 +44,11 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.impala.analysis.Expr;
@@ -229,6 +233,10 @@
   // for setAvroSchema().
   private boolean isSchemaLoaded_ = false;
 
+  // Primary Key and Foreign Key information. Set in load() method.
+  private final List<SQLPrimaryKey> primaryKeys_ = new ArrayList<>();
+  private final List<SQLForeignKey> foreignKeys_ = new ArrayList<>();
+
   // Represents a set of storage-related statistics aggregated at the table or partition
   // level.
   public final static class FileMetadataStats {
@@ -963,13 +971,13 @@
       try {
         if (loadTableSchema) {
             // set nullPartitionKeyValue from the hive conf.
-            nullPartitionKeyValue_ =
-                MetaStoreUtil.getNullPartitionKeyValue(client).intern();
-            loadSchema(msTbl);
-            loadAllColumnStats(client);
+          nullPartitionKeyValue_ =
+            MetaStoreUtil.getNullPartitionKeyValue(client).intern();
+          loadSchema(msTbl);
+          loadAllColumnStats(client);
+          loadPkFkInfo(client, msTbl);
         }
         loadValidWriteIdList(client);
-
         // Load partition and file metadata
         if (reuseMetadata) {
           // Incrementally update this table's partitions and file metadata
@@ -1016,6 +1024,26 @@
   }
 
   /**
+   * Load Primary Key and Foreign Key information for table. Throws TableLoadingException
+   * if the load fails.
+   */
+  private void loadPkFkInfo(IMetaStoreClient client,
+      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException{
+    try {
+      // Reset and add primary keys info and foreign keys info.
+      primaryKeys_.clear();
+      foreignKeys_.clear();
+      primaryKeys_.addAll(client.getPrimaryKeys(
+          new PrimaryKeysRequest(msTbl.getDbName(), msTbl.getTableName())));
+      foreignKeys_.addAll(client.getForeignKeys(new ForeignKeysRequest(null, null,
+          msTbl.getDbName(), msTbl.getTableName())));
+    } catch (Exception e) {
+      throw new TableLoadingException("Failed to load primary keys/foreign keys for "
+          + "table: " + getFullName(), e);
+    }
+  }
+
+  /**
    * Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_',
    * and 'accessLevel_' from 'msTbl'. Returns time spent accessing file system
    * in nanoseconds. Throws an IOException if there was an error accessing
@@ -1400,6 +1428,10 @@
     nullColumnValue_ = hdfsTable.nullColumnValue;
     nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
     hostIndex_.populate(hdfsTable.getNetwork_addresses());
+    primaryKeys_.clear();
+    primaryKeys_.addAll(hdfsTable.getPrimary_keys());
+    foreignKeys_.clear();
+    foreignKeys_.addAll(hdfsTable.getForeign_keys());
     resetPartitions();
     try {
       for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
@@ -1560,6 +1592,8 @@
     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
         nullPartitionKeyValue_, nullColumnValue_, idToPartition, prototypePartition);
     hdfsTable.setAvroSchema(avroSchema_);
+    hdfsTable.setPrimary_keys(primaryKeys_);
+    hdfsTable.setForeign_keys(foreignKeys_);
     if (type == ThriftObjectType.FULL) {
       // Network addresses are used only by THdfsFileBlocks which are inside
       // THdfsFileDesc, so include network addreses only when including THdfsFileDesc.
@@ -1586,6 +1620,61 @@
   @Override // FeFsTable
   public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
 
+  @Override
+  public List<SQLPrimaryKey> getPrimaryKeys() { return primaryKeys_; }
+
+  @Override
+  public List<SQLForeignKey> getForeignKeys() { return foreignKeys_; }
+
+  /**
+   * Get primary keys column names, useful for toSqlUtils.
+   */
+  public List<String> getPrimaryKeysSql() {
+    List<String> primaryKeyColNames = new ArrayList<>();
+    if (getPrimaryKeys() != null && !getPrimaryKeys().isEmpty()) {
+      getPrimaryKeys().stream().forEach(p -> primaryKeyColNames.add(p.getColumn_name()));
+    }
+    return primaryKeyColNames;
+  }
+
+  /**
+   * Get foreign keys information as strings. Useful for toSqlUtils.
+   * @return List of strings of the form "(col1, col2,..) REFERENCES [pk_db].pk_table
+   * (colA, colB,..)".
+   */
+  public List<String> getForeignKeysSql() {
+    List<String> foreignKeysSql = new ArrayList<>();
+    // Iterate through the foreign keys list. This list may contain multiple foreign
+    // keys and each foreign key may contain multiple columns. The outer loop collects
+    // information common to a foreign key (pk table information). The inner
+    // loop collects column information.
+    List<SQLForeignKey> foreignKeys = getForeignKeys();
+    for (int i = 0; i < foreignKeys.size(); i++) {
+      String pkTableDb = foreignKeys.get(i).getPktable_db();
+      String pkTableName = foreignKeys.get(i).getPktable_name();
+      List<String> pkList = new ArrayList<>();
+      List<String> fkList = new ArrayList<>();
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      for (; i<foreignKeys.size(); i++) {
+        fkList.add(foreignKeys.get(i).getFkcolumn_name());
+        pkList.add(foreignKeys.get(i).getPkcolumn_name());
+        // Bail out of inner loop if the key_seq of the next ForeignKey is 1.
+        if (i + 1 < foreignKeys.size() && foreignKeys.get(i + 1).getKey_seq() == 1) {
+          break;
+        }
+      }
+      Joiner.on(", ").appendTo(sb, fkList).append(") ");
+      sb.append("REFERENCES ");
+      if (pkTableDb != null) sb.append(pkTableDb + ".");
+      sb.append(pkTableName + "(");
+      Joiner.on(", ").appendTo(sb, pkList).append(")");
+      foreignKeysSql.add(sb.toString());
+    }
+    return foreignKeysSql;
+  }
+
+
   /**
    * Returns the set of file formats that the partitions are stored in.
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 19db140..a59ab00 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -521,4 +523,14 @@
   public ListMap<TNetworkAddress> getHostIndex() {
     return hostIndex_;
   }
+
+  @Override
+  public List<SQLPrimaryKey> getPrimaryKeys() {
+    return null;
+  }
+
+  @Override
+  public List<SQLForeignKey> getForeignKeys() {
+    return null;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ce1c481..31c1a2f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -54,6 +54,8 @@
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.io.IOUtils;
@@ -2120,7 +2122,7 @@
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
     return createTable(tbl, params.if_not_exists, params.getCache_op(),
-        params.server_name, response);
+        params.server_name, params.getPrimary_keys(), params.getForeign_keys(), response);
   }
 
   /**
@@ -2273,11 +2275,19 @@
    */
   private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
       boolean if_not_exists, THdfsCachingOp cacheOp, String serverName,
+      List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
       TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkState(!KuduTable.isKuduTable(newTable));
     synchronized (metastoreDdlLock_) {
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        msClient.getHiveClient().createTable(newTable);
+        if (primaryKeys == null && foreignKeys == null) {
+          msClient.getHiveClient().createTable(newTable);
+        } else {
+          MetastoreShim.createTableWithConstraints(
+              msClient.getHiveClient(), newTable,
+              primaryKeys == null ? new ArrayList<>() : primaryKeys,
+              foreignKeys == null ? new ArrayList<>() : foreignKeys);
+        }
         // TODO (HIVE-21807): Creating a table and retrieving the table information is
         // not atomic.
         addSummary(response, "Table has been created.");
@@ -2360,7 +2370,8 @@
         new org.apache.hadoop.hive.metastore.api.Table();
     setCreateViewAttributes(params, view);
     LOG.trace(String.format("Creating view %s", tableName));
-    if (!createTable(view, params.if_not_exists, null, params.server_name, response)) {
+    if (!createTable(view, params.if_not_exists, null, params.server_name,
+        new ArrayList<>(), new ArrayList<>(), response)) {
       addSummary(response, "View already exists.");
     } else {
       addSummary(response, "View has been created.");
@@ -2481,7 +2492,8 @@
     tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1");
     setDefaultTableCapabilities(tbl);
     LOG.trace(String.format("Creating table %s LIKE %s", tblName, srcTblName));
-    createTable(tbl, params.if_not_exists, null, params.server_name, response);
+    createTable(tbl, params.if_not_exists, null, params.server_name, null, null,
+        response);
   }
 
   private static void setDefaultTableCapabilities(
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 5d639ed..5b4d6a7 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -96,6 +96,7 @@
     keywordMap.put("comment", SqlParserSymbols.KW_COMMENT);
     keywordMap.put("compression", SqlParserSymbols.KW_COMPRESSION);
     keywordMap.put("compute", SqlParserSymbols.KW_COMPUTE);
+    keywordMap.put("constraint", SqlParserSymbols.KW_CONSTRAINT);
     keywordMap.put("copy", SqlParserSymbols.KW_COPY);
     keywordMap.put("create", SqlParserSymbols.KW_CREATE);
     keywordMap.put("cross", SqlParserSymbols.KW_CROSS);
@@ -111,11 +112,13 @@
     keywordMap.put("delimited", SqlParserSymbols.KW_DELIMITED);
     keywordMap.put("desc", SqlParserSymbols.KW_DESC);
     keywordMap.put("describe", SqlParserSymbols.KW_DESCRIBE);
+    keywordMap.put("disable", SqlParserSymbols.KW_DISABLE);
     keywordMap.put("distinct", SqlParserSymbols.KW_DISTINCT);
     keywordMap.put("div", SqlParserSymbols.KW_DIV);
     keywordMap.put("double", SqlParserSymbols.KW_DOUBLE);
     keywordMap.put("drop", SqlParserSymbols.KW_DROP);
     keywordMap.put("else", SqlParserSymbols.KW_ELSE);
+    keywordMap.put("enable", SqlParserSymbols.KW_ENABLE);
     keywordMap.put("encoding", SqlParserSymbols.KW_ENCODING);
     keywordMap.put("end", SqlParserSymbols.KW_END);
     keywordMap.put("escaped", SqlParserSymbols.KW_ESCAPED);
@@ -132,6 +135,7 @@
     keywordMap.put("float", SqlParserSymbols.KW_FLOAT);
     keywordMap.put("following", SqlParserSymbols.KW_FOLLOWING);
     keywordMap.put("for", SqlParserSymbols.KW_FOR);
+    keywordMap.put("foreign", SqlParserSymbols.KW_FOREIGN);
     keywordMap.put("format", SqlParserSymbols.KW_FORMAT);
     keywordMap.put("formatted", SqlParserSymbols.KW_FORMATTED);
     keywordMap.put("from", SqlParserSymbols.KW_FROM);
@@ -172,7 +176,9 @@
     keywordMap.put("map", SqlParserSymbols.KW_MAP);
     keywordMap.put("merge_fn", SqlParserSymbols.KW_MERGE_FN);
     keywordMap.put("metadata", SqlParserSymbols.KW_METADATA);
+    keywordMap.put("norely", SqlParserSymbols.KW_NORELY);
     keywordMap.put("not", SqlParserSymbols.KW_NOT);
+    keywordMap.put("novalidate", SqlParserSymbols.KW_NOVALIDATE);
     keywordMap.put("null", SqlParserSymbols.KW_NULL);
     keywordMap.put("nulls", SqlParserSymbols.KW_NULLS);
     keywordMap.put("offset", SqlParserSymbols.KW_OFFSET);
@@ -198,8 +204,10 @@
     keywordMap.put("rcfile", SqlParserSymbols.KW_RCFILE);
     keywordMap.put("real", SqlParserSymbols.KW_DOUBLE);
     keywordMap.put("recover", SqlParserSymbols.KW_RECOVER);
+    keywordMap.put("references", SqlParserSymbols.KW_REFERENCES);
     keywordMap.put("refresh", SqlParserSymbols.KW_REFRESH);
     keywordMap.put("regexp", SqlParserSymbols.KW_REGEXP);
+    keywordMap.put("rely", SqlParserSymbols.KW_RELY);
     keywordMap.put("rename", SqlParserSymbols.KW_RENAME);
     keywordMap.put("repeatable", SqlParserSymbols.KW_REPEATABLE);
     keywordMap.put("replace", SqlParserSymbols.KW_REPLACE);
@@ -251,6 +259,7 @@
     keywordMap.put("upsert", SqlParserSymbols.KW_UPSERT);
     keywordMap.put("use", SqlParserSymbols.KW_USE);
     keywordMap.put("using", SqlParserSymbols.KW_USING);
+    keywordMap.put("validate", SqlParserSymbols.KW_VALIDATE);
     keywordMap.put("values", SqlParserSymbols.KW_VALUES);
     keywordMap.put("varchar", SqlParserSymbols.KW_VARCHAR);
     keywordMap.put("view", SqlParserSymbols.KW_VIEW);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index f807bdc..7901dc5 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -2350,6 +2350,49 @@
     AnalyzesOk("create table functional.new_table (c char(250))");
     AnalyzesOk("create table new_table (i int) PARTITIONED BY (c char(3))");
 
+    // Primary key and foreign key specification.
+    AnalyzesOk("create table foo(id int, year int, primary key (id))");
+    AnalyzesOk("create table foo(id int, year int, primary key (id, year))");
+    AnalyzesOk("create table foo(id int, year int, primary key (id, year) disable "
+        + "novalidate rely)");
+    AnalysisError("create table foo(id int, year int, primary key (id, year) enable"
+        + " novalidate rely)", "ENABLE feature is not supported yet.");
+    AnalysisError("create table foo(id int, year int, primary key (id, year) disable"
+        + " validate rely)", "VALIDATE feature is not supported yet.");
+    AnalysisError("create table pk(id int, primary key(year))", "PRIMARY KEY column "
+        + "'year' does not exist in the table");
+
+    // Foreign key test needs a valid primary key table to pass.
+    addTestDb("test_pk_fk", "Test DB for PK/FK tests");
+    addTestTable("create table test_pk_fk.pk (id int, year string, primary key (id, "
+        + "year) disable novalidate rely)");
+    addTestTable("create table test_pk_fk.non_pk_table(id int)");
+    AnalysisContext ctx = createAnalysisCtx("test_pk_fk");
+    AnalysisError("create table foo(id int, year int, foreign key (id) references "
+        + "pk(id) enable novalidate rely)", ctx,"ENABLE feature is "
+        + "not supported yet.");
+    AnalysisError("create table foo(id int, year int, foreign key (id) references "
+        + "pk(id) disable validate rely)", ctx,"VALIDATE feature is "
+        + "not supported yet.");
+    AnalyzesOk("create table fk(id int, year int, primary key (id, year) disable "
+        + "novalidate rely, foreign key(id) REFERENCES pk(id) "
+        + "DISABLE NOVALIDATE RELY)", ctx);
+    AnalyzesOk("create table foo(id int, year int, foreign key (id) references "
+        + "pk(id) disable novalidate rely)", ctx);
+    AnalyzesOk("create table foo(id int, year int, foreign key (id) references "
+        + "pk(id))", ctx);
+    AnalysisError("create table fk(id int, year string, foreign key(year) references "
+        + "pk2(year))", ctx, "Parent table not found: test_pk_fk.pk2");
+    AnalyzesOk("create table fk(id int, year string, foreign key(id, year) references"
+        + " pk(id, year))", ctx);
+    AnalysisError("create table fk(id int, year string, foreign key(id, year) "
+        + "references pk(year))", ctx, "The number of foreign key columns should be same"
+        + " as the number of parent key columns.");
+    AnalysisError("create table fk(id int, foreign key(id) references pk(foo))", ctx,
+        "Parent column not found: foo");
+    AnalysisError("create table fk(id int, foreign key(id) references "
+        + "non_pk_table(id))", ctx, "Parent column id is not part of primary key.");
+
     {
       // Check that long_properties fail at the analysis layer
       String long_property_key = "";
@@ -2414,8 +2457,6 @@
       AnalysisError(String.format("create table t (i int primary key) stored as %s",
           format), String.format("Unsupported column options for file format " +
               "'%s': 'i INT PRIMARY KEY'", fileFormatsStr[formatIndx]));
-      AnalysisError(String.format("create table t (i int, primary key(i)) stored as %s",
-          format), "Only Kudu tables can specify a PRIMARY KEY");
       formatIndx++;
     }
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index a012538..b3c22a5 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -2685,6 +2685,34 @@
     ParserError("CREATE TABLE foo (PRIMARY KEY(a), a INT) STORED AS KUDU");
     ParserError("CREATE TABLE foo (i INT) PRIMARY KEY (i) STORED AS KUDU");
 
+    // Primary key and foreign key specification.
+    ParsesOk("create table foo(id int, year int, primary key (id))");
+    ParsesOk("create table foo(id int, year int, primary key (id, year))");
+    ParsesOk("create table foo(id int, year int, primary key (id, year) disable "
+        + "novalidate rely)");
+    ParsesOk("create table foo(id int, year int, primary key (id, year) "
+        + "novalidate rely)");
+    ParsesOk("create table foo(id int, year int, primary key (id, year) "
+        + "rely)");
+    ParserError("create table foo(id int, year string, primary key(id), primary key"
+        + "(year))");
+    ParsesOk("create table fk(id int, year int, primary key (id, year) disable "
+        + "novalidate rely, foreign key(id) REFERENCES pk(id) DISABLE NOVALIDATE RELY)");
+    ParsesOk("create table foo(id int, year int, foreign key (id) references "
+        + "pk(id))");
+    ParsesOk("create table fk(id int, year string, foreign key(year) references pk"
+        + "(year), primary key(id))");
+    ParsesOk("create table foo(id int, year int, primary key (id, year) enable "
+        + "novalidate rely)");
+    // Different order of constraints is not supported.
+    ParserError("create table foo(id int, year int, primary key (id, year) novalidate "
+        + "disable rely)");
+    ParserError("create table fk(id int, year int, foreign key(id) REFERENCES pk(id) "
+        + "NOVALIDATE DISABLE RELY)");
+    // Multiple foreign keys
+    ParsesOk("create table fk(id int, year string, primary key(id), foreign key(id) "
+        + "references pk(id), foreign key (year) references pk(year))");
+
     // Table Properties
     String[] tblPropTypes = {"TBLPROPERTIES", "WITH SERDEPROPERTIES"};
     for (String propType: tblPropTypes) {
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 8f7f0a5..94e3413 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -81,7 +81,7 @@
   }
 
   private void testToSql(AnalysisContext ctx, String query) {
-    testToSql(ctx, query, query);
+    testToSql(ctx, query, query, false);
   }
 
   private void testToSql(String query, String expected) {
@@ -93,8 +93,9 @@
     testToSql(createAnalysisCtx(defaultDb), query, defaultDb, expected, false, options);
   }
 
-  private void testToSql(AnalysisContext ctx, String query, String expected) {
-    testToSql(ctx, query, System.getProperty("user.name"), expected);
+  private void testToSql(AnalysisContext ctx, String query, String expected,
+      boolean ignoreWhiteSpace) {
+    testToSql(ctx, query, System.getProperty("user.name"), expected, ignoreWhiteSpace);
   }
 
   private void testToSql(String query, String defaultDb, String expected) {
@@ -102,8 +103,8 @@
   }
 
   private void testToSql(AnalysisContext ctx, String query, String defaultDb,
-      String expected) {
-    testToSql(ctx, query, defaultDb, expected, false, ToSqlOptions.DEFAULT);
+      String expected, boolean ignoreWhiteSpace) {
+    testToSql(ctx, query, defaultDb, expected, ignoreWhiteSpace, ToSqlOptions.DEFAULT);
   }
 
   private void testToSql(String query, String defaultDb, String expected,
@@ -365,6 +366,24 @@
         "'storage_handler'='org.apache.hadoop.hive.kudu.KuduStorageHandler')",
         kuduMasters),
         true);
+
+    // Test toSql output for primary key and foreign key specifications.
+    // TODO: Add support for displaying constraint information (DISABLE, NOVALIDATE, RELY)
+    testToSql("create table pk(id int, year string, primary key (id, year))", "default",
+        "CREATE TABLE default.pk ( id INT, year STRING, PRIMARY KEY (id, year) ) "
+            + "STORED AS TEXTFILE", true);
+
+    // Foreign Key test requires a valid primary key table.
+    addTestDb("test_pk_fk", "Test DB for PK/FK tests");
+    addTestTable("create table test_pk_fk.pk (id int, year string, primary key (id, "
+        + "year))");
+    AnalysisContext ctx = createAnalysisCtx("test_pk_fk");
+
+    testToSql(ctx, "create table fk(id int, year string, FOREIGN KEY (id) "
+        + "REFERENCES pk(id), FOREIGN KEY (year) REFERENCES pk"
+        + "(year))", "CREATE TABLE test_pk_fk.fk ( id INT, year STRING, "
+        + "FOREIGN KEY(id) REFERENCES test_pk_fk.pk(id), FOREIGN KEY(year) REFERENCES "
+        + "test_pk_fk.pk(year) ) STORED AS TEXTFILE", true);
   }
 
   @Test
@@ -1600,7 +1619,7 @@
           testToSql(ctx, String.format("GRANT %s ON SERVER server1 TO %s %s", p,
               pt, testRole));
           testToSql(ctx, String.format("GRANT %s ON SERVER TO %s", p, testRole),
-              String.format("GRANT %s ON SERVER server1 TO ROLE %s", p, testRole));
+              String.format("GRANT %s ON SERVER server1 TO ROLE %s", p, testRole), false);
           testToSql(ctx, String.format(
               "GRANT %s ON SERVER server1 TO %s %s WITH GRANT OPTION", p, pt,
               testRole));
@@ -1609,7 +1628,7 @@
           testToSql(ctx, String.format("REVOKE %s ON SERVER FROM %s %s", p, pt,
               testRole),
               String.format("REVOKE %s ON SERVER server1 FROM %s %s", p, pt,
-                  testRole));
+                  testRole), false);
           testToSql(ctx, String.format(
               "REVOKE GRANT OPTION FOR %s ON SERVER server1 FROM %s %s", p, pt,
               testRole));
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
index 606ea39..1b5f441 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
@@ -203,6 +203,7 @@
       try {
         HdfsTable hdfsTable = (HdfsTable) dummyTable;
         hdfsTable.initializePartitionMetadata(msTbl);
+        hdfsTable.getPrimaryKeys().addAll(createTableStmt.getPrimaryKeys());
       } catch (CatalogException e) {
         e.printStackTrace();
         fail("Failed to add test table:\n" + createTableSql);