SUBMARINE-679. Spark Security Plugin Support Spark-3.0.1
### What is this PR for?
Make the Submarine Spark Security Plugin support Spark 3.x.
### What type of PR is it?
Improvement / Feature
### What is the Jira issue?
[SUBMARINE-679](https://issues.apache.org/jira/browse/SUBMARINE-679)
### Questions:
* Does this need documentation? YES
Author: atovk <neatovk@gmail.com>
Closes #463 from atovk/SUBMARINE-679 and squashes the following commits:
56742a9 [atovk] fit scala-style check
90788fa [atovk] SubmarineSqlParser class Compatible for spark2.x and spark3.x
b1309f9 [atovk] clean Dependency for test case in profiles hadoop-2.9 (hadoop3.x untested)
27eb9c2 [atovk] add doc available profiles spark-3.0
ba296c6 [atovk] fit test case in spark2.x and spark3.x
eee31b5 [atovk] fix ranger in spark3.x subquery Permission filter
e002c3d [atovk] fit spark 3.0 Dependency version
138f729 [atovk] add Test submarine spark security with spark 3.0
276af6c [atovk] SUBMARINE-679. Spark Security Plugin Support Spark-3.0.1
diff --git a/.travis.yml b/.travis.yml
index 3e08a27..da0379d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -333,6 +333,46 @@
- TEST_FLAG=$BUILD_FLAG
- PROFILE="-Pspark-2.4 -Pranger-2.0"
- MODULES="-pl :submarine-spark-security"
+
+ - name: Test submarine spark security with spark 3.0 and ranger 1.0
+ language: scala
+ jdk: openjdk8
+ env:
+ - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
+ - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true"
+ - TEST_FLAG=$BUILD_FLAG
+ - PROFILE="-Pspark-3.0 -Pranger-1.0"
+ - MODULES="-pl :submarine-spark-security"
+
+ - name: Test submarine spark security with spark 3.0 and ranger 1.1
+ language: scala
+ jdk: openjdk8
+ env:
+ - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
+ - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true"
+ - TEST_FLAG=$BUILD_FLAG
+ - PROFILE="-Pspark-3.0 -Pranger-1.1"
+ - MODULES="-pl :submarine-spark-security"
+
+ - name: Test submarine spark security with spark 3.0 and ranger 1.2
+ language: scala
+ jdk: openjdk8
+ env:
+ - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
+ - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true"
+ - TEST_FLAG=$BUILD_FLAG
+ - PROFILE="-Pspark-3.0 -Pranger-1.2"
+ - MODULES="-pl :submarine-spark-security"
+
+ - name: Test submarine spark security with spark 3.0 and ranger 2.0
+ language: scala
+ jdk: openjdk8
+ env:
+ - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
+ - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true"
+ - TEST_FLAG=$BUILD_FLAG
+ - PROFILE="-Pspark-3.0 -Pranger-2.0"
+ - MODULES="-pl :submarine-spark-security"
install:
- mvn --version
- echo ">>> mvn $BUILD_FLAG $MODULES $PROFILE -B"
diff --git a/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md b/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md
index c58bf4f..ff30112 100644
--- a/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md
+++ b/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md
@@ -25,6 +25,6 @@
Currently, available profiles are:
-Spark: -Pspark-2.3, -Pspark-2.4
+Spark: -Pspark-2.3, -Pspark-2.4, -Pspark-3.0
Ranger: -Pranger-1.0, -Pranger-1.1, -Pranger-1.2 -Pranger-2.0
diff --git a/submarine-security/spark-security/pom.xml b/submarine-security/spark-security/pom.xml
index ee88738..36a1f43 100644
--- a/submarine-security/spark-security/pom.xml
+++ b/submarine-security/spark-security/pom.xml
@@ -46,6 +46,7 @@
<ranger.spark.package>submarine_spark_ranger_project</ranger.spark.package>
<ranger.version>1.1.0</ranger.version>
<ranger.major.version>1</ranger.major.version>
+ <spark.compatible.version>2</spark.compatible.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<scalatest.version>2.2.6</scalatest.version>
@@ -61,6 +62,11 @@
</properties>
<dependencies>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
@@ -321,6 +327,19 @@
</sources>
</configuration>
</execution>
+
+ <execution>
+ <id>add-spark-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>spark-${spark.compatible.version}/src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
@@ -552,6 +571,20 @@
</profile>
<profile>
+ <id>spark-3.0</id>
+ <properties>
+ <spark.version>3.0.1</spark.version>
+ <scala.version>2.12.10</scala.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <!--<scalatest.version>3.2.0</scalatest.version>-->
+ <spark.compatible.version>3</spark.compatible.version>
+ <commons-lang3.version>3.9</commons-lang3.version>
+ <jackson-databind.version>2.10.5</jackson-databind.version>
+ <jackson-annotations.version>2.10.5</jackson-annotations.version>
+ </properties>
+ </profile>
+
+ <profile>
<id>ranger-1.0</id>
<properties>
<eclipse.jpa.version>2.5.2</eclipse.jpa.version>
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
new file mode 100644
index 0000000..4b2e6dd
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, SetDatabaseCommand, ShowDatabasesCommand}
+
+object CompatibleFunc {
+
+ def getPattern(child: ShowDatabasesCommand) = child.databasePattern
+
+ def getCatLogName(s: SetDatabaseCommand) = s.databaseName
+
+ def analyzeColumnName(column: AnalyzeColumnCommand) = column.columnNames
+
+ def tableIdentifier(u: UnresolvedRelation) = u.tableIdentifier
+}
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
new file mode 100644
index 0000000..7c0847f
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+
+case class SubqueryCompatible(child: LogicalPlan, correlated: Boolean= false) {
+ Subquery(child)
+}
+
+object SubqueryCompatible {
+ // def apply(child: LogicalPlan, correlated: Boolean= false) = Subquery(child)
+ def unapply(subquery: Subquery): Option[LogicalPlan] = Subquery.unapply(subquery)
+}
+
+
+
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
new file mode 100644
index 0000000..84325c3
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.execution.command.{PersistedView, SetDatabaseCommand, ShowDatabasesCommand}
+
+package object CompatibleCommand {
+
+ type ShowDatabasesCommandCompatible = ShowDatabasesCommand
+ type SetDatabaseCommandCompatible = SetDatabaseCommand
+
+}
+
+object PersistedViewCompatible {
+ val obj: PersistedView.type = PersistedView
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
similarity index 93%
rename from submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
rename to submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
index f1fd8d0..dc8c43e 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
@@ -43,10 +43,10 @@
// scalastyle:off line.size.limit
/**
- * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
- *
- * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
- */
+ * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
+ *
+ * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
+ */
// scalastyle:on
private def parse[T](command: String)(toResult: SubmarineSqlBaseParser => T): T = {
val lexer = new SubmarineSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
new file mode 100644
index 0000000..5c50ced
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{SetCatalogAndNamespace, ShowNamespaces}
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+
+object CompatibleFunc {
+
+ def getPattern(child: ShowNamespaces) = child.pattern
+
+ def getCatLogName(s: SetCatalogAndNamespace) = s.catalogName
+
+ def analyzeColumnName(column: AnalyzeColumnCommand) = column.columnNames.get
+
+ def tableIdentifier(u: UnresolvedRelation) = TableIdentifier.apply(u.tableName)
+}
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
new file mode 100644
index 0000000..63b9695
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+
+object SubqueryCompatible {
+ def apply(child: LogicalPlan, correlated: Boolean) = Subquery(child, correlated)
+ def unapply(subquery: Subquery): Option[(LogicalPlan, Boolean)] = Subquery.unapply(subquery)
+}
+
+
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
new file mode 100644
index 0000000..8bd3311
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.analysis.PersistedView
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SetCatalogAndNamespace, ShowNamespaces, Subquery}
+
+
+package object CompatibleCommand {
+
+ type ShowDatabasesCommandCompatible = ShowNamespaces
+ type SetDatabaseCommandCompatible = SetCatalogAndNamespace
+}
+
+object PersistedViewCompatible {
+ val obj: PersistedView.type = PersistedView
+}
+
+
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
similarity index 88%
copy from submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
copy to submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
index f1fd8d0..c6e11d0 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
@@ -43,10 +43,10 @@
// scalastyle:off line.size.limit
/**
- * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
- *
- * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
- */
+ * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
+ *
+ * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
+ */
// scalastyle:on
private def parse[T](command: String)(toResult: SubmarineSqlBaseParser => T): T = {
val lexer = new SubmarineSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
@@ -104,4 +104,13 @@
override def parseDataType(sqlText: String): DataType = {
delegate.parseDataType(sqlText)
}
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseRawDataType(sqlText: String): DataType = {
+ delegate.parseRawDataType(sqlText)
+ }
+
}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 0132385..84c8733 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -35,9 +35,11 @@
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
+import org.apache.submarine.spark.compatible.SubqueryCompatible
import org.apache.submarine.spark.security._
import org.apache.submarine.spark.security.SparkObjectType.COLUMN
+
/**
* An Apache Spark's [[Optimizer]] extension for column data masking.
* TODO(kent yao) implement this as analyzer rule
@@ -233,7 +235,8 @@
marked transformAllExpressions {
case s: SubqueryExpression =>
- val Subquery(newPlan) = Subquery(SubmarineDataMasking(s.plan))
+ val SubqueryCompatible(newPlan, _) = SubqueryCompatible(
+ SubmarineDataMasking(s.plan), SubqueryExpression.hasCorrelatedSubquery(s))
s.withNewPlan(newPlan)
}
}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
index be3c031..fc439b5 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
@@ -32,6 +32,7 @@
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
+import org.apache.submarine.spark.compatible.SubqueryCompatible
import org.apache.submarine.spark.security._
/**
@@ -62,9 +63,9 @@
val analyzed = spark.sessionState.analyzer.execute(Filter(condition, plan))
val optimized = analyzed transformAllExpressions {
case s: SubqueryExpression =>
- val Subquery(newPlan) =
- rangerSparkOptimizer.execute(Subquery(SubmarineRowFilter(s.plan)))
- s.withNewPlan(newPlan)
+ val SubqueryCompatible(newPlan, _) = SubqueryCompatible(
+ SubmarineRowFilter(s.plan), SubqueryExpression.hasCorrelatedSubquery(s))
+ s.withNewPlan(rangerSparkOptimizer.execute(newPlan))
}
SubmarineRowFilter(optimized)
} else {
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
index 0236d7c..68dc720 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
@@ -29,6 +29,7 @@
import org.apache.spark.sql.hive.PrivilegesBuilder
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
+import org.apache.submarine.spark.compatible.CompatibleCommand._
import org.apache.submarine.spark.security.{RangerSparkAuthorizer, SparkAccessControlException}
/**
@@ -52,7 +53,7 @@
*/
override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
- case s: ShowDatabasesCommand => SubmarineShowDatabasesCommand(s)
+ case s: ShowDatabasesCommandCompatible => SubmarineShowDatabasesCommand(s)
case s: SubmarineShowDatabasesCommand => s
case s: ShowTablesCommand => SubmarineShowTablesCommand(s)
case s: SubmarineShowTablesCommand => s
@@ -144,10 +145,10 @@
case p if p.nodeName == "SaveIntoDataSourceCommand" => QUERY
case s: SetCommand if s.kv.isEmpty || s.kv.get._2.isEmpty => SHOWCONF
- case _: SetDatabaseCommand => SWITCHDATABASE
+ case _: SetDatabaseCommandCompatible => SWITCHDATABASE
case _: ShowCreateTableCommand => SHOW_CREATETABLE
case _: ShowColumnsCommand => SHOWCOLUMNS
- case _: ShowDatabasesCommand => SHOWDATABASES
+ case _: ShowDatabasesCommandCompatible => SHOWDATABASES
case _: ShowFunctionsCommand => SHOWFUNCTIONS
case _: ShowPartitionsCommand => SHOWPARTITIONS
case _: ShowTablesCommand => SHOWTABLES
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
index d74c180..dd782f0 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
@@ -20,16 +20,22 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.execution.command.{RunnableCommand, ShowDatabasesCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.submarine.spark.compatible.CompatibleCommand.ShowDatabasesCommandCompatible
+import org.apache.submarine.spark.compatible.CompatibleFunc
import org.apache.submarine.spark.security.{RangerSparkAuthorizer, SparkPrivilegeObject, SparkPrivilegeObjectType}
-case class SubmarineShowDatabasesCommand(child: ShowDatabasesCommand) extends RunnableCommand {
+case class SubmarineShowDatabasesCommand(child: ShowDatabasesCommandCompatible)
+ extends RunnableCommand {
override val output = child.output
override def run(sparkSession: SparkSession): Seq[Row] = {
- val rows = child.run(sparkSession)
- rows.filter(r => RangerSparkAuthorizer.isAllowed(toSparkPrivilegeObject(r)))
+ val catalog = sparkSession.sessionState.catalog
+ val databases = CompatibleFunc.getPattern(child)
+ .map(catalog.listDatabases).getOrElse(catalog.listDatabases()).map { d => Row(d) }
+
+ databases.filter(r => RangerSparkAuthorizer.isAllowed(toSparkPrivilegeObject(r)))
}
private def toSparkPrivilegeObject(row: Row): SparkPrivilegeObject = {
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
index bbc1a47..958223f 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
@@ -28,14 +28,17 @@
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.{AlterDatabasePropertiesCommand, AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AlterTableRecoverPartitionsCommand, AlterTableRenameCommand, AlterTableRenamePartitionCommand, AlterTableSerDePropertiesCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AlterViewAsCommand, AnalyzeColumnCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, CreateDataSourceTableAsSelectCommand, CreateDataSourceTableCommand, CreateFunctionCommand, CreateTableCommand, CreateTableLikeCommand, CreateViewCommand, DescribeDatabaseCommand, DescribeFunctionCommand, DescribeTableCommand, DropDatabaseCommand, DropFunctionCommand, DropTableCommand, ExplainCommand, LoadDataCommand, PersistedView, RunnableCommand, SetDatabaseCommand, ShowColumnsCommand, ShowCreateTableCommand, ShowFunctionsCommand, ShowPartitionsCommand, ShowTablePropertiesCommand, ShowTablesCommand, TruncateTableCommand}
+import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
import org.apache.spark.sql.types.StructField
+import org.apache.submarine.spark.compatible.{CompatibleFunc, PersistedViewCompatible}
+import org.apache.submarine.spark.compatible.CompatibleCommand.SetDatabaseCommandCompatible
import org.apache.submarine.spark.security.{SparkPrivilegeObject, SparkPrivilegeObjectType, SparkPrivObjectActionType}
import org.apache.submarine.spark.security.SparkPrivObjectActionType.SparkPrivObjectActionType
+
/**
* [[LogicalPlan]] -> list of [[SparkPrivilegeObject]]s
*/
@@ -119,7 +122,7 @@
// Unfortunately, the real world is always a place where miracles happen.
// We check the privileges directly without resolving the plan and leave everything
// to spark to do.
- addTableOrViewLevelObjs(u.tableIdentifier, privilegeObjects)
+ addTableOrViewLevelObjs(CompatibleFunc.tableIdentifier(u), privilegeObjects)
case p =>
for (child <- p.children) {
@@ -203,9 +206,9 @@
case a: AnalyzeColumnCommand =>
addTableOrViewLevelObjs(
- a.tableIdent, inputObjs, columns = a.columnNames)
+ a.tableIdent, inputObjs, columns = CompatibleFunc.analyzeColumnName(a))
addTableOrViewLevelObjs(
- a.tableIdent, outputObjs, columns = a.columnNames)
+ a.tableIdent, outputObjs, columns = CompatibleFunc.analyzeColumnName(a))
case a if a.nodeName == "AnalyzePartitionCommand" =>
addTableOrViewLevelObjs(
@@ -252,7 +255,7 @@
case c: CreateViewCommand =>
c.viewType match {
- case PersistedView =>
+ case PersistedViewCompatible.obj =>
// PersistedView will be tied to a database
addDbLevelObjs(c.name, outputObjs)
addTableOrViewLevelObjs(c.name, outputObjs)
@@ -328,7 +331,8 @@
case s if s.nodeName == "SaveIntoDataSourceCommand" =>
buildQuery(getFieldVal(s, "query").asInstanceOf[LogicalPlan], outputObjs)
- case s: SetDatabaseCommand => addDbLevelObjs(s.databaseName, inputObjs)
+ case s: SetDatabaseCommandCompatible =>
+ addDbLevelObjs(CompatibleFunc.getCatLogName(s), inputObjs)
case s: ShowColumnsCommand => addTableOrViewLevelObjs(s.tableName, inputObjs)
diff --git a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
index b356fd5..3a7b473 100644
--- a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
+++ b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
@@ -336,7 +336,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -433,7 +434,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -652,7 +654,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -806,7 +809,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -920,7 +924,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -977,7 +982,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1034,7 +1040,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1091,7 +1098,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1156,7 +1164,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1221,7 +1230,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1285,7 +1295,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1349,7 +1360,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1413,7 +1425,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1477,7 +1490,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1541,7 +1555,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1598,7 +1613,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1655,7 +1671,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1712,7 +1729,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1769,7 +1787,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1826,7 +1845,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1883,7 +1903,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -1947,7 +1968,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -2011,7 +2033,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -2075,7 +2098,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -2139,7 +2163,8 @@
"resources": {
"database": {
"values": [
- "default"
+ "default",
+ "spark_catalog"
],
"isExcludes": false,
"isRecursive": false
@@ -2652,4 +2677,4 @@
"version": 1
},
"auditMode": "audit-default"
-}
\ No newline at end of file
+}
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
index 92b9ca4..ad23cf2 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
@@ -20,12 +20,14 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.execution.{SubmarineShowDatabasesCommand, SubmarineShowTablesCommand}
-import org.apache.spark.sql.execution.command.{CreateDatabaseCommand, ShowDatabasesCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{CreateDatabaseCommand, ShowTablesCommand}
import org.apache.spark.sql.hive.test.TestHive
import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.apache.submarine.spark.compatible.CompatibleCommand.ShowDatabasesCommandCompatible
import org.apache.submarine.spark.security.SparkAccessControlException
+
class SubmarineSparkRangerAuthorizationExtensionTest extends FunSuite with BeforeAndAfterAll {
private val spark = TestHive.sparkSession.newSession()
@@ -35,7 +37,7 @@
test("replace submarine show databases") {
val df = spark.sql("show databases")
val originalPlan = df.queryExecution.optimizedPlan
- assert(originalPlan.isInstanceOf[ShowDatabasesCommand])
+ assert(originalPlan.isInstanceOf[ShowDatabasesCommandCompatible])
val newPlan = authz(originalPlan)
assert(newPlan.isInstanceOf[SubmarineShowDatabasesCommand])
}
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
index 5cbf439..e8e9b8f 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
@@ -118,8 +118,12 @@
test("use database") {
withUser("alice") {
val e = intercept[SparkAccessControlException](sql("use default"))
- assert(e.getMessage === "Permission denied: user [alice] does not have [USE] privilege" +
- " on [default]")
+ assert(
+ e.getMessage === "Permission denied: user [alice] " +
+ "does not have [USE] privilege on [default]"
+ ||
+ e.getMessage === "Permission denied: user [alice] " +
+ "does not have [USE] privilege on [spark_catalog]")
}
withUser("bob") {
sql("use default")
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
index 13a7d64..ddd281b 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
@@ -35,7 +35,6 @@
override def beforeAll(): Unit = {
super.beforeAll()
- SubmarineSparkUtils.enableAll(spark)
spark.conf.set(SQLConf.CROSS_JOINS_ENABLED.key, "true")
sql(
@@ -322,6 +321,8 @@
|`wp_image_count` INT, `wp_max_ad_count` INT)
|USING parquet
""".stripMargin)
+
+ SubmarineSparkUtils.enableAll(spark)
}
private val tpcdsQueries = Seq(