Integration tests with newer spark version (#4604)

diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 1c3fb82..a30234b 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -67,7 +67,7 @@
     zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000");
 
     notebook = TestUtils.getInstance(Notebook.class);
-    sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
+    sparkHome = DownloadUtils.downloadSpark("3.2.4", "3.2");
     flinkHome = DownloadUtils.downloadFlink("1.13.2", "2.11");
   }
 
@@ -188,11 +188,11 @@
       assertEquals(result.toString(), Status.FINISHED, result.getStatus());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("2.4.4"));
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("3.2.4"));
       assertEquals(0, result.getJobUrls().size());
 
       // pyspark
-      result = session.execute("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.registerTempTable('df')\ndf.show()");
+      result = session.execute("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.createOrReplaceTempView('df')\ndf.show()");
       assertEquals(Status.FINISHED, result.getStatus());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
@@ -211,7 +211,7 @@
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
       assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
-      assertEquals(2, result.getJobUrls().size());
+      assertTrue(result.getJobUrls().size() > 0);
 
       // spark sql
       result = session.execute("sql", "select * from df");
@@ -226,7 +226,7 @@
       assertEquals(Status.ERROR, result.getStatus());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database"));
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found: unknown_table"));
       assertEquals(0, result.getJobUrls().size());
 
     } finally {
@@ -257,11 +257,11 @@
       assertEquals(result.toString(), Status.FINISHED, result.getStatus());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("2.4.4"));
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("3.2.4"));
       assertEquals(0, result.getJobUrls().size());
 
       // pyspark
-      result = session.submit("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.registerTempTable('df')\ndf.show()");
+      result = session.submit("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.createOrReplaceTempView('df')\ndf.show()");
       result = session.waitUntilFinished(result.getStatementId());
       assertEquals(result.toString(), Status.FINISHED, result.getStatus());
       assertEquals(1, result.getResults().size());
@@ -282,7 +282,7 @@
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
       assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
-      assertEquals(2, result.getJobUrls().size());
+      assertTrue(result.getJobUrls().size() > 0);
 
       // spark sql
       result = session.submit("sql", "select * from df");
@@ -299,7 +299,7 @@
       assertEquals(Status.ERROR, result.getStatus());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database"));
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found: unknown_table"));
       assertEquals(0, result.getJobUrls().size());
 
       // cancel