[ZEPPELIN-6006] Remove command line applications when downloading applications (#4746)

* Move files with Java

* Use Java to download external dependencies

* Improve code after review

* Correct mirror URL and compilation
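
The wget/tar/mv shell calls are replaced by a pure-Java downloader in the new
zeppelin-test module. A minimal usage sketch (illustrative only; the example
URLs and the target file below are placeholders, not part of this change):

    // Download the default Spark build: the preferred Apache mirror is tried
    // first, with a fallback to https://archive.apache.org/dist/.
    String sparkHome = DownloadUtils.downloadSpark();

    // Download a single file with retries and an alternative URL; the
    // DownloadRequest constructor and download() throw IOException on failure.
    DownloadRequest request = new DownloadRequest(
        "https://dlcdn.apache.org/example/example-1.0.tgz",
        "https://archive.apache.org/dist/example/example-1.0.tgz");
    DownloadUtils.download(request, new File("/tmp/example-1.0.tgz"));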
diff --git a/pom.xml b/pom.xml
index b9128bb..0d14167 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
     <module>zeppelin-jupyter</module>
     <module>zeppelin-plugins</module>
     <module>zeppelin-distribution</module>
+    <module>zeppelin-test</module>
   </modules>
 
   <properties>
@@ -116,6 +117,7 @@
     <!-- common library versions -->
     <slf4j.version>1.7.35</slf4j.version>
     <reload4j.version>1.2.25</reload4j.version>
+    <log4j2.version>2.23.1</log4j2.version>
     <libthrift.version>0.13.0</libthrift.version>
     <flexmark.all.version>0.62.2</flexmark.all.version>
     <gson.version>2.8.9</gson.version>
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index c79306e..d0fc468 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -100,6 +100,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-test</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-engine</artifactId>
       <scope>test</scope>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 0d8167a..f52d063 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.zeppelin.test.DownloadUtils;
 import org.apache.zeppelin.interpreter.ExecutionContext;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -32,9 +33,7 @@
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 579c0b1..26dee4f 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
+import org.apache.zeppelin.test.DownloadUtils;
 import org.apache.zeppelin.interpreter.ExecutionContext;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -36,7 +37,6 @@
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.codehaus.plexus.util.xml.pull.XmlPullParserException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java
index e0af99a..0ae1bcc 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.zeppelin.test.DownloadUtils;
 import org.apache.zeppelin.interpreter.ExecutionContext;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -33,7 +34,6 @@
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -62,11 +62,9 @@
 
   @BeforeAll
   public static void setUp() throws IOException {
-    String sparkVersion = "3.4.1";
-    String hadoopVersion = "3";
-    LOGGER.info("Testing Spark Version: " + sparkVersion);
-    LOGGER.info("Testing Hadoop Version: " + hadoopVersion);
-    sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
+    LOGGER.info("Testing Spark Version: " + DownloadUtils.DEFAULT_SPARK_VERSION);
+    LOGGER.info("Testing Hadoop Version: " + DownloadUtils.DEFAULT_SPARK_HADOOP_VERSION);
+    sparkHome = DownloadUtils.downloadSpark();
 
     hadoopCluster = new MiniHadoopCluster();
     hadoopCluster.start();
@@ -102,7 +100,7 @@
       InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
       InterpreterResult interpreterResult =
               sparkSubmitInterpreter.interpret("--master local --class org.apache.spark.examples.SparkPi --deploy-mode client " +
-              sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
+              sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context);
       assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code(), interpreterResult.toString());
 
       // no yarn application launched
@@ -126,7 +124,7 @@
               sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi " +
                       "--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " +
                       "--conf spark.executor.memory=512m " +
-                      sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
+                      sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context);
       assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code(), interpreterResult.toString());
 
       GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.FINISHED));
@@ -159,7 +157,7 @@
                     sparkSubmitInterpreter.interpret("--master yarn  --deploy-mode cluster --class org.apache.spark.examples.SparkPi " +
                             "--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " +
                             "--conf spark.executor.memory=512m " +
-                            sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
+                            sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context);
             assertEquals(InterpreterResult.Code.INCOMPLETE, interpreterResult.code(), interpreterResult.toString());
             assertTrue(interpreterResult.toString().contains("Paragraph received a SIGTERM"), interpreterResult.toString());
           } catch (InterpreterException e) {
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 6832051..a02b88e 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -18,13 +18,13 @@
 package org.apache.zeppelin.integration;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.test.DownloadUtils;
 import org.apache.zeppelin.client.ClientConfig;
 import org.apache.zeppelin.client.ExecuteResult;
 import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
 import org.apache.zeppelin.client.Status;
 import org.apache.zeppelin.client.ZSession;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.rest.AbstractTestRestApi;
@@ -68,7 +68,7 @@
     zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000");
 
     notebook = TestUtils.getInstance(Notebook.class);
-    sparkHome = DownloadUtils.downloadSpark("3.4.1", "3");
+    sparkHome = DownloadUtils.downloadSpark();
     flinkHome = DownloadUtils.downloadFlink("1.17.1", "2.12");
   }
 
@@ -189,7 +189,7 @@
       assertEquals(Status.FINISHED, result.getStatus(), result.toString());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData().contains("3.4.1"), result.getResults().get(0).getData());
+      assertTrue(result.getResults().get(0).getData().contains(DownloadUtils.DEFAULT_SPARK_VERSION), result.getResults().get(0).getData());
       assertEquals(0, result.getJobUrls().size());
 
       // pyspark
@@ -258,7 +258,7 @@
       assertEquals(Status.FINISHED, result.getStatus(), result.toString());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData().contains("3.4.1"), result.getResults().get(0).getData());
+      assertTrue(result.getResults().get(0).getData().contains(DownloadUtils.DEFAULT_SPARK_VERSION), result.getResults().get(0).getData());
       assertEquals(0, result.getJobUrls().size());
 
       // pyspark
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
index c14e002..1a19bf4 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
@@ -18,8 +18,8 @@
 package org.apache.zeppelin.integration;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.test.DownloadUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.rest.AbstractTestRestApi;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index 3366f58..7c0cb00 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -17,6 +17,7 @@
 package org.apache.zeppelin.integration;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.test.DownloadUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.Input;
@@ -29,9 +30,7 @@
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.rest.AbstractTestRestApi;
@@ -39,9 +38,7 @@
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.utils.TestUtils;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/zeppelin-test/pom.xml b/zeppelin-test/pom.xml
new file mode 100644
index 0000000..8973ecc
--- /dev/null
+++ b/zeppelin-test/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>zeppelin</artifactId>
+    <version>0.12.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>zeppelin-test</artifactId>
+  <name>Zeppelin: Test</name>
+  <description>Zeppelin test code used in other modules</description>
+  <properties>
+    <progressbar.version>0.9.5</progressbar.version>
+  </properties>
+  <packaging>jar</packaging>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>${commons.compress.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>me.tongfei</groupId>
+      <artifactId>progressbar</artifactId>
+      <version>${progressbar.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>${log4j2.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java
new file mode 100644
index 0000000..50126ac
--- /dev/null
+++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.test;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+public class DownloadRequest {
+  private final URL url;
+  private final Optional<URL> alternativeUrl;
+  private final int retries;
+
+  public static final int DEFAULT_RETRIES = 3;
+
+  public DownloadRequest(String url, int retries) throws MalformedURLException {
+    this(url, null, retries);
+  }
+
+  public DownloadRequest(String url, String alternativeUrl) throws MalformedURLException {
+    this(url, alternativeUrl, DEFAULT_RETRIES);
+  }
+
+  public DownloadRequest(String url, String alternativeUrl, int retries)
+      throws MalformedURLException {
+    this.url = new URL(url);
+    this.alternativeUrl = alternativeUrl != null
+        ? Optional.of(new URL(alternativeUrl))
+        : Optional.empty();
+    this.retries = retries;
+  }
+
+  public DownloadRequest(URL url, int retries) {
+    this(url, null, retries);
+  }
+
+  public DownloadRequest(URL url, URL alternativeUrl) {
+    this(url, alternativeUrl, DEFAULT_RETRIES);
+  }
+
+  public DownloadRequest(URL url, URL alternativeUrl, int retries) {
+    this.url = url;
+    this.alternativeUrl = Optional.ofNullable(alternativeUrl);
+    this.retries = retries;
+  }
+
+  public URL getUrl() {
+    return url;
+  }
+
+  public Optional<URL> getAlternativeUrl() {
+    return alternativeUrl;
+  }
+
+  public int getRetries() {
+    return retries;
+  }
+}
diff --git a/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java
new file mode 100644
index 0000000..37332b6
--- /dev/null
+++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.test;
+
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.tongfei.progressbar.DelegatingProgressBarConsumer;
+import me.tongfei.progressbar.ProgressBar;
+import me.tongfei.progressbar.ProgressBarBuilder;
+import me.tongfei.progressbar.ProgressBarStyle;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Optional;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * Utility class for downloading Spark, Flink, Livy, and Hadoop distributions,
+ * used by the integration tests.
+ */
+public class DownloadUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DownloadUtils.class);
+
+  public static final String APACHE_MIRROR_ENV_KEY = "APACHE_MIRROR";
+  public static final String PROGRESS_BAR_UPDATE_INTERVAL_ENV_KEY = "PROGRESS_BAR_UPDATE_INTERVAL";
+
+  private static final String MIRROR_URL;
+  private static final String ARCHIVE_URL = "https://archive.apache.org/dist/";
+  private static final int PROGRESS_BAR_UPDATE_INTERVAL;
+
+  private static String downloadFolder = System.getProperty("user.home") + "/.cache";
+  public static final String DEFAULT_SPARK_VERSION = "3.4.2";
+  public static final String DEFAULT_SPARK_HADOOP_VERSION = "3";
+
+  private DownloadUtils() {
+    throw new IllegalStateException("Utility class");
+  }
+
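+  // One-time setup: create the download cache folder, resolve the Apache download
+  // mirror (the APACHE_MIRROR environment variable overrides the closer.lua lookup),
+  // and read the progress bar update interval.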
+  static {
+    try {
+      FileUtils.forceMkdir(new File(downloadFolder));
+      String envUrl = System.getenv(APACHE_MIRROR_ENV_KEY);
+      if (StringUtils.isNotBlank(envUrl)) {
+        MIRROR_URL = envUrl;
+      } else {
+        MIRROR_URL =
+            IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"),
+                StandardCharsets.UTF_8);
+      }
+      String envProgressUpdateInterval = System.getenv(PROGRESS_BAR_UPDATE_INTERVAL_ENV_KEY);
+      if (StringUtils.isNotBlank(envProgressUpdateInterval)) {
+        PROGRESS_BAR_UPDATE_INTERVAL = Integer.parseInt(envProgressUpdateInterval);
+      } else {
+        PROGRESS_BAR_UPDATE_INTERVAL = 1000;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to create download folder: " + downloadFolder, e);
+    }
+  }
+
+  /**
+   * Download Spark with default versions
+   *
+   * @return home of Spark installation
+   */
+  public static String downloadSpark() {
+    return downloadSpark(DEFAULT_SPARK_VERSION, DEFAULT_SPARK_HADOOP_VERSION, null);
+  }
+
+  /**
+   * Downloads a Spark distribution built against a specific Hadoop version.
+   *
+   * @param sparkVersion Spark version to download
+   * @param hadoopVersion Hadoop version the Spark binaries are built for
+   * @return home of the Spark installation
+   */
+  public static String downloadSpark(String sparkVersion, String hadoopVersion) {
+    return downloadSpark(sparkVersion, hadoopVersion, null);
+  }
+
+  /**
+   * Downloads a Spark distribution built against specific Hadoop and Scala versions.
+   *
+   * @param sparkVersion Spark version to download
+   * @param hadoopVersion Hadoop version the Spark binaries are built for
+   * @param scalaVersion Scala version of the Spark binaries, may be null
+   * @return home of the Spark installation
+   */
+  public static String downloadSpark(String sparkVersion, String hadoopVersion,
+      String scalaVersion) {
+    File sparkFolder = new File(downloadFolder, "spark");
+    final File targetSparkHomeFolder;
+    if (StringUtils.isNotBlank(scalaVersion)) {
+      targetSparkHomeFolder = new File(sparkFolder,
+          "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + "-scala" + scalaVersion);
+    } else {
+      targetSparkHomeFolder = new File(sparkFolder,
+          "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
+    }
+
+    return downloadSpark(sparkVersion, hadoopVersion, scalaVersion, targetSparkHomeFolder);
+  }
+
+  /**
+   * Downloads a Spark distribution.
+   *
+   * @param sparkVersion Spark version to download
+   * @param hadoopVersion Hadoop version the Spark binaries are built for
+   * @param scalaVersion Scala version of the Spark binaries, may be null
+   * @param targetSparkHomeFolder folder the Spark archive is extracted into
+   * @return home of the Spark installation
+   */
+  public static String downloadSpark(String sparkVersion, String hadoopVersion, String scalaVersion,
+      File targetSparkHomeFolder) {
+    File sparkFolder = new File(downloadFolder, "spark");
+    sparkFolder.mkdir();
+    final String sparkVersionLog;
+    if (StringUtils.isBlank(scalaVersion)) {
+      sparkVersionLog = "Spark " + sparkVersion + "-" + hadoopVersion;
+    } else {
+      sparkVersionLog = "Spark " + sparkVersion + "-" + hadoopVersion + "-" + scalaVersion;
+    }
+    if (targetSparkHomeFolder.exists()) {
+      LOGGER.info("Skip to download {} as it is already downloaded.", sparkVersionLog);
+      return targetSparkHomeFolder.getAbsolutePath();
+    }
+    final File sparkTarGZ;
+    if (StringUtils.isBlank(scalaVersion)) {
+      sparkTarGZ =
+          new File(sparkFolder, "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz");
+    } else {
+      sparkTarGZ = new File(sparkFolder, "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion
+          + "-scala" + scalaVersion + ".tgz");
+    }
+
+    try {
+      URL mirrorURL = new URL(MIRROR_URL +
+          generateSparkDownloadURL(sparkVersion, hadoopVersion, scalaVersion));
+      URL archiveURL = new URL(ARCHIVE_URL +
+          generateSparkDownloadURL(sparkVersion, hadoopVersion, scalaVersion));
+      LOGGER.info("Download {}", sparkVersionLog);
+      download(new DownloadRequest(mirrorURL, archiveURL), sparkTarGZ);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchiv")
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+      try (
+          InputStream fis = Files.newInputStream(sparkTarGZ.toPath());
+          InputStream pbis = ProgressBar.wrap(fis, pbb);
+          InputStream bis = new BufferedInputStream(pbis);
+          InputStream gzis = new GzipCompressorInputStream(bis);
+          ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
+        LOGGER.info("Unarchive {} to {}", sparkVersionLog, targetSparkHomeFolder);
+        unarchive(o, targetSparkHomeFolder, 1);
+        LOGGER.info("Unarchive {} done", sparkVersionLog);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to download spark", e);
+    }
+    return targetSparkHomeFolder.getAbsolutePath();
+  }
+
+  public static void download(String url, int retries, File dst) throws IOException {
+    download(new URL(url), retries, dst);
+  }
+
+  public static void download(DownloadRequest downloadRequest, File dst) throws IOException {
+    if (dst.exists()) {
+      LOGGER.info("Skip Download of {}, because it exists", dst);
+    } else {
+      boolean urlDownload = download(downloadRequest.getUrl(), downloadRequest.getRetries(), dst);
+      if (urlDownload) {
+        LOGGER.info("Download successfully");
+        return;
+      }
+      Optional<URL> alternativeURL = downloadRequest.getAlternativeUrl();
+      if (alternativeURL.isPresent()) {
+        urlDownload = download(alternativeURL.get(), downloadRequest.getRetries(), dst);
+        if (urlDownload) {
+          LOGGER.info("Download from alternative successfully");
+          return;
+        }
+      }
+      throw new IOException("Unable to download from " + downloadRequest.getUrl());
+    }
+  }
+
+  private static boolean download(URL url, int retries, File dst) {
+    int retry = 0;
+    while (retry < retries) {
+      try {
+        HttpURLConnection httpConnection = (HttpURLConnection) url.openConnection();
+        long completeFileSize = httpConnection.getContentLengthLong();
+        ProgressBarBuilder pbb = new ProgressBarBuilder()
+            .setTaskName("Download " + dst.getName())
+            .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+            .setStyle(ProgressBarStyle.ASCII)
+            .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+            .setInitialMax(completeFileSize)
+            .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+        try (
+            OutputStream fileOS = Files.newOutputStream(dst.toPath());
+            InputStream is = url.openStream();
+            InputStream pbis = ProgressBar.wrap(is, pbb);
+            InputStream bis = new BufferedInputStream(pbis)) {
+          IOUtils.copyLarge(bis, fileOS);
+          return true;
+        }
+      } catch (IOException e) {
+        LOGGER.info("Unable to download from {}", url, e);
+        ++retry;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param livyVersion Livy version to download
+   * @param scalaVersion Scala version suffix of the Livy binaries, may be null
+   * @param targetLivyHomeFolder folder the Livy archive is extracted into
+   * @return home of the Livy installation
+   */
+  public static String downloadLivy(String livyVersion, String scalaVersion,
+      File targetLivyHomeFolder) {
+    File livyDownloadFolder = new File(downloadFolder, "livy");
+    livyDownloadFolder.mkdir();
+    final String livyLog = StringUtils.isBlank(scalaVersion)
+        ? "Livy " + livyVersion
+        : "Livy " + livyVersion + "_" + scalaVersion;
+    if (targetLivyHomeFolder.exists()) {
+      LOGGER.info("Skip to download {} as it is already downloaded.", livyLog);
+      return targetLivyHomeFolder.getAbsolutePath();
+    }
+    final File livyZip;
+    if (StringUtils.isBlank(scalaVersion)) {
+      // e.g. apache-livy-0.7.1-incubating-bin.zip
+      livyZip = new File(livyDownloadFolder, "apache-livy-" + livyVersion + "-bin.zip");
+    } else {
+      // e.g. apache-livy-0.8.0-incubating_2.12-bin.zip
+      livyZip = new File(livyDownloadFolder, "apache-livy-" + livyVersion + "_" + scalaVersion + "-bin.zip");
+    }
+
+    try {
+      URL mirrorURL = new URL(MIRROR_URL + generateLivyDownloadUrl(livyVersion, scalaVersion));
+      URL archiveURL = new URL(ARCHIVE_URL + generateLivyDownloadUrl(livyVersion, scalaVersion));
+      LOGGER.info("Download {}", livyLog);
+      download(new DownloadRequest(mirrorURL, archiveURL), livyZip);
+      LOGGER.info("Unzip {} to {}", livyLog, targetLivyHomeFolder);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchiv Livy")
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+      try (InputStream fis = Files.newInputStream(livyZip.toPath());
+          InputStream pbis = ProgressBar.wrap(fis, pbb);
+          InputStream bis = new BufferedInputStream(pbis);
+          ZipInputStream zis = new ZipInputStream(bis)) {
+        unzip(zis, targetLivyHomeFolder, 1);
+      }
+      LOGGER.info("Unzip {} done", livyLog);
+      // Create logs directory
+      File logs = new File(targetLivyHomeFolder, "logs");
+      logs.mkdir();
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid download URL", e);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to download livy", e);
+    }
+    return targetLivyHomeFolder.getAbsolutePath();
+  }
+
+  /**
+   * @param livyVersion Livy version to download
+   * @return home of the Livy installation
+   */
+  public static String downloadLivy(String livyVersion) {
+    File livyDownloadFolder = new File(downloadFolder, "livy");
+    File targetLivyHomeFolder = new File(livyDownloadFolder, "livy-" + livyVersion);
+    return downloadLivy(livyVersion, null, targetLivyHomeFolder);
+  }
+
+  public static String downloadLivy(String livyVersion, String scalaVersion) {
+    File livyDownloadFolder = new File(downloadFolder, "livy");
+    File targetLivyHomeFolder =
+        new File(livyDownloadFolder, "livy-" + livyVersion + "_" + scalaVersion);
+    return downloadLivy(livyVersion, scalaVersion, targetLivyHomeFolder);
+  }
+
+  private static File newFile(File destinationDir, ZipEntry zipEntry, int strip)
+      throws IOException {
+    String filename = zipEntry.getName();
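+    // Strip the leading path components from the entry name, like tar --strip-components;
+    // zip entry names use '/' regardless of platform.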
+    for (int i = 0; i < strip; ++i) {
+      if (filename.contains("/")) {
+        filename = filename.substring(filename.indexOf('/') + 1);
+      }
+    }
+    File destFile = new File(destinationDir, filename);
+    String destDirPath = destinationDir.getCanonicalPath();
+    String destFilePath = destFile.getCanonicalPath();
+
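+    // Guard against "Zip Slip": reject entries that would resolve outside the target directory.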
+    if (!destFilePath.startsWith(destDirPath + File.separator)) {
+      throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
+    }
+
+    return destFile;
+  }
+
+  private static File newFile(File destDir, ArchiveEntry archiveEntry, int strip)
+      throws IOException {
+    String filename = archiveEntry.getName();
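+    // Strip the leading path components from the entry name, like tar --strip-components;
+    // tar entry names use '/' regardless of platform.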
+    for (int i = 0; i < strip; ++i) {
+      if (filename.contains("/")) {
+        filename = filename.substring(filename.indexOf('/') + 1);
+      }
+    }
+    File destFile = new File(destDir, filename);
+    String destDirPath = destDir.getCanonicalPath();
+    String destFilePath = destFile.getCanonicalPath();
+
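+    // Guard against "Zip Slip": reject entries that would resolve outside the target directory.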
+    if (!destFilePath.startsWith(destDirPath + File.separator)) {
+      throw new IOException("Entry is outside of the target dir: " + archiveEntry.getName());
+    }
+
+    return destFile;
+  }
+
+  private static void unarchive(ArchiveInputStream<? extends ArchiveEntry> ais, File destDir,
+      int strip) throws IOException {
+    byte[] buffer = new byte[1024];
+    ArchiveEntry archiveEntry = ais.getNextEntry();
+    while (archiveEntry != null) {
+      File newFile;
+      try {
+        newFile = newFile(destDir, archiveEntry, strip);
+      } catch (IOException e) {
+        LOGGER.info("Skip {}", archiveEntry.getName());
+        archiveEntry = ais.getNextEntry();
+        continue;
+      }
+      if (archiveEntry.isDirectory()) {
+        if (!newFile.isDirectory() && !newFile.mkdirs()) {
+          throw new IOException("Failed to create directory " + newFile);
+        }
+      } else {
+        File parent = newFile.getParentFile();
+        if (!parent.isDirectory() && !parent.mkdirs()) {
+          throw new IOException("Failed to create directory " + parent);
+        }
+
+        // write file content
+        try (FileOutputStream fos = new FileOutputStream(newFile)) {
+          int len;
+          while ((len = ais.read(buffer)) > 0) {
+            fos.write(buffer, 0, len);
+          }
+        }
+        // Change permissions and metadata
+        if (newFile.getParentFile().getName().contains("bin")
+            && !newFile.setExecutable(true, false)) {
+          LOGGER.info("Setting file {} to executable failed", newFile);
+        }
+        if (!newFile.setLastModified(archiveEntry.getLastModifiedDate().getTime())) {
+          LOGGER.info("Setting last modified date to file {} failed", newFile);
+        }
+      }
+      archiveEntry = ais.getNextEntry();
+    }
+  }
+
+  private static void unzip(ZipInputStream zis, File destDir, int strip) throws IOException {
+    byte[] buffer = new byte[1024];
+    ZipEntry zipEntry = zis.getNextEntry();
+    while (zipEntry != null) {
+      File newFile;
+      try {
+        newFile = newFile(destDir, zipEntry, strip);
+      } catch (IOException e) {
+        LOGGER.info("Skip {}", zipEntry.getName());
+        zipEntry = zis.getNextEntry();
+        continue;
+      }
+      if (zipEntry.isDirectory()) {
+        if (!newFile.isDirectory() && !newFile.mkdirs()) {
+          throw new IOException("Failed to create directory " + newFile);
+        }
+      } else {
+        File parent = newFile.getParentFile();
+        if (!parent.isDirectory() && !parent.mkdirs()) {
+          throw new IOException("Failed to create directory " + parent);
+        }
+
+        // write file content
+        try (FileOutputStream fos = new FileOutputStream(newFile)) {
+          int len;
+          while ((len = zis.read(buffer)) > 0) {
+            fos.write(buffer, 0, len);
+          }
+        }
+        // Change permissions and metadata
+        if (newFile.getParentFile().getName().contains("bin")
+            && !newFile.setExecutable(true, false)) {
+          LOGGER.info("Setting file {} to executable failed", newFile);
+        }
+        if (!newFile.setLastModified(zipEntry.getLastModifiedTime().toMillis())) {
+          LOGGER.info("Setting last modified date to file {} failed", newFile);
+        }
+      }
+      zipEntry = zis.getNextEntry();
+    }
+    zis.closeEntry();
+  }
+
+  public static String downloadFlink(String flinkVersion, String scalaVersion) {
+    File flinkDownloadFolder = new File(downloadFolder, "flink");
+    flinkDownloadFolder.mkdir();
+    File targetFlinkHomeFolder = new File(flinkDownloadFolder, "flink-" + flinkVersion);
+    if (targetFlinkHomeFolder.exists()) {
+      LOGGER.info("Skip to download Flink {}_{} as it is already downloaded.", flinkVersion,
+          scalaVersion);
+      return targetFlinkHomeFolder.getAbsolutePath();
+    }
+    File flinkTGZ = new File(flinkDownloadFolder,
+        "flink-" + flinkVersion + "-bin-scala_" + scalaVersion + ".tgz");
+    try {
+      URL mirrorURL = new URL(MIRROR_URL + generateDownloadURL(
+              "flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz", "flink"));
+      URL archiveURL = new URL(ARCHIVE_URL + generateDownloadURL(
+          "flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz", "flink"));
+      LOGGER.info("Download Flink {}_{}", flinkVersion, scalaVersion);
+      download(new DownloadRequest(mirrorURL, archiveURL), flinkTGZ);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchiv Flink")
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+      try (
+          InputStream fis = Files.newInputStream(flinkTGZ.toPath());
+          InputStream pbis = ProgressBar.wrap(fis, pbb);
+          InputStream bis = new BufferedInputStream(pbis);
+          InputStream gzis = new GzipCompressorInputStream(bis);
+          ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
+        LOGGER.info("Unarchive Flink {}_{} to {}", flinkVersion, scalaVersion,
+            targetFlinkHomeFolder);
+        unarchive(o, targetFlinkHomeFolder, 1);
+        LOGGER.info("Unarchive Flink done");
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to download flink", e);
+    }
+
+    // download other dependencies for running flink with yarn and hive
+    try {
+      download("https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_"
+          + scalaVersion + "/"
+                      + flinkVersion + "/flink-connector-hive_" + scalaVersion + "-" + flinkVersion + ".jar",
+          3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-connector-hive_"
+              + scalaVersion + "-" + flinkVersion + ".jar"));
+      download("https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_" + scalaVersion + "/"
+                      + flinkVersion + "/flink-hadoop-compatibility_" + scalaVersion + "-" + flinkVersion + ".jar",
+          3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-hadoop-compatibility_"
+              + scalaVersion + "-" + flinkVersion + ".jar"));
+      download("https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.7/hive-exec-2.3.7.jar",
+          3, new File(targetFlinkHomeFolder, "lib" + File.separator + "hive-exec-2.3.4.jar"));
+      download(
+          "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.6/hadoop-client-api-3.3.6.jar",
+          3, new File(targetFlinkHomeFolder,
+              "lib" + File.separator + "hadoop-client-api-3.3.6.jar"));
+      download(
+          "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.6/hadoop-client-runtime-3.3.6.jar",
+          3, new File(targetFlinkHomeFolder,
+              "lib" + File.separator + "hadoop-client-runtime-3.3.6.jar"));
+      download("https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_"
+          + scalaVersion + "/"
+                      + flinkVersion + "/flink-table-api-scala_" + scalaVersion + "-" + flinkVersion + ".jar",
+          3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-table-api-scala_"
+              + scalaVersion + "-" + flinkVersion + ".jar"));
+      download("https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_"
+          + scalaVersion + "/"
+                      + flinkVersion + "/flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar",
+          3, new File(targetFlinkHomeFolder, "lib" + File.separator
+              + "flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar"));
+
+      String jarName = "flink-table-planner_" + scalaVersion + "-" + flinkVersion + ".jar";
+      mvFile(
+          targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName,
+          targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName);
+      jarName = "flink-table-planner-loader-" + flinkVersion + ".jar";
+      mvFile(
+          targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName,
+          targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName);
+      if (SemanticVersion.of(flinkVersion).equalsOrNewerThan(SemanticVersion.of("1.16.0"))) {
+        jarName = "flink-sql-client-" + flinkVersion + ".jar";
+        mvFile(targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName,
+            targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Fail to download jar", e);
+    }
+    return targetFlinkHomeFolder.getAbsolutePath();
+  }
+
+  private static void mvFile(String srcPath, String dstPath) throws IOException {
+    Path src = Paths.get(srcPath);
+    Path dst = Paths.get(dstPath);
+    if (src.toFile().exists()) {
+      if (dst.toFile().exists()) {
+        LOGGER.warn("{} does exits - replacing", dstPath);
+        FileUtils.deleteQuietly(dst.toFile());
+      }
+      LOGGER.info("Copy file {} to {}", src, dst);
+      Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
+    } else {
+      LOGGER.warn("{} does not exits - skipping", srcPath);
+    }
+  }
+
+  public static String downloadHadoop(String version) {
+    File hadoopDownloadFolder = new File(downloadFolder, "hadoop");
+    hadoopDownloadFolder.mkdir();
+    File targetHadoopHomeFolder = new File(hadoopDownloadFolder, "hadoop-" + version);
+    if (targetHadoopHomeFolder.exists()) {
+      LOGGER.info("Skip to download Hadoop {} as it is already downloaded.", version);
+      return targetHadoopHomeFolder.getAbsolutePath();
+    }
+    File hadoopTGZ = new File(hadoopDownloadFolder, "hadoop-" + version + ".tar.gz");
+    try {
+      URL mirrorURL = new URL(MIRROR_URL + generateDownloadURL(
+          "hadoop", version, ".tar.gz", "hadoop/core"));
+      URL archiveURL = new URL(ARCHIVE_URL + generateDownloadURL(
+          "hadoop", version, ".tar.gz", "hadoop/core"));
+      LOGGER.info("Download Hadoop {}", version);
+      download(new DownloadRequest(mirrorURL, archiveURL), hadoopTGZ);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchiv")
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+      try (
+          InputStream fis = Files.newInputStream(hadoopTGZ.toPath());
+          InputStream pbis = ProgressBar.wrap(fis, pbb);
+          InputStream bis = new BufferedInputStream(pbis);
+          InputStream gzis = new GzipCompressorInputStream(bis);
+          ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
+        LOGGER.info("Unarchive Hadoop {} to {}", version, targetHadoopHomeFolder);
+        unarchive(o, targetHadoopHomeFolder, 1);
+        LOGGER.info("Unarchive Hadoop {} done", version);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to download hadoop");
+    }
+    return targetHadoopHomeFolder.getAbsolutePath();
+  }
+
+  private static String generateDownloadURL(String project, String version, String postFix,
+      String projectPath) {
+    return projectPath + "/" + project + "-" + version + "/" + project + "-" + version
+        + postFix;
+  }
+
+  private static String generateSparkDownloadURL(String sparkVersion, String hadoopVersion,
+      String scalaVersion) {
+    final String url;
+    String sparkVersionFolder = "spark/spark-" + sparkVersion;
+    if (StringUtils.isNotBlank(hadoopVersion)) {
+      if (StringUtils.isNotBlank(scalaVersion)) {
+        // spark-3.4.0-bin-hadoop3-scala2.13.tgz
+        url = sparkVersionFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion
+            + "-scala" + scalaVersion + ".tgz";
+      } else {
+        url =
+            sparkVersionFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz";
+      }
+    } else {
+      url = sparkVersionFolder + "/spark-" + sparkVersion + "-bin-without-hadoop.tgz";
+    }
+    return url;
+  }
+
+  private static String generateLivyDownloadUrl(String livyVersion, String scalaVersion) {
+    SemanticVersion livy = SemanticVersion.of(livyVersion.replace("incubating", ""));
+    if (livy.equalsOrNewerThan(SemanticVersion.of("0.8.0"))) {
+      return "incubator/livy/" + livyVersion + "/apache-livy-" + livyVersion + "_" + scalaVersion
+          + "-bin.zip";
+    }
+    return "incubator/livy/" + livyVersion + "/apache-livy-" + livyVersion + "-bin.zip";
+  }
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java
similarity index 97%
rename from zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java
rename to zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java
index f9bd771..f25e503 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java
+++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.zeppelin.interpreter.integration;
+package org.apache.zeppelin.test;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java b/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java
new file mode 100644
index 0000000..6bed716
--- /dev/null
+++ b/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+
+@Disabled("Takes a long time and depends on external factors.")
+class DownloadUtilsTest {
+
+  @Test
+  void downloadHadoop() {
+    String hadoopHome = DownloadUtils.downloadHadoop("3.4.0");
+    Path hadoopHomePath = Paths.get(hadoopHome);
+    assertTrue(hadoopHomePath.toFile().exists());
+    assertTrue(hadoopHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadSpark() {
+    String sparkHome = DownloadUtils.downloadSpark();
+    Path sparkHomePath = Paths.get(sparkHome);
+    assertTrue(sparkHomePath.toFile().exists());
+    assertTrue(sparkHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadSparkWithScala() {
+    String sparkHome = DownloadUtils.downloadSpark(DownloadUtils.DEFAULT_SPARK_VERSION,
+        DownloadUtils.DEFAULT_SPARK_HADOOP_VERSION, "2.13");
+    Path sparkHomePath = Paths.get(sparkHome);
+    assertTrue(sparkHomePath.toFile().exists());
+    assertTrue(sparkHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadFlink() {
+    String flinkHome = DownloadUtils.downloadFlink("1.16.3", "2.12");
+    Path flinkHomePath = Paths.get(flinkHome);
+    assertTrue(flinkHomePath.toFile().exists());
+    assertTrue(flinkHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadLivy() {
+    String sparkHome = DownloadUtils.downloadLivy("0.7.1-incubating");
+    Path sparkHomePath = Paths.get(sparkHome);
+    assertTrue(sparkHomePath.toFile().exists());
+    assertTrue(sparkHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadLivy080() {
+    String sparkHome = DownloadUtils.downloadLivy("0.8.0-incubating", "2.12");
+    Path sparkHomePath = Paths.get(sparkHome);
+    assertTrue(sparkHomePath.toFile().exists());
+    assertTrue(sparkHomePath.toFile().isDirectory());
+  }
+
+}
diff --git a/zeppelin-test/src/test/resources/log4j2.properties b/zeppelin-test/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..4935e68
--- /dev/null
+++ b/zeppelin-test/src/test/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
#  limitations under the License.
+################################################################################
+
+rootLogger=debug, STDOUT
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %5p [%d] ({%t} %F[%M]:%L) - %m%n
+
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index cabe644..f3def93 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -268,6 +268,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-test</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
deleted file mode 100644
index 6310c1d..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.integration;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Utility class for downloading spark/flink. This is used for spark/flink integration test.
- */
-public class DownloadUtils {
-  private static Logger LOGGER = LoggerFactory.getLogger(DownloadUtils.class);
-
-  private static String downloadFolder = System.getProperty("user.home") + "/.cache";
-
-  static {
-    try {
-      FileUtils.forceMkdir(new File(downloadFolder));
-    } catch (IOException e) {
-      throw new RuntimeException("Fail to create download folder: " + downloadFolder, e);
-    }
-  }
-
-  public static String downloadSpark(String sparkVersion, String hadoopVersion) {
-    String sparkDownloadFolder = downloadFolder + "/spark";
-    File targetSparkHomeFolder =
-            new File(sparkDownloadFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
-    if (targetSparkHomeFolder.exists()) {
-      LOGGER.info("Skip to download spark as it is already downloaded.");
-      return targetSparkHomeFolder.getAbsolutePath();
-    }
-    download("spark", sparkVersion, "-bin-hadoop" + hadoopVersion + ".tgz");
-    return targetSparkHomeFolder.getAbsolutePath();
-  }
-
-  public static String downloadFlink(String flinkVersion, String scalaVersion) {
-    String flinkDownloadFolder = downloadFolder + "/flink";
-    File targetFlinkHomeFolder = new File(flinkDownloadFolder + "/flink-" + flinkVersion);
-    if (targetFlinkHomeFolder.exists()) {
-      LOGGER.info("Skip to download flink as it is already downloaded.");
-      return targetFlinkHomeFolder.getAbsolutePath();
-    }
-    download("flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz");
-    // download other dependencies for running flink with yarn and hive
-    try {
-      runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_" + scalaVersion + "/"
-                      + flinkVersion + "/flink-connector-hive_" + scalaVersion + "-" + flinkVersion + ".jar",
-              "-P", targetFlinkHomeFolder + "/lib"});
-      runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_" + scalaVersion + "/"
-                      + flinkVersion + "/flink-hadoop-compatibility_" + scalaVersion + "-" + flinkVersion + ".jar",
-              "-P", targetFlinkHomeFolder + "/lib"});
-      runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.7/hive-exec-2.3.7.jar",
-              "-P", targetFlinkHomeFolder + "/lib"});
-      runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.6/hadoop-client-api-3.3.6.jar",
-              "-P", targetFlinkHomeFolder + "/lib"});
-      runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.6/hadoop-client-runtime-3.3.6.jar",
-              "-P", targetFlinkHomeFolder + "/lib"});
-      runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_" + scalaVersion + "/"
-                      + flinkVersion + "/flink-table-api-scala_" + scalaVersion + "-" + flinkVersion + ".jar",
-              "-P", targetFlinkHomeFolder + "/lib"});
-      runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_" + scalaVersion + "/"
-                      + flinkVersion + "/flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar",
-              "-P", targetFlinkHomeFolder + "/lib"});
-      runShellCommand(new String[]{"mv",
-              targetFlinkHomeFolder + "/opt/" + "flink-table-planner_" + scalaVersion + "-" + flinkVersion + ".jar",
-              targetFlinkHomeFolder + "/lib"});
-      runShellCommand(new String[]{"mv",
-              targetFlinkHomeFolder + "/lib/" + "flink-table-planner-loader-" + flinkVersion + ".jar",
-              targetFlinkHomeFolder + "/opt"});
-      if (SemanticVersion.of(flinkVersion).equalsOrNewerThan(SemanticVersion.of("1.16.0"))) {
-        runShellCommand(new String[]{"mv",
-                targetFlinkHomeFolder + "/opt/" + "flink-sql-client-" + flinkVersion + ".jar",
-                targetFlinkHomeFolder + "/lib"});
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Fail to download jar", e);
-    }
-    return targetFlinkHomeFolder.getAbsolutePath();
-  }
-
-  public static String downloadHadoop(String version) {
-    String hadoopDownloadFolder = downloadFolder + "/hadoop";
-    File targetHadoopHomeFolder = new File(hadoopDownloadFolder + "/hadoop-" + version);
-    if (targetHadoopHomeFolder.exists()) {
-      LOGGER.info("Skip to download hadoop as it is already downloaded.");
-      return targetHadoopHomeFolder.getAbsolutePath();
-    }
-    download("hadoop", version, ".tar.gz", "hadoop/core");
-    return targetHadoopHomeFolder.getAbsolutePath();
-  }
-
-  // Try mirrors first, if fails fallback to apache archive
-  private static void download(String project, String version, String postFix, String projectPath) {
-    String projectDownloadFolder = downloadFolder + "/" + project;
-    try {
-      String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"), StandardCharsets.UTF_8);
-      File downloadFile = new File(projectDownloadFolder + "/" + project + "-" + version + postFix);
-      String downloadURL = preferredMirror + "/" + projectPath + "/" + project + "-" + version + "/" + project + "-" + version + postFix;
-      runShellCommand(new String[]{"wget", downloadURL, "-P", projectDownloadFolder});
-      runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", projectDownloadFolder});
-    } catch (Exception e) {
-      LOGGER.warn("Failed to download " + project + " from mirror site, fallback to use apache archive", e);
-      File downloadFile = new File(projectDownloadFolder + "/" + project + "-" + version + postFix);
-      String downloadURL =
-              "https://archive.apache.org/dist/" + projectPath + "/" + project +"-"
-                      + version
-                      + "/" + project + "-"
-                      + version
-                      + postFix;
-      try {
-        runShellCommand(new String[]{"wget", downloadURL, "-P", projectDownloadFolder});
-        runShellCommand(
-                new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", projectDownloadFolder});
-      } catch (Exception ex) {
-        throw new RuntimeException("Fail to download " + project + " " + version, ex);
-      }
-    }
-  }
-
-  private static void download(String project, String version, String postFix) {
-    download(project, version, postFix, project);
-  }
-
-  private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
-    LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " "));
-    Process process = Runtime.getRuntime().exec(commands);
-    StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
-    StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
-    errorGobbler.start();
-    outputGobbler.start();
-    if (process.waitFor() != 0) {
-      throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " "));
-    }
-    LOGGER.info("Complete shell commands: " + StringUtils.join(commands, " "));
-  }
-
-  private static class StreamGobbler extends Thread {
-    InputStream is;
-
-    // reads everything from is until empty.
-    StreamGobbler(InputStream is) {
-      this.is = is;
-    }
-
-    @Override
-    public void run() {
-      try {
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-        String line = null;
-        long startTime = System.currentTimeMillis();
-        while ((line = br.readLine()) != null) {
-          // logging per 5 seconds
-          if ((System.currentTimeMillis() - startTime) > 5000) {
-            LOGGER.info(line);
-            startTime = System.currentTimeMillis();
-          }
-        }
-      } catch (IOException ioe) {
-        LOGGER.warn("Fail to print shell output", ioe);
-      }
-    }
-  }
-}
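[Reviewer note] The helper removed above shelled out to wget, tar, and mv, so these tests only ran on machines with those binaries on the PATH, and failures surfaced as opaque non-zero exit codes. Its replacement, org.apache.zeppelin.test.DownloadUtils in the new zeppelin-test module, does the same work in pure Java. The sketch below is not the code added by this patch, just the technique it uses: a minimal download-and-unpack, assuming JDK 11+ (java.net.http) and org.apache.commons:commons-compress on the classpath.

// Sketch only: pure-Java replacement for the wget/tar/mv calls removed above.
// Illustrative, not the actual zeppelin-test implementation.
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

class PureJavaDownloadSketch {

  // Replaces: runShellCommand(new String[]{"wget", url, "-P", dir})
  static Path download(String url, Path dir) throws IOException, InterruptedException {
    Files.createDirectories(dir);
    Path target = dir.resolve(url.substring(url.lastIndexOf('/') + 1));
    HttpClient client =
        HttpClient.newBuilder().followRedirects(HttpClient.Redirect.NORMAL).build();
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<Path> response =
        client.send(request, HttpResponse.BodyHandlers.ofFile(target));
    if (response.statusCode() != 200) {
      throw new IOException("HTTP " + response.statusCode() + " for " + url);
    }
    return target;
  }

  // Replaces: runShellCommand(new String[]{"tar", "-xvf", archive, "-C", dir})
  // Note: unlike tar, Files.copy does not restore the executable bit, so a real
  // implementation must also re-apply permissions (e.g. from entry.getMode())
  // for scripts such as spark/bin/*.
  static void untarGz(Path archive, Path dir) throws IOException {
    try (InputStream in = new BufferedInputStream(Files.newInputStream(archive));
         TarArchiveInputStream tar = new TarArchiveInputStream(new GZIPInputStream(in))) {
      TarArchiveEntry entry;
      while ((entry = tar.getNextTarEntry()) != null) {
        Path out = dir.resolve(entry.getName()).normalize();
        if (!out.startsWith(dir.normalize())) {
          throw new IOException("Blocked path traversal entry: " + entry.getName());
        }
        if (entry.isDirectory()) {
          Files.createDirectories(out);
        } else {
          Files.createDirectories(out.getParent());
          Files.copy(tar, out);
        }
      }
    }
  }

  // Replaces: runShellCommand(new String[]{"mv", source, targetDir})
  static void move(Path source, Path targetDir) throws IOException {
    Files.move(source, targetDir.resolve(source.getFileName()));
  }
}

Beyond portability, this also removes the StreamGobbler threads: with no child process there is no stdout/stderr to drain, and errors arrive as typed exceptions instead of exit codes.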
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index e8d08e9..7d77823 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -19,9 +19,9 @@
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.test.DownloadUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
 import org.apache.zeppelin.util.Util;
 import org.junit.jupiter.api.BeforeEach;
@@ -54,7 +54,7 @@
       System.clearProperty(confVar.getVarName());
     }
 
-    sparkHome = DownloadUtils.downloadSpark("3.4.1", "3");
+    sparkHome = DownloadUtils.downloadSpark();
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
             new File("..").getAbsolutePath());