HIVE-28409: Column lineage when creating view is missing if atlas HiveHook is set (Krisztian Kasa, reviewed by Denys Kuzmenko, Zsolt Miskolczi)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d33d60e..af3a757 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3863,8 +3863,16 @@ public static enum ConfVars {
         "partition columns or non-partition columns while displaying columns in describe\n" +
         "table. From 0.12 onwards, they are displayed separately. This flag will let you\n" +
         "get old behavior, if desired. See, test-case in patch for HIVE-6689."),
+    @Deprecated
     HIVE_LINEAGE_INFO("hive.lineage.hook.info.enabled", false,
-        "Whether Hive provides lineage information to hooks."),
+        "Whether Hive provides lineage information to hooks." +
+            "Deprecated: use hive.lineage.statement.filter instead."),
+    HIVE_LINEAGE_STATEMENT_FILTER("hive.lineage.statement.filter", "ALL",
+        "Whether Hive provides lineage information to hooks for the specified statements only, " +
+            "the value is a comma-separated list (ex.: CREATE_MATERIALIZED_VIEW," +
+            "CREATE_TABLE,CREATE_TABLE_AS_SELECT). Possible values are: CREATE_TABLE, CREATE_TABLE_AS_SELECT, " +
+            "CREATE_VIEW, CREATE_MATERIALIZED_VIEW, LOAD, QUERY, ALL, NONE." +
+            " ALL means lineage information is always provided, NONE and empty string means never."),
 
     HIVE_SSL_PROTOCOL_BLACKLIST("hive.ssl.protocol.blacklist", "SSLv2,SSLv3",
         "SSL Versions to disable for all Hive Servers"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
index d9d338c..eec7f54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
@@ -69,6 +69,7 @@ public class QueryProperties {
 
   // True if this statement creates or replaces a materialized view
   private boolean isMaterializedView;
+  private boolean isView;
 
   public boolean isQuery() {
     return query;
@@ -285,6 +286,14 @@ public void setMaterializedView(boolean isMaterializedView) {
     this.isMaterializedView = isMaterializedView;
   }
 
+  public boolean isView() {
+    return isView;
+  }
+
+  public void setView(boolean view) {
+    isView = view;
+  }
+
   public void clear() {
     query = false;
     analyzeCommand = false;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 3341be8..c483168 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -79,7 +79,7 @@ public void initialize(HiveConf hiveConf) {
         || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")
         || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")
         || postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
-      transformations.add(new Generator(postExecHooks));
+      transformations.add(Generator.fromConf(hiveConf));
     }
 
     // Try to transform OR predicates in Filter into simpler IN clauses first
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index a4e236b..f86e027 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -105,7 +105,8 @@ public class SimpleFetchOptimizer extends Transform {
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
     Map<String, TableScanOperator> topOps = pctx.getTopOps();
-    if (pctx.getQueryProperties().isQuery() && !pctx.getQueryProperties().isAnalyzeCommand()
+    if ((pctx.getQueryProperties().isQuery() || pctx.getQueryProperties().isView())
+        && !pctx.getQueryProperties().isAnalyzeCommand()
         && topOps.size() == 1) {
       // no join, no groupby, no distinct, no lateral view, no subq,
       // no CTAS or insert, not analyze command, and single sourced.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
index 3a4522f..20889eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
@@ -19,10 +19,15 @@
 package org.apache.hadoop.hive.ql.optimizer.lineage;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -59,11 +64,62 @@ public class Generator extends Transform {
 
   private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
 
-  private final Set<String> hooks;
-  private static final String ATLAS_HOOK_CLASSNAME = "org.apache.atlas.hive.hook.HiveHook";
+  private final Predicate<ParseContext> statementFilter;
 
-  public Generator(Set<String> hooks) {
-    this.hooks = hooks;
+  enum LineageInfoFilter {
+    CREATE_TABLE(parseContext -> parseContext.getCreateTable() != null),
+    CREATE_TABLE_AS_SELECT(parseContext -> parseContext.getQueryProperties().isCTAS()),
+    CREATE_VIEW(parseContext -> parseContext.getQueryProperties().isView()),
+    CREATE_MATERIALIZED_VIEW(parseContext -> parseContext.getQueryProperties().isMaterializedView()),
+    LOAD(parseContext -> !(parseContext.getLoadTableWork() == null || parseContext.getLoadTableWork().isEmpty())),
+    QUERY(parseContext -> parseContext.getQueryProperties().isQuery()),
+    ALL(parseContext -> true),
+    NONE(parseContext -> false);
+
+    final Predicate<ParseContext> predicate;
+
+    LineageInfoFilter(Predicate<ParseContext> predicate) {
+      this.predicate = predicate;
+    }
+  }
+
+  public static Generator fromConf(HiveConf conf) {
+    return new Generator(createFilterPredicateFromConf(conf));
+  }
+
+  static Predicate<ParseContext> createFilterPredicateFromConf(Configuration conf) {
+    Set<LineageInfoFilter> operations = new HashSet<>();
+    boolean noneSpecified = false;
+    for (String valueText : conf.getStringCollection(HiveConf.ConfVars.HIVE_LINEAGE_STATEMENT_FILTER.varname)) {
+      LineageInfoFilter enumValue = EnumUtils.getEnumIgnoreCase(LineageInfoFilter.class, valueText);
+      if (enumValue == null) {
+        throw new EnumConstantNotPresentException(LineageInfoFilter.class, valueText);
+      }
+
+      if (LineageInfoFilter.NONE == enumValue) {
+        noneSpecified = true;
+        continue;
+      }
+
+      operations.add(enumValue);
+    }
+
+    if (noneSpecified) {
+      if (!operations.isEmpty()) {
+        throw new IllegalArgumentException(
+            "No other value can be specified when " + LineageInfoFilter.NONE.name() + " is present!");
+      }
+      else {
+        return parseContext -> false;
+      }
+    }
+
+    return parseContext ->
+        operations.stream().anyMatch(lineageInfoFilter -> lineageInfoFilter.predicate.test(parseContext));
+  }
+
+  public Generator(Predicate<ParseContext> statementFilter) {
+    this.statementFilter = statementFilter;
   }
 
   /* (non-Javadoc)
@@ -72,18 +128,11 @@ public Generator(Set<String> hooks) {
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
-    if (hooks != null && hooks.contains(ATLAS_HOOK_CLASSNAME)) {
-      // Atlas would be interested in lineage information for insert,load,create etc.
-      if (!pctx.getQueryProperties().isCTAS()
-          && !pctx.getQueryProperties().isMaterializedView()
-          && pctx.getQueryProperties().isQuery()
-          && pctx.getCreateTable() == null
-          && pctx.getCreateViewDesc() == null
-          && (pctx.getLoadTableWork() == null || pctx.getLoadTableWork().isEmpty())) {
-        LOG.debug("Not evaluating lineage");
-        return pctx;
-      }
+    if (!statementFilter.test(pctx)) {
+      LOG.debug("Not evaluating lineage");
+      return pctx;
     }
+
     Index index = pctx.getQueryState().getLineageState().getIndex();
     if (index == null) {
       index = new Index();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a44def6..ca9d599 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13226,7 +13226,7 @@ void analyzeInternal(ASTNode ast, Supplier<PlannerContext> pcf) throws SemanticE
           || postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
         List<Transform> transformations = new ArrayList<Transform>();
         transformations.add(new HiveOpConverterPostProc());
-        transformations.add(new Generator(postExecHooks));
+        transformations.add(Generator.fromConf(conf));
         for (Transform t : transformations) {
           pCtx = t.transform(pCtx);
         }
@@ -15665,7 +15665,7 @@ private ASTNode buildSelExprSubTree(String tableAlias, String col) {
 
   private void copyInfoToQueryProperties(QueryProperties queryProperties) {
     if (qb != null) {
-      queryProperties.setQuery(qb.getIsQuery());
+      queryProperties.setQuery(qb.getIsQuery() && !forViewCreation);
       queryProperties.setAnalyzeCommand(qb.getParseInfo().isAnalyzeCommand());
       queryProperties.setNoScanAnalyzeCommand(qb.getParseInfo().isNoScanAnalyzeCommand());
       queryProperties.setAnalyzeRewrite(qb.isAnalyzeRewrite());
@@ -15673,6 +15673,7 @@ private void copyInfoToQueryProperties(QueryProperties queryProperties) {
       queryProperties.setHasOuterOrderBy(!qb.getParseInfo().getIsSubQ() &&
           !qb.getParseInfo().getDestToOrderBy().isEmpty());
       queryProperties.setOuterQueryLimit(qb.getParseInfo().getOuterQueryLimit());
+      queryProperties.setView(forViewCreation);
       queryProperties.setMaterializedView(qb.isMaterializedView());
     }
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/lineage/TestGenerator.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/lineage/TestGenerator.java
new file mode 100644
index 0000000..38eff31
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/lineage/TestGenerator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.lineage;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryProperties;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.junit.Test;
+
+import java.util.function.Predicate;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.*;
+
+public class TestGenerator {
+  @Test
+  public void testCreateFilterPredicateFromConf() {
+    HiveConf conf = new HiveConf();
+    conf.set(HiveConf.ConfVars.HIVE_LINEAGE_STATEMENT_FILTER.varname,
+        "CREATE_tABLE_AS_sELECT," + HiveOperation.QUERY.name());
+
+    Predicate<ParseContext> predicate = Generator.createFilterPredicateFromConf(conf);
+
+    ParseContext parseContext = new ParseContext();
+    QueryProperties queryProperties = new QueryProperties();
+    queryProperties.setCTAS(true);
+    parseContext.setQueryProperties(queryProperties);
+    assertThat(predicate.test(parseContext), is(true));
+
+    parseContext = new ParseContext();
+    queryProperties = new QueryProperties();
+    queryProperties.setQuery(true);
+    parseContext.setQueryProperties(queryProperties);
+    assertThat(predicate.test(parseContext), is(true));
+
+    parseContext = new ParseContext();
+    queryProperties = new QueryProperties();
+    queryProperties.setView(true);
+    parseContext.setQueryProperties(queryProperties);
+    assertThat(predicate.test(parseContext), is(false));
+  }
+
+  @Test
+  public void testCreateFilterPredicateFromConfReturnsAlwaysTrueWhenSettingIsNotPresent() {
+    HiveConf conf = new HiveConf();
+
+    Predicate<ParseContext> predicate = Generator.createFilterPredicateFromConf(conf);
+
+    assertThat(predicate.test(new ParseContext()), is(true));
+  }
+
+  @Test
+  public void testCreateFilterPredicateFromConfReturnsAlwaysFalseWhenSettingValueIsEmptyString() {
+    HiveConf conf = new HiveConf();
+    conf.set(HiveConf.ConfVars.HIVE_LINEAGE_STATEMENT_FILTER.varname, "");
+
+    Predicate<ParseContext> predicate = Generator.createFilterPredicateFromConf(conf);
+
+    assertThat(predicate.test(new ParseContext()), is(false));
+  }
+
+  @Test
+  public void testCreateFilterPredicateFromConfThrowsExceptionWhenInputStringIsInvalid() {
+    HiveConf conf = new HiveConf();
+    conf.set(HiveConf.ConfVars.HIVE_LINEAGE_STATEMENT_FILTER.varname, "Invalid");
+
+    EnumConstantNotPresentException exception = assertThrows(
+        EnumConstantNotPresentException.class,
+        () -> Generator.createFilterPredicateFromConf(conf)
+    );
+
+    assertThat(exception.getMessage(), is(
+        "org.apache.hadoop.hive.ql.optimizer.lineage.Generator$LineageInfoFilter.Invalid"));
+  }
+
+  @Test
+  public void testCreateFilterPredicateFromConfThrowsExceptionWhenNoneAndAnyOtherConstantPresent() {
+    HiveConf conf = new HiveConf();
+    conf.set(HiveConf.ConfVars.HIVE_LINEAGE_STATEMENT_FILTER.varname, "None," + HiveOperation.QUERY.name());
+
+    IllegalArgumentException exception = assertThrows(
+        IllegalArgumentException.class,
+        () -> Generator.createFilterPredicateFromConf(conf)
+    );
+
+    assertThat(exception.getMessage(), is("No other value can be specified when NONE is present!"));
+  }
+}
\ No newline at end of file
diff --git a/ql/src/test/queries/clientpositive/lineage6.q b/ql/src/test/queries/clientpositive/lineage6.q
new file mode 100644
index 0000000..1dc9613
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/lineage6.q
@@ -0,0 +1,13 @@
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.LineageLogger;
+set hive.lineage.statement.filter=Query;
+
+create table table_1_qegkz (id int, first_name string);
+create table table_2_gkvuw (id int, last_name string);
+
+select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id;
+
+create view view_fcuyp as (select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id);
+
+set hive.lineage.statement.filter=Query,CREATE_VIEW;
+
+create view view_fcuyp2 as (select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id);
diff --git a/ql/src/test/results/clientpositive/llap/lineage6.q.out b/ql/src/test/results/clientpositive/llap/lineage6.q.out
new file mode 100644
index 0000000..41daca1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/lineage6.q.out
@@ -0,0 +1,28 @@
+PREHOOK: query: create table table_1_qegkz (id int, first_name string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_1_qegkz
+PREHOOK: query: create table table_2_gkvuw (id int, last_name string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_2_gkvuw
+PREHOOK: query: select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@table_1_qegkz
+PREHOOK: Input: default@table_2_gkvuw
+#### A masked pattern was here ####
+{"version":"1.0","engine":"tez","database":"default","hash":"35144a690834a6399b85caf27dbf8a3c","queryText":"select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id","edges":[{"sources":[2],"targets":[0],"edgeType":"PROJECTION"},{"sources":[3,4],"targets":[1],"expression":"concat_ws(' ', table_1_qegkz.first_name, table_2_gkvuw.last_name)","edgeType":"PROJECTION"},{"sources":[2],"targets":[0,1],"expression":"table_1_qegkz.id is not null","edgeType":"PREDICATE"},{"sources":[2,5],"targets":[0,1],"expression":"(table_1_qegkz.id = table_2_gkvuw.id)","edgeType":"PREDICATE"},{"sources":[5],"targets":[0,1],"expression":"table_2_gkvuw.id is not null","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"table_1_qegkz.id"},{"id":1,"vertexType":"COLUMN","vertexId":"full_name"},{"id":2,"vertexType":"COLUMN","vertexId":"default.table_1_qegkz.id"},{"id":3,"vertexType":"COLUMN","vertexId":"default.table_1_qegkz.first_name"},{"id":4,"vertexType":"COLUMN","vertexId":"default.table_2_gkvuw.last_name"},{"id":5,"vertexType":"COLUMN","vertexId":"default.table_2_gkvuw.id"}]}
+PREHOOK: query: create view view_fcuyp as (select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id)
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: default@table_1_qegkz
+PREHOOK: Input: default@table_2_gkvuw
+PREHOOK: Output: database:default
+PREHOOK: Output: default@view_fcuyp
+{"version":"1.0","engine":"tez","database":"default","hash":"26e597ea7b3ca43b008466502b235b52","queryText":"create view view_fcuyp as (select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id)","edges":[],"vertices":[]}
+PREHOOK: query: create view view_fcuyp2 as (select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id)
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: default@table_1_qegkz
+PREHOOK: Input: default@table_2_gkvuw
+PREHOOK: Output: database:default
+PREHOOK: Output: default@view_fcuyp2
+{"version":"1.0","engine":"tez","database":"default","hash":"3c6019537df2f6001cce07de1aa0e352","queryText":"create view view_fcuyp2 as (select table_1_qegkz.id, concat_ws(' ' , table_1_qegkz.first_name, table_2_gkvuw.last_name) full_name from table_1_qegkz, table_2_gkvuw where table_1_qegkz.id = table_2_gkvuw.id)","edges":[{"sources":[2],"targets":[0],"edgeType":"PROJECTION"},{"sources":[3,4],"targets":[1],"expression":"concat_ws(' ', table_1_qegkz.first_name, table_2_gkvuw.last_name)","edgeType":"PROJECTION"},{"sources":[2],"targets":[0,1],"expression":"table_1_qegkz.id is not null","edgeType":"PREDICATE"},{"sources":[2,5],"targets":[0,1],"expression":"(table_1_qegkz.id = table_2_gkvuw.id)","edgeType":"PREDICATE"},{"sources":[5],"targets":[0,1],"expression":"table_2_gkvuw.id is not null","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"default.view_fcuyp2.id"},{"id":1,"vertexType":"COLUMN","vertexId":"default.view_fcuyp2.full_name"},{"id":2,"vertexType":"COLUMN","vertexId":"default.table_1_qegkz.id"},{"id":3,"vertexType":"COLUMN","vertexId":"default.table_1_qegkz.first_name"},{"id":4,"vertexType":"COLUMN","vertexId":"default.table_2_gkvuw.last_name"},{"id":5,"vertexType":"COLUMN","vertexId":"default.table_2_gkvuw.id"}]}