IMPALA-9053: DDLs should generate lineage graphs.

DDLs like 'create table' should generate minimal lineage graphs so
that consumers like Atlas can use information like 'queryText' to
establish lineages.

This change adds a call to computeLineageGraph() during the analysis
phase of CreateTableStmt, which populates the graph with basic
information like queryText. If the statement is a CTAS, this graph is
enhanced with dependencies in the "insert" phase.

Testing:
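Added an end-to-end test (tests/custom_cluster/test_lineage.py) to
verify the lineage information and to check that it is flushed to disk
properly. Also added a 'create table' case to lineage.test.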

Change-Id: Ia6c7ed9fe3265fd777fe93590cf4eb2d9ba0dd1e
Reviewed-on: http://gerrit.cloudera.org:8080/14458
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
index 6285af6..ec401b3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
@@ -536,8 +536,11 @@
    */
   public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) {
     init(rootAnalyzer);
-    computeProjectionDependencies(resultExprs, rootAnalyzer);
-    computeResultPredicateDependencies(rootAnalyzer);
+    // Compute the dependencies only if result expressions are available.
+    if (resultExprs != null && !resultExprs.isEmpty()) {
+      computeProjectionDependencies(resultExprs, rootAnalyzer);
+      computeResultPredicateDependencies(rootAnalyzer);
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 0a1ea77..d768cfc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -28,6 +28,8 @@
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TSortingOrder;
@@ -214,6 +216,21 @@
       }
       AvroSchemaUtils.setFromSerdeComment(getColumnDefs());
     }
+
+    // If lineage logging is enabled, compute minimal lineage graph.
+    if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
+      computeLineageGraph(analyzer);
+    }
+  }
+
+  /**
+   * Computes a minimal column lineage graph for a create statement. An empty result
+   * expr list is passed, so only basic fields such as the query text are populated.
+   * If this is a CTAS, the graph is enhanced during the "insert" phase of CTAS.
+   */
+  protected void computeLineageGraph(Analyzer analyzer) {
+    ColumnLineageGraph graph = analyzer.getColumnLineageGraph();
+    graph.computeLineageGraph(new ArrayList<>(), analyzer);
   }
 
   /**
diff --git a/testdata/workloads/functional-query/queries/QueryTest/lineage.test b/testdata/workloads/functional-query/queries/QueryTest/lineage.test
index 2335d5a..8f24ad2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/lineage.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/lineage.test
@@ -17,6 +17,21 @@
 ---- QUERY
 create table lineage_test_db.alltypesinsert like functional.alltypesinsert
 ====
+---- LINEAGE
+{
+  "hash":"b0d53e4deafb2467c4108c17667653b5",
+  "timestamp":1571178583,
+  "vertices":[],
+  "edges":[],
+  "queryId":"524cc93f26a86671:e8455a9500000000",
+  "user":"anurag",
+  "queryText":"create table lineage_test_db.foo (id int)",
+  "endTime":1571178584
+}
+---- QUERY
+# Test lineage is created with queryText populated for DDLs.
+create table lineage_test_db.foo (id int)
+====
 ---- QUERY
 create view lineage_test_db.alltypes_view as select * from lineage_test_db.alltypes
 ====
diff --git a/tests/custom_cluster/test_lineage.py b/tests/custom_cluster/test_lineage.py
index 33b3e99..71e61e4 100644
--- a/tests/custom_cluster/test_lineage.py
+++ b/tests/custom_cluster/test_lineage.py
@@ -34,6 +34,7 @@
 class TestLineage(CustomClusterTestSuite):
   START_END_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="start_end_time")
   CREATE_TABLE_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="create_table_time")
+  DDL_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="ddl_lineage")
   LINEAGE_TESTS_DIR = tempfile.mkdtemp(prefix="test_lineage")
 
   @classmethod
@@ -107,6 +108,29 @@
               assert "{0}.lineage_test_tbl".format(unique_database) == table_name
               assert table_create_time != -1
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
+                                    .format(DDL_LINEAGE_LOG_DIR))
+  def test_ddl_lineage(self, unique_database):
+    """ Test that DDLs like 'create table' have query text populated in the lineage
+    graph and that the graph is written to the lineage event log directory."""
+    query = "create external table {0}.ddl_lineage_tbl (id int)".format(unique_database)
+    result = self.execute_query_expect_success(self.client, query)
+    profile_query_id = re.search(r"Query \(id=(.*)\):", result.runtime_profile).group(1)
+
+    # Wait to flush the lineage log files.
+    time.sleep(3)
+
+    for log_filename in os.listdir(self.DDL_LINEAGE_LOG_DIR):
+      log_path = os.path.join(self.DDL_LINEAGE_LOG_DIR, log_filename)
+      # Only the coordinator's log file will be populated.
+      if os.path.getsize(log_path) > 0:
+        with open(log_path) as log_file:
+          lineage_json = json.load(log_file)
+          assert lineage_json["queryId"] == profile_query_id
+          assert lineage_json["queryText"] is not None
+          assert lineage_json["queryText"] == query
+
   @SkipIfS3.hbase
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"