SUBMARINE-679. Spark Security Plugin Support Spark-3.0.1

### What is this PR for?
Add Spark 3.x (tested with 3.0.1) support to the Submarine Spark Security plugin. Version-specific code now lives under `spark-2/` and `spark-3/` source roots behind a small compatibility facade, and the new `-Pspark-3.0` Maven profile selects the matching root (and Scala 2.12) at build time.
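
As a rough sketch of the facade approach (condensed from the `SubqueryCompatible` objects added in this diff; the shipped code differs in detail):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}

// Lives in spark-3/src/main/scala and is compiled only under -Pspark-3.0.
// Spark 3's Subquery gained a `correlated` flag, so both fields pass through;
// the spark-2 twin accepts the flag for source compatibility and ignores it.
object SubqueryCompatible {
  def apply(child: LogicalPlan, correlated: Boolean): Subquery =
    Subquery(child, correlated)

  def unapply(subquery: Subquery): Option[(LogicalPlan, Boolean)] =
    Subquery.unapply(subquery)
}
```

Shared rules such as `SubmarineDataMaskingExtension` compile against the facade only. To build for Spark 3, use the profile combination exercised by the new CI jobs, e.g. `mvn clean install -Pspark-3.0 -Pranger-2.0 -pl :submarine-spark-security`.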

### What type of PR is it?
[Improvement | Feature]

### What is the Jira issue?
[SUBMARINE-679](https://issues.apache.org/jira/browse/SUBMARINE-679)

### Questions:
* Does this need documentation? YES

Author: atovk <neatovk@gmail.com>

Closes #463 from atovk/SUBMARINE-679 and squashes the following commits:

56742a9 [atovk] fit scala-style check
90788fa [atovk] SubmarineSqlParser class Compatible for spark2.x and spark3.x
b1309f9 [atovk] clean Dependency for test case in profiles hadoop-2.9 (hadoop3.x untested)
27eb9c2 [atovk] add doc available profiles spark-3.0
ba296c6 [atovk] fit test case in spark2.x and spark3.x
eee31b5 [atovk] fix ranger in spark3.x subquery Permission filter
e002c3d [atovk] fit spark 3.0 Dependency version
138f729 [atovk] add Test submarine spark security with spark 3.0
276af6c [atovk] SUBMARINE-679. Spark Security Plugin Support Spark-3.0.1
diff --git a/.travis.yml b/.travis.yml
index 3e08a27..da0379d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -333,6 +333,46 @@
         - TEST_FLAG=$BUILD_FLAG
         - PROFILE="-Pspark-2.4 -Pranger-2.0"
         - MODULES="-pl :submarine-spark-security"
+
+    - name: Test submarine spark security with spark 3.0 and ranger 1.0
+      language: scala
+      jdk: openjdk8
+      env:
+        - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
+        - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true"
+        - TEST_FLAG=$BUILD_FLAG
+        - PROFILE="-Pspark-3.0 -Pranger-1.0"
+        - MODULES="-pl :submarine-spark-security"
+
+    - name: Test submarine spark security with spark 3.0 and ranger 1.1
+      language: scala
+      jdk: openjdk8
+      env:
+        - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
+        - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true"
+        - TEST_FLAG=$BUILD_FLAG
+        - PROFILE="-Pspark-3.0 -Pranger-1.1"
+        - MODULES="-pl :submarine-spark-security"
+
+    - name: Test submarine spark security with spark 3.0 and ranger 1.2
+      language: scala
+      jdk: openjdk8
+      env:
+        - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
+        - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true"
+        - TEST_FLAG=$BUILD_FLAG
+        - PROFILE="-Pspark-3.0 -Pranger-1.2"
+        - MODULES="-pl :submarine-spark-security"
+
+    - name: Test submarine spark security with spark 3.0 and ranger 2.0
+      language: scala
+      jdk: openjdk8
+      env:
+        - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
+        - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true"
+        - TEST_FLAG=$BUILD_FLAG
+        - PROFILE="-Pspark-3.0 -Pranger-2.0"
+        - MODULES="-pl :submarine-spark-security"
 install:
   - mvn --version
   - echo ">>> mvn $BUILD_FLAG $MODULES $PROFILE -B"
diff --git a/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md b/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md
index c58bf4f..ff30112 100644
--- a/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md
+++ b/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md
@@ -25,6 +25,6 @@
 
 Currently, available profiles are:
 
-Spark: -Pspark-2.3, -Pspark-2.4
+Spark: -Pspark-2.3, -Pspark-2.4, -Pspark-3.0
 
 Ranger: -Pranger-1.0, -Pranger-1.1, -Pranger-1.2 -Pranger-2.0
diff --git a/submarine-security/spark-security/pom.xml b/submarine-security/spark-security/pom.xml
index ee88738..36a1f43 100644
--- a/submarine-security/spark-security/pom.xml
+++ b/submarine-security/spark-security/pom.xml
@@ -46,6 +46,7 @@
     <ranger.spark.package>submarine_spark_ranger_project</ranger.spark.package>
     <ranger.version>1.1.0</ranger.version>
     <ranger.major.version>1</ranger.major.version>
+    <spark.compatible.version>2</spark.compatible.version>
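+    <!-- chooses the spark-2/ or spark-3/ compatibility source root; see the add-spark-source execution -->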
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
     <scalatest.version>2.2.6</scalatest.version>
@@ -61,6 +62,11 @@
   </properties>
   <dependencies>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala.version}</version>
@@ -321,6 +327,19 @@
               </sources>
             </configuration>
           </execution>
+
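+          <!-- compile the per-version compatibility sources selected by spark.compatible.version -->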
+          <execution>
+            <id>add-spark-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>spark-${spark.compatible.version}/src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>
@@ -552,6 +571,20 @@
     </profile>
 
     <profile>
+      <id>spark-3.0</id>
+      <properties>
+        <spark.version>3.0.1</spark.version>
+        <scala.version>2.12.10</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.compatible.version>3</spark.compatible.version>
+        <commons-lang3.version>3.9</commons-lang3.version>
+        <jackson-databind.version>2.10.5</jackson-databind.version>
+        <jackson-annotations.version>2.10.5</jackson-annotations.version>
+      </properties>
+    </profile>
+
+    <profile>
       <id>ranger-1.0</id>
       <properties>
         <eclipse.jpa.version>2.5.2</eclipse.jpa.version>
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
new file mode 100644
index 0000000..4b2e6dd
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, SetDatabaseCommand, ShowDatabasesCommand}
+
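+// Spark 2.x side of the cross-version facade: the 2.x commands still expose
+// these fields directly, so every helper is a plain forwarder.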
+object CompatibleFunc {
+
+  def getPattern(child: ShowDatabasesCommand) = child.databasePattern
+
+  def getCatLogName(s: SetDatabaseCommand) = s.databaseName
+
+  def analyzeColumnName(column: AnalyzeColumnCommand) = column.columnNames
+
+  def tableIdentifier(u: UnresolvedRelation) = u.tableIdentifier
+}
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
new file mode 100644
index 0000000..7c0847f
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+
+// Spark 2.x shim: Subquery(child) has no `correlated` flag here, so the
+// parameter exists only for source compatibility with the Spark 3 call sites.
+case class SubqueryCompatible(child: LogicalPlan, correlated: Boolean = false)
+
+object SubqueryCompatible {
+  def unapply(subquery: Subquery): Option[LogicalPlan] = Subquery.unapply(subquery)
+}
+
+
+
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
new file mode 100644
index 0000000..84325c3
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.execution.command.{PersistedView, SetDatabaseCommand, ShowDatabasesCommand}
+
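+// Type aliases shared rules can match on, independent of the Spark version.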
+package object CompatibleCommand {
+
+  type ShowDatabasesCommandCompatible = ShowDatabasesCommand
+  type SetDatabaseCommandCompatible = SetDatabaseCommand
+
+}
+
+object PersistedViewCompatible {
+  val obj: PersistedView.type = PersistedView
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
similarity index 93%
rename from submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
rename to submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
index f1fd8d0..dc8c43e 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
@@ -43,10 +43,10 @@
 
   // scalastyle:off line.size.limit
   /**
-   * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
-   *
-   * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
-   */
+    * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
+    *
+    * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
+    */
   // scalastyle:on
   private def parse[T](command: String)(toResult: SubmarineSqlBaseParser => T): T = {
     val lexer = new SubmarineSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
new file mode 100644
index 0000000..5c50ced
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{SetCatalogAndNamespace, ShowNamespaces}
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+
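+// Spark 3.x side of the facade: SHOW DATABASES / USE became the ShowNamespaces
+// and SetCatalogAndNamespace logical plans, AnalyzeColumnCommand.columnNames is
+// now an Option, and UnresolvedRelation is addressed by multi-part name.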
+object CompatibleFunc {
+
+  def getPattern(child: ShowNamespaces) = child.pattern
+
+  def getCatLogName(s: SetCatalogAndNamespace) = s.catalogName
+
+  def analyzeColumnName(column: AnalyzeColumnCommand) = column.columnNames.get
+
+  def tableIdentifier(u: UnresolvedRelation) = TableIdentifier.apply(u.tableName)
+}
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
new file mode 100644
index 0000000..63b9695
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+
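+// Spark 3.x: Subquery gained a `correlated` flag, so both fields pass straight through.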
+object SubqueryCompatible {
+  def apply(child: LogicalPlan, correlated: Boolean) = Subquery(child, correlated)
+  def unapply(subquery: Subquery): Option[(LogicalPlan, Boolean)] = Subquery.unapply(subquery)
+}
+
+
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
new file mode 100644
index 0000000..8bd3311
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.analysis.PersistedView
+import org.apache.spark.sql.catalyst.plans.logical.{SetCatalogAndNamespace, ShowNamespaces}
+
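+// Same alias names as the spark-2 source root, bound to the Spark 3 logical plans
+// so shared rules can pattern-match on a single type across versions.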
+package object CompatibleCommand {
+
+  type ShowDatabasesCommandCompatible = ShowNamespaces
+  type SetDatabaseCommandCompatible = SetCatalogAndNamespace
+}
+
+object PersistedViewCompatible {
+  val obj: PersistedView.type = PersistedView
+}
+
+
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
similarity index 88%
copy from submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
copy to submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
index f1fd8d0..c6e11d0 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
@@ -43,10 +43,10 @@
 
   // scalastyle:off line.size.limit
   /**
-   * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
-   *
-   * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
-   */
+    * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
+    *
+    * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
+    */
   // scalastyle:on
   private def parse[T](command: String)(toResult: SubmarineSqlBaseParser => T): T = {
     val lexer = new SubmarineSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
@@ -104,4 +104,13 @@
   override def parseDataType(sqlText: String): DataType = {
     delegate.parseDataType(sqlText)
   }
+
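+  // ParserInterface gained these two methods in Spark 3; delegate them unchanged.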
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  override def parseRawDataType(sqlText: String): DataType = {
+    delegate.parseRawDataType(sqlText)
+  }
+
 }
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 0132385..84c8733 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -35,9 +35,11 @@
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
 
+import org.apache.submarine.spark.compatible.SubqueryCompatible
 import org.apache.submarine.spark.security._
 import org.apache.submarine.spark.security.SparkObjectType.COLUMN
 
 /**
  * An Apache Spark's [[Optimizer]] extension for column data masking.
  * TODO(kent yao) implement this as analyzer rule
@@ -233,7 +235,8 @@
 
       marked transformAllExpressions {
         case s: SubqueryExpression =>
-          val Subquery(newPlan) = Subquery(SubmarineDataMasking(s.plan))
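+          // keep the correlated flag so Spark 3 correlated subqueries survive the rewrite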
+          val SubqueryCompatible(newPlan, _) = SubqueryCompatible(
+              SubmarineDataMasking(s.plan), SubqueryExpression.hasCorrelatedSubquery(s))
           s.withNewPlan(newPlan)
       }
   }
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
index be3c031..fc439b5 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
@@ -32,6 +32,7 @@
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
 
+import org.apache.submarine.spark.compatible.SubqueryCompatible
 import org.apache.submarine.spark.security._
 
 /**
@@ -62,9 +63,9 @@
         val analyzed = spark.sessionState.analyzer.execute(Filter(condition, plan))
         val optimized = analyzed transformAllExpressions {
           case s: SubqueryExpression =>
-            val Subquery(newPlan) =
-              rangerSparkOptimizer.execute(Subquery(SubmarineRowFilter(s.plan)))
-            s.withNewPlan(newPlan)
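+            // same correlated-flag handling as in the data-masking rule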
+            val SubqueryCompatible(newPlan, _) = SubqueryCompatible(
+              SubmarineRowFilter(s.plan), SubqueryExpression.hasCorrelatedSubquery(s))
+            s.withNewPlan(rangerSparkOptimizer.execute(newPlan))
         }
         SubmarineRowFilter(optimized)
       } else {
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
index 0236d7c..68dc720 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
@@ -29,6 +29,7 @@
 import org.apache.spark.sql.hive.PrivilegesBuilder
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 
+import org.apache.submarine.spark.compatible.CompatibleCommand._
 import org.apache.submarine.spark.security.{RangerSparkAuthorizer, SparkAccessControlException}
 
 /**
@@ -52,7 +53,7 @@
    */
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan match {
-      case s: ShowDatabasesCommand => SubmarineShowDatabasesCommand(s)
+      case s: ShowDatabasesCommandCompatible => SubmarineShowDatabasesCommand(s)
       case s: SubmarineShowDatabasesCommand => s
       case s: ShowTablesCommand => SubmarineShowTablesCommand(s)
       case s: SubmarineShowTablesCommand => s
@@ -144,10 +145,10 @@
 
         case p if p.nodeName == "SaveIntoDataSourceCommand" => QUERY
         case s: SetCommand if s.kv.isEmpty || s.kv.get._2.isEmpty => SHOWCONF
-        case _: SetDatabaseCommand => SWITCHDATABASE
+        case _: SetDatabaseCommandCompatible => SWITCHDATABASE
         case _: ShowCreateTableCommand => SHOW_CREATETABLE
         case _: ShowColumnsCommand => SHOWCOLUMNS
-        case _: ShowDatabasesCommand => SHOWDATABASES
+        case _: ShowDatabasesCommandCompatible => SHOWDATABASES
         case _: ShowFunctionsCommand => SHOWFUNCTIONS
         case _: ShowPartitionsCommand => SHOWPARTITIONS
         case _: ShowTablesCommand => SHOWTABLES
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
index d74c180..dd782f0 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
@@ -20,16 +20,22 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.execution.command.{RunnableCommand, ShowDatabasesCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
 
+import org.apache.submarine.spark.compatible.CompatibleCommand.ShowDatabasesCommandCompatible
+import org.apache.submarine.spark.compatible.CompatibleFunc
 import org.apache.submarine.spark.security.{RangerSparkAuthorizer, SparkPrivilegeObject, SparkPrivilegeObjectType}
 
-case class SubmarineShowDatabasesCommand(child: ShowDatabasesCommand) extends RunnableCommand {
+case class SubmarineShowDatabasesCommand(child: ShowDatabasesCommandCompatible)
+  extends RunnableCommand {
   override val output = child.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val rows = child.run(sparkSession)
-    rows.filter(r => RangerSparkAuthorizer.isAllowed(toSparkPrivilegeObject(r)))
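+    // Spark 3's ShowNamespaces is not a RunnableCommand, so list databases from
+    // the session catalog instead of delegating to child.run.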
+    val catalog = sparkSession.sessionState.catalog
+    val databases = CompatibleFunc.getPattern(child)
+      .map(catalog.listDatabases).getOrElse(catalog.listDatabases()).map { d => Row(d) }
+
+    databases.filter(r => RangerSparkAuthorizer.isAllowed(toSparkPrivilegeObject(r)))
   }
 
   private def toSparkPrivilegeObject(row: Row): SparkPrivilegeObject = {
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
index bbc1a47..958223f 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
@@ -28,14 +28,17 @@
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.{AlterDatabasePropertiesCommand, AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AlterTableRecoverPartitionsCommand, AlterTableRenameCommand, AlterTableRenamePartitionCommand, AlterTableSerDePropertiesCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AlterViewAsCommand, AnalyzeColumnCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, CreateDataSourceTableAsSelectCommand, CreateDataSourceTableCommand, CreateFunctionCommand, CreateTableCommand, CreateTableLikeCommand, CreateViewCommand, DescribeDatabaseCommand, DescribeFunctionCommand, DescribeTableCommand, DropDatabaseCommand, DropFunctionCommand, DropTableCommand, ExplainCommand, LoadDataCommand, PersistedView, RunnableCommand, SetDatabaseCommand, ShowColumnsCommand, ShowCreateTableCommand, ShowFunctionsCommand, ShowPartitionsCommand, ShowTablePropertiesCommand, ShowTablesCommand, TruncateTableCommand}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 import org.apache.spark.sql.types.StructField
 
+import org.apache.submarine.spark.compatible.{CompatibleFunc, PersistedViewCompatible}
+import org.apache.submarine.spark.compatible.CompatibleCommand.SetDatabaseCommandCompatible
 import org.apache.submarine.spark.security.{SparkPrivilegeObject, SparkPrivilegeObjectType, SparkPrivObjectActionType}
 import org.apache.submarine.spark.security.SparkPrivObjectActionType.SparkPrivObjectActionType
 
 /**
  * [[LogicalPlan]] -> list of [[SparkPrivilegeObject]]s
  */
@@ -119,7 +122,7 @@
         // Unfortunately, the real world is always a place where miracles happen.
         // We check the privileges directly without resolving the plan and leave everything
         // to spark to do.
-        addTableOrViewLevelObjs(u.tableIdentifier, privilegeObjects)
+        addTableOrViewLevelObjs(CompatibleFunc.tableIdentifier(u), privilegeObjects)
 
       case p =>
         for (child <- p.children) {
@@ -203,9 +206,9 @@
 
       case a: AnalyzeColumnCommand =>
         addTableOrViewLevelObjs(
-          a.tableIdent, inputObjs, columns = a.columnNames)
+          a.tableIdent, inputObjs, columns = CompatibleFunc.analyzeColumnName(a))
         addTableOrViewLevelObjs(
-          a.tableIdent, outputObjs, columns = a.columnNames)
+          a.tableIdent, outputObjs, columns = CompatibleFunc.analyzeColumnName(a))
 
       case a if a.nodeName == "AnalyzePartitionCommand" =>
         addTableOrViewLevelObjs(
@@ -252,7 +255,7 @@
 
       case c: CreateViewCommand =>
         c.viewType match {
-          case PersistedView =>
+          case PersistedViewCompatible.obj =>
             // PersistedView will be tied to a database
             addDbLevelObjs(c.name, outputObjs)
             addTableOrViewLevelObjs(c.name, outputObjs)
@@ -328,7 +331,8 @@
       case s if s.nodeName == "SaveIntoDataSourceCommand" =>
         buildQuery(getFieldVal(s, "query").asInstanceOf[LogicalPlan], outputObjs)
 
-      case s: SetDatabaseCommand => addDbLevelObjs(s.databaseName, inputObjs)
+      case s: SetDatabaseCommandCompatible =>
+        addDbLevelObjs(CompatibleFunc.getCatLogName(s), inputObjs)
 
       case s: ShowColumnsCommand => addTableOrViewLevelObjs(s.tableName, inputObjs)
 
diff --git a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
index b356fd5..3a7b473 100644
--- a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
+++ b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
@@ -336,7 +336,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -433,7 +434,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -652,7 +654,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -806,7 +809,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -920,7 +924,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -977,7 +982,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1034,7 +1040,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1091,7 +1098,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1156,7 +1164,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1221,7 +1230,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1285,7 +1295,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1349,7 +1360,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1413,7 +1425,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1477,7 +1490,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1541,7 +1555,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1598,7 +1613,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1655,7 +1671,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1712,7 +1729,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1769,7 +1787,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1826,7 +1845,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1883,7 +1903,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -1947,7 +1968,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -2011,7 +2033,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -2075,7 +2098,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -2139,7 +2163,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -2652,4 +2677,4 @@
     "version": 1
   },
   "auditMode": "audit-default"
-}
\ No newline at end of file
+}
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
index 92b9ca4..ad23cf2 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
@@ -20,12 +20,14 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.execution.{SubmarineShowDatabasesCommand, SubmarineShowTablesCommand}
-import org.apache.spark.sql.execution.command.{CreateDatabaseCommand, ShowDatabasesCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{CreateDatabaseCommand, ShowTablesCommand}
 import org.apache.spark.sql.hive.test.TestHive
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
+import org.apache.submarine.spark.compatible.CompatibleCommand.ShowDatabasesCommandCompatible
 import org.apache.submarine.spark.security.SparkAccessControlException
 
 class SubmarineSparkRangerAuthorizationExtensionTest extends FunSuite with BeforeAndAfterAll {
 
   private val spark = TestHive.sparkSession.newSession()
@@ -35,7 +37,7 @@
   test("replace submarine show databases") {
     val df = spark.sql("show databases")
     val originalPlan = df.queryExecution.optimizedPlan
-    assert(originalPlan.isInstanceOf[ShowDatabasesCommand])
+    assert(originalPlan.isInstanceOf[ShowDatabasesCommandCompatible])
     val newPlan = authz(originalPlan)
     assert(newPlan.isInstanceOf[SubmarineShowDatabasesCommand])
   }
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
index 5cbf439..e8e9b8f 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
@@ -118,8 +118,12 @@
   test("use database") {
     withUser("alice") {
       val e = intercept[SparkAccessControlException](sql("use default"))
-      assert(e.getMessage === "Permission denied: user [alice] does not have [USE] privilege" +
-        " on [default]")
+      assert(
+        e.getMessage === "Permission denied: user [alice] " +
+          "does not have [USE] privilege on [default]"
+          ||
+        e.getMessage === "Permission denied: user [alice] " +
+          "does not have [USE] privilege on [spark_catalog]")
     }
     withUser("bob") {
       sql("use default")
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
index 13a7d64..ddd281b 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
@@ -35,7 +35,6 @@
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    SubmarineSparkUtils.enableAll(spark)
     spark.conf.set(SQLConf.CROSS_JOINS_ENABLED.key, "true")
 
     sql(
@@ -322,6 +321,8 @@
         |`wp_image_count` INT, `wp_max_ad_count` INT)
         |USING parquet
       """.stripMargin)
+
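+    // Enable the Ranger extensions only after the TPC-DS tables exist so the
+    // setup DDL itself is not subject to authorization checks.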
+    SubmarineSparkUtils.enableAll(spark)
   }
 
   private val tpcdsQueries = Seq(