[ZEPPELIN-6006] Remove command line applications when downloading applications (#4746)
* Move files with Java
* Use Java to download external dependencies
* Improve code after review
* Correct Mirror-URL and compilation
diff --git a/pom.xml b/pom.xml
index b9128bb..0d14167 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
<module>zeppelin-jupyter</module>
<module>zeppelin-plugins</module>
<module>zeppelin-distribution</module>
+ <module>zeppelin-test</module>
</modules>
<properties>
@@ -116,6 +117,7 @@
<!-- common library versions -->
<slf4j.version>1.7.35</slf4j.version>
<reload4j.version>1.2.25</reload4j.version>
+ <log4j2.version>2.23.1</log4j2.version>
<libthrift.version>0.13.0</libthrift.version>
<flexmark.all.version>0.62.2</flexmark.all.version>
<gson.version>2.8.9</gson.version>
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index c79306e..d0fc468 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -100,6 +100,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-test</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 0d8167a..f52d063 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.zeppelin.test.DownloadUtils;
import org.apache.zeppelin.interpreter.ExecutionContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -32,9 +33,7 @@
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 579c0b1..26dee4f 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
+import org.apache.zeppelin.test.DownloadUtils;
import org.apache.zeppelin.interpreter.ExecutionContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -36,7 +37,6 @@
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.codehaus.plexus.util.xml.pull.XmlPullParserException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java
index e0af99a..0ae1bcc 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.zeppelin.test.DownloadUtils;
import org.apache.zeppelin.interpreter.ExecutionContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -33,7 +34,6 @@
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -62,11 +62,9 @@
@BeforeAll
public static void setUp() throws IOException {
- String sparkVersion = "3.4.1";
- String hadoopVersion = "3";
- LOGGER.info("Testing Spark Version: " + sparkVersion);
- LOGGER.info("Testing Hadoop Version: " + hadoopVersion);
- sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
+ LOGGER.info("Testing Spark Version: " + DownloadUtils.DEFAULT_SPARK_VERSION);
+ LOGGER.info("Testing Hadoop Version: " + DownloadUtils.DEFAULT_SPARK_HADOOP_VERSION);
+ sparkHome = DownloadUtils.downloadSpark();
hadoopCluster = new MiniHadoopCluster();
hadoopCluster.start();
@@ -102,7 +100,7 @@
InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
InterpreterResult interpreterResult =
sparkSubmitInterpreter.interpret("--master local --class org.apache.spark.examples.SparkPi --deploy-mode client " +
- sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
+ sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code(), interpreterResult.toString());
// no yarn application launched
@@ -126,7 +124,7 @@
sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi " +
"--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " +
"--conf spark.executor.memory=512m " +
- sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
+ sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code(), interpreterResult.toString());
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.FINISHED));
@@ -159,7 +157,7 @@
sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi " +
"--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " +
"--conf spark.executor.memory=512m " +
- sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
+ sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context);
assertEquals(InterpreterResult.Code.INCOMPLETE, interpreterResult.code(), interpreterResult.toString());
assertTrue(interpreterResult.toString().contains("Paragraph received a SIGTERM"), interpreterResult.toString());
} catch (InterpreterException e) {
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 6832051..a02b88e 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -18,13 +18,13 @@
package org.apache.zeppelin.integration;
import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.test.DownloadUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.ExecuteResult;
import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
import org.apache.zeppelin.client.Status;
import org.apache.zeppelin.client.ZSession;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.rest.AbstractTestRestApi;
@@ -68,7 +68,7 @@
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000");
notebook = TestUtils.getInstance(Notebook.class);
- sparkHome = DownloadUtils.downloadSpark("3.4.1", "3");
+ sparkHome = DownloadUtils.downloadSpark();
flinkHome = DownloadUtils.downloadFlink("1.17.1", "2.12");
}
@@ -189,7 +189,7 @@
assertEquals(Status.FINISHED, result.getStatus(), result.toString());
assertEquals(1, result.getResults().size());
assertEquals("TEXT", result.getResults().get(0).getType());
- assertTrue(result.getResults().get(0).getData().contains("3.4.1"), result.getResults().get(0).getData());
+ assertTrue(result.getResults().get(0).getData().contains(DownloadUtils.DEFAULT_SPARK_VERSION), result.getResults().get(0).getData());
assertEquals(0, result.getJobUrls().size());
// pyspark
@@ -258,7 +258,7 @@
assertEquals(Status.FINISHED, result.getStatus(), result.toString());
assertEquals(1, result.getResults().size());
assertEquals("TEXT", result.getResults().get(0).getType());
- assertTrue(result.getResults().get(0).getData().contains("3.4.1"), result.getResults().get(0).getData());
+ assertTrue(result.getResults().get(0).getData().contains(DownloadUtils.DEFAULT_SPARK_VERSION), result.getResults().get(0).getData());
assertEquals(0, result.getJobUrls().size());
// pyspark
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
index c14e002..1a19bf4 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
@@ -18,8 +18,8 @@
package org.apache.zeppelin.integration;
import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.test.DownloadUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.AbstractTestRestApi;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index 3366f58..7c0cb00 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.integration;
import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.test.DownloadUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.Input;
@@ -29,9 +30,7 @@
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.AbstractTestRestApi;
@@ -39,9 +38,7 @@
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.utils.TestUtils;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/zeppelin-test/pom.xml b/zeppelin-test/pom.xml
new file mode 100644
index 0000000..8973ecc
--- /dev/null
+++ b/zeppelin-test/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin</artifactId>
+ <version>0.12.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>zeppelin-test</artifactId>
+ <name>Zeppelin: Test</name>
+ <description>Zeppelin test code used in other modules</description>
+ <properties>
+ <progressbar.version>0.9.5</progressbar.version>
+ </properties>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>${commons.compress.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>me.tongfei</groupId>
+ <artifactId>progressbar</artifactId>
+ <version>${progressbar.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java
new file mode 100644
index 0000000..50126ac
--- /dev/null
+++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.test;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+public class DownloadRequest {
+ private final URL url;
+ private final Optional<URL> alternativeUrl;
+ private final int retries;
+
+ public static final int DEFAULT_RETRIES = 3;
+
+ public DownloadRequest(String url, int retries) throws MalformedURLException {
+ this(url, null, retries);
+ }
+
+ public DownloadRequest(String url, String alternativeUrl) throws MalformedURLException {
+ this(url, alternativeUrl, DEFAULT_RETRIES);
+ }
+
+ public DownloadRequest(String url, String alternativeUrl, int retries)
+ throws MalformedURLException {
+ if (alternativeUrl != null) {
+ this.url = new URL(url);
+ this.alternativeUrl = Optional.of(new URL(alternativeUrl));
+ this.retries = retries;
+ } else {
+ this.url = new URL(url);
+ this.alternativeUrl = Optional.empty();
+ this.retries = retries;
+ }
+ }
+
+ public DownloadRequest(URL url, int retries) {
+ this(url, null, retries);
+ }
+
+ public DownloadRequest(URL url, URL alternativeUrl) {
+ this(url, alternativeUrl, DEFAULT_RETRIES);
+ }
+
+  public DownloadRequest(URL url, URL alternativeUrl, int retries) {
+    this.url = url;
+    this.alternativeUrl = Optional.ofNullable(alternativeUrl); // null-safe: the (URL, int) constructor delegates here with a null alternative URL; Optional.of(null) would throw NPE
+    this.retries = retries;
+  }
+
+ public URL getUrl() {
+ return url;
+ }
+
+ public Optional<URL> getAlternativeUrl() {
+ return alternativeUrl;
+ }
+
+ public int getRetries() {
+ return retries;
+ }
+}
diff --git a/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java
new file mode 100644
index 0000000..37332b6
--- /dev/null
+++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.test;
+
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.tongfei.progressbar.DelegatingProgressBarConsumer;
+import me.tongfei.progressbar.ProgressBar;
+import me.tongfei.progressbar.ProgressBarBuilder;
+import me.tongfei.progressbar.ProgressBarStyle;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Optional;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * Utility class for downloading spark/flink/livy. This is used for spark/flink integration test.
+ */
+public class DownloadUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DownloadUtils.class);
+
+ public static final String APACHE_MIRROR_ENV_KEY = "APACHE_MIRROR";
+ public static final String PROGRESS_BAR_UPDATE_INTERVAL_ENV_KEY = "PROGRESS_BAR_UPDATE_INTERVAL";
+
+ private static final String MIRROR_URL;
+ private static final String ARCHIVE_URL = "https://archive.apache.org/dist/";
+ private static final int PROGRESS_BAR_UPDATE_INTERVAL;
+
+ private static String downloadFolder = System.getProperty("user.home") + "/.cache";
+ public static final String DEFAULT_SPARK_VERSION = "3.4.2";
+ public static final String DEFAULT_SPARK_HADOOP_VERSION = "3";
+
+
+ private DownloadUtils() {
+ throw new IllegalStateException("Utility class");
+ }
+
+  static {
+    try {
+      FileUtils.forceMkdir(new File(downloadFolder));
+      String envUrl = System.getenv(APACHE_MIRROR_ENV_KEY);
+      if (StringUtils.isNotBlank(envUrl)) {
+        MIRROR_URL = envUrl;
+      } else {
+        MIRROR_URL =
+            IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"),
+                StandardCharsets.UTF_8);
+      }
+      String envProgressUpdateInterval = System.getenv(PROGRESS_BAR_UPDATE_INTERVAL_ENV_KEY);
+      if (StringUtils.isNotBlank(envProgressUpdateInterval)) {
+        PROGRESS_BAR_UPDATE_INTERVAL = Integer.parseInt(envProgressUpdateInterval); // parseInt: target is a primitive int, avoid boxing via valueOf
+      } else {
+        PROGRESS_BAR_UPDATE_INTERVAL = 1000;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to create download folder: " + downloadFolder, e);
+    }
+  }
+
+ /**
+ * Download Spark with default versions
+ *
+ * @return home of Spark installation
+ */
+ public static String downloadSpark() {
+ return downloadSpark(DEFAULT_SPARK_VERSION, DEFAULT_SPARK_HADOOP_VERSION, null);
+ }
+
+ /**
+ * Download of a Spark distribution with a specific Hadoop version
+ *
+ * @param sparkVersion
+ * @param hadoopVersion
+ * @return home of Spark installation
+ */
+ public static String downloadSpark(String sparkVersion, String hadoopVersion) {
+ return downloadSpark(sparkVersion, hadoopVersion, null);
+ }
+
+ /**
+ * Download of a Spark distribution with a Hadoop and Scala version
+ *
+ * @param sparkVersion
+ * @param hadoopVersion
+ * @param scalaVersion
+ * @return home of Spark installation
+ */
+ public static String downloadSpark(String sparkVersion, String hadoopVersion,
+ String scalaVersion) {
+ File sparkFolder = new File(downloadFolder, "spark");
+ final File targetSparkHomeFolder;
+ if (StringUtils.isNotBlank(scalaVersion)) {
+ targetSparkHomeFolder = new File(sparkFolder,
+ "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + "-scala" + scalaVersion);
+ } else {
+ targetSparkHomeFolder = new File(sparkFolder,
+ "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
+ }
+
+ return downloadSpark(sparkVersion, hadoopVersion, scalaVersion, targetSparkHomeFolder);
+ }
+
+ /**
+ * Download of a Spark distribution
+ *
+ * @param sparkVersion
+ * @param hadoopVersion
+ * @param targetSparkHomeFolder - where should the spark archive be extracted
+ * @return home of Spark installation
+ */
+ public static String downloadSpark(String sparkVersion, String hadoopVersion, String scalaVersion,
+ File targetSparkHomeFolder) {
+ File sparkFolder = new File(downloadFolder, "spark");
+ sparkFolder.mkdir();
+ final String sparkVersionLog;
+ if (StringUtils.isBlank(scalaVersion)) {
+ sparkVersionLog = "Spark " + sparkVersion + "-" + hadoopVersion;
+ } else {
+ sparkVersionLog = "Spark " + sparkVersion + "-" + hadoopVersion + "-" + scalaVersion;
+ }
+ if (targetSparkHomeFolder.exists()) {
+ LOGGER.info("Skip to download {} as it is already downloaded.", sparkVersionLog);
+ return targetSparkHomeFolder.getAbsolutePath();
+ }
+ final File sparkTarGZ;
+ if (StringUtils.isBlank(scalaVersion)) {
+ sparkTarGZ =
+ new File(sparkFolder, "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz");
+ } else {
+ sparkTarGZ = new File(sparkFolder, "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion
+ + "-scala" + scalaVersion + ".tgz");
+ }
+
+ try {
+ URL mirrorURL = new URL(MIRROR_URL +
+ generateSparkDownloadURL(sparkVersion, hadoopVersion, scalaVersion));
+ URL archiveURL = new URL(ARCHIVE_URL +
+ generateSparkDownloadURL(sparkVersion, hadoopVersion, scalaVersion));
+ LOGGER.info("Download {}", sparkVersionLog);
+ download(new DownloadRequest(mirrorURL, archiveURL), sparkTarGZ);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchive") // user-visible progress label (typo "Unarchiv" fixed)
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+ try (
+ InputStream fis = Files.newInputStream(sparkTarGZ.toPath());
+ InputStream pbis = ProgressBar.wrap(fis, pbb);
+ InputStream bis = new BufferedInputStream(pbis);
+ InputStream gzis = new GzipCompressorInputStream(bis);
+ ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
+ LOGGER.info("Unarchive {} to {}", sparkVersionLog, targetSparkHomeFolder);
+ unarchive(o, targetSparkHomeFolder, 1);
+ LOGGER.info("Unarchive {} done", sparkVersionLog);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to download spark", e);
+ }
+ return targetSparkHomeFolder.getAbsolutePath();
+ }
+
+
+ public static void download(String url, int retries, File dst) throws IOException {
+ download(new URL(url), retries, dst);
+ }
+
+ public static void download(DownloadRequest downloadRequest, File dst) throws IOException {
+ if (dst.exists()) {
+ LOGGER.info("Skip Download of {}, because it exists", dst);
+ } else {
+ boolean urlDownload = download(downloadRequest.getUrl(), downloadRequest.getRetries(), dst);
+ if (urlDownload) {
+ LOGGER.info("Download successfully");
+ return;
+ }
+ Optional<URL> alternativeURL = downloadRequest.getAlternativeUrl();
+ if (alternativeURL.isPresent()) {
+ urlDownload = download(alternativeURL.get(), downloadRequest.getRetries(), dst);
+ if (urlDownload) {
+ LOGGER.info("Download from alternative successfully");
+ return;
+ }
+ }
+ throw new IOException("Unable to download from " + downloadRequest.getUrl());
+ }
+ }
+
+ private static boolean download(URL url, int retries, File dst) {
+ int retry = 0;
+ while (retry < retries) {
+ try {
+ HttpURLConnection httpConnection = (HttpURLConnection) (url.openConnection());
+ long completeFileSize = httpConnection.getContentLength();
+ ProgressBarBuilder pbb = new ProgressBarBuilder()
+ .setTaskName("Download " + dst.getName())
+ .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+ .setStyle(ProgressBarStyle.ASCII)
+ .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+ .setInitialMax(completeFileSize)
+ .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+ try (
+ OutputStream fileOS = Files.newOutputStream(dst.toPath());
+ InputStream is = url.openStream();
+ InputStream pbis = ProgressBar.wrap(is, pbb);
+ InputStream bis = new BufferedInputStream(pbis)) {
+ IOUtils.copyLarge(bis, fileOS);
+ return true;
+ }
+ } catch (IOException e) {
+ LOGGER.info("Unable to download from {}", url, e);
+ ++retry;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param livyVersion
+ * @param targetLivyHomeFolder
+ * @return livyHome
+ */
+ public static String downloadLivy(String livyVersion, String scalaVersion,
+ File targetLivyHomeFolder) {
+ File livyDownloadFolder = new File(downloadFolder, "livy");
+ livyDownloadFolder.mkdir();
+ final String livyLog = StringUtils.isBlank(scalaVersion) ? "Livy " + livyVersion : "Livy "
+ + livyVersion + "_" + scalaVersion;
+ if (targetLivyHomeFolder.exists()) {
+ LOGGER.info("Skip to download {} as it is already downloaded.", livyLog);
+ return targetLivyHomeFolder.getAbsolutePath();
+ }
+ final File livyZip;
+ if (StringUtils.isBlank(scalaVersion)) {
+ // e.g. apache-livy-0.7.1-incubating-bin.zip
+ livyZip = new File(livyDownloadFolder, "apache-livy-" + livyVersion + "-bin.zip");
+ } else {
+ // e.g apache-livy-0.8.0-incubating_2.12-bin.zip
+ livyZip = new File(livyDownloadFolder, "apache-livy-" + livyVersion + "_" + scalaVersion + "-bin.zip");
+ }
+
+ try {
+ URL mirrorURL = new URL(MIRROR_URL + generateLivyDownloadUrl(livyVersion, scalaVersion));
+ URL archiveURL = new URL(ARCHIVE_URL + generateLivyDownloadUrl(livyVersion, scalaVersion));
+ LOGGER.info("Download {}", livyLog);
+ download(new DownloadRequest(mirrorURL, archiveURL), livyZip);
+ LOGGER.info("Unzip {} to {}", livyLog, targetLivyHomeFolder);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchive Livy") // user-visible progress label (typo "Unarchiv" fixed)
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+ try (InputStream fis = Files.newInputStream(livyZip.toPath());
+ InputStream pbis = ProgressBar.wrap(fis, pbb);
+ InputStream bis = new BufferedInputStream(pbis);
+ ZipInputStream zis = new ZipInputStream(bis)) {
+ unzip(zis, targetLivyHomeFolder, 1);
+ }
+ LOGGER.info("Unzip {} done", livyLog);
+ // Create logs directory
+ File logs = new File(targetLivyHomeFolder, "logs");
+ logs.mkdir();
+ } catch (MalformedURLException e) {
+ LOGGER.error("invalid URL", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to download livy", e);
+ }
+ return targetLivyHomeFolder.getAbsolutePath();
+ }
+
+ /**
+ * @param livyVersion
+ * @return return livyHome
+ * @throws IOException
+ */
+ public static String downloadLivy(String livyVersion) {
+ File livyDownloadFolder = new File(downloadFolder, "livy");
+ File targetLivyHomeFolder = new File(livyDownloadFolder, "livy-" + livyVersion);
+ return downloadLivy(livyVersion, null, targetLivyHomeFolder);
+ }
+
+ public static String downloadLivy(String livyVersion, String scalaVersion) {
+ File livyDownloadFolder = new File(downloadFolder, "livy");
+ File targetLivyHomeFolder =
+ new File(livyDownloadFolder, "livy-" + livyVersion + "_" + scalaVersion);
+ return downloadLivy(livyVersion, scalaVersion, targetLivyHomeFolder);
+ }
+
+ private static File newFile(File destinationDir, ZipEntry zipEntry, int strip)
+ throws IOException {
+ String filename = zipEntry.getName();
+ for (int i = 0; i < strip; ++i) {
+ if (filename.contains(File.separator)) {
+ filename = filename.substring(filename.indexOf(File.separator) + 1);
+ }
+ }
+ File destFile = new File(destinationDir, filename);
+ String destDirPath = destinationDir.getCanonicalPath();
+ String destFilePath = destFile.getCanonicalPath();
+
+ if (!destFilePath.startsWith(destDirPath + File.separator)) {
+ throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
+ }
+
+ return destFile;
+ }
+
+ private static File newFile(File destDir, ArchiveEntry archiveEntry, int strip)
+ throws IOException { // maps an archive entry to its destination file, rejecting traversal entries
+ String filename = archiveEntry.getName();
+ for (int i = 0; i < strip; ++i) { // drop 'strip' leading path components (e.g. the top-level archive folder)
+ if (filename.contains("/")) { // tar/archive entry names use '/', never the platform separator
+ filename = filename.substring(filename.indexOf("/") + 1);
+ }
+ }
+ File destFile = new File(destDir, filename);
+ String destDirPath = destDir.getCanonicalPath();
+ String destFilePath = destFile.getCanonicalPath();
+
+ if (!destFilePath.startsWith(destDirPath + File.separator)) { // guard against path traversal outside destDir
+ throw new IOException("Entry is outside of the target dir: " + archiveEntry.getName());
+ }
+
+ return destFile;
+ }
+
+ private static void unarchive(ArchiveInputStream<? extends ArchiveEntry> ais, File destDir,
+ int strip) throws IOException { // extracts every entry into destDir, stripping 'strip' leading path segments
+ byte[] buffer = new byte[1024];
+ ArchiveEntry archiveEntry = ais.getNextEntry();
+ while (archiveEntry != null) {
+ File newFile;
+ try {
+ newFile = newFile(destDir, archiveEntry, strip);
+ } catch (IOException e) { // entry would land outside destDir (path traversal) - skip it
+ LOGGER.info("Skip {}", archiveEntry.getName());
+ archiveEntry = ais.getNextEntry();
+ continue;
+ }
+ if (archiveEntry.isDirectory()) {
+ if (!newFile.isDirectory() && !newFile.mkdirs()) {
+ throw new IOException("Failed to create directory " + newFile);
+ }
+ } else {
+ File parent = newFile.getParentFile();
+ if (!parent.isDirectory() && !parent.mkdirs()) {
+ throw new IOException("Failed to create directory " + parent);
+ }
+
+ // write file content
+ try (FileOutputStream fos = new FileOutputStream(newFile)) {
+ int len;
+ while ((len = ais.read(buffer)) > 0) {
+ fos.write(buffer, 0, len);
+ }
+ }
+ // Change permissions and metadata
+ if (newFile.getParentFile().getName().contains("bin")
+ && !newFile.setExecutable(true, false)) { // scripts under bin/ must stay executable
+ LOGGER.info("Setting file {} to executable failed", newFile);
+ }
+ if (!newFile.setLastModified(archiveEntry.getLastModifiedDate().getTime())) {
+ LOGGER.info("Setting last modified date to file {} failed", newFile);
+ }
+ }
+ archiveEntry = ais.getNextEntry();
+ }
+ }
+
+ private static void unzip(ZipInputStream zis, File destDir, int strip) throws IOException { // zip counterpart of unarchive(): extract all entries, stripping leading path segments
+ byte[] buffer = new byte[1024];
+ ZipEntry zipEntry = zis.getNextEntry();
+ while (zipEntry != null) {
+ File newFile;
+ try {
+ newFile = newFile(destDir, zipEntry, strip);
+ } catch (IOException e) { // entry would land outside destDir (zip-slip) - skip it
+ LOGGER.info("Skip {}", zipEntry.getName());
+ zipEntry = zis.getNextEntry();
+ continue;
+ }
+ if (zipEntry.isDirectory()) {
+ if (!newFile.isDirectory() && !newFile.mkdirs()) {
+ throw new IOException("Failed to create directory " + newFile);
+ }
+ } else {
+ File parent = newFile.getParentFile();
+ if (!parent.isDirectory() && !parent.mkdirs()) {
+ throw new IOException("Failed to create directory " + parent);
+ }
+
+ // write file content
+ try (FileOutputStream fos = new FileOutputStream(newFile)) {
+ int len;
+ while ((len = zis.read(buffer)) > 0) {
+ fos.write(buffer, 0, len);
+ }
+ }
+ // Change permissions and metadata
+ if (newFile.getParentFile().getName().contains("bin")
+ && !newFile.setExecutable(true, false)) { // scripts under bin/ must stay executable
+ LOGGER.info("Setting file {} to executable failed", newFile);
+ }
+ if (!newFile.setLastModified(zipEntry.getLastModifiedTime().toMillis())) {
+ LOGGER.info("Setting last modified date to file {} failed", newFile);
+ }
+ }
+ zipEntry = zis.getNextEntry();
+ }
+ zis.closeEntry();
+ }
+
+ public static String downloadFlink(String flinkVersion, String scalaVersion) { // downloads Flink plus the yarn/hive companion jars; returns FLINK_HOME
+ File flinkDownloadFolder = new File(downloadFolder, "flink");
+ flinkDownloadFolder.mkdir();
+ File targetFlinkHomeFolder = new File(flinkDownloadFolder, "flink-" + flinkVersion);
+ if (targetFlinkHomeFolder.exists()) {
+ LOGGER.info("Skip to download Flink {}_{} as it is already downloaded.", flinkVersion,
+ scalaVersion);
+ return targetFlinkHomeFolder.getAbsolutePath();
+ }
+ File flinkTGZ = new File(flinkDownloadFolder,
+ "flink-" + flinkVersion + "-bin-scala_" + scalaVersion + ".tgz");
+ try {
+ URL mirrorURL = new URL(MIRROR_URL + generateDownloadURL(
+ "flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz", "flink"));
+ URL archiveURL = new URL(ARCHIVE_URL + generateDownloadURL(
+ "flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz", "flink"));
+ LOGGER.info("Download Flink {}_{}", flinkVersion, scalaVersion);
+ download(new DownloadRequest(mirrorURL, archiveURL), flinkTGZ);
+ ProgressBarBuilder pbb = new ProgressBarBuilder()
+ .setTaskName("Unarchive Flink")
+ .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+ .setStyle(ProgressBarStyle.ASCII)
+ .setUpdateIntervalMillis(1000)
+ .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+ try (
+ InputStream fis = Files.newInputStream(flinkTGZ.toPath());
+ InputStream pbis = ProgressBar.wrap(fis, pbb);
+ InputStream bis = new BufferedInputStream(pbis);
+ InputStream gzis = new GzipCompressorInputStream(bis);
+ ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
+ LOGGER.info("Unarchive Flink {}_{} to {}", flinkVersion, scalaVersion,
+ targetFlinkHomeFolder);
+ unarchive(o, targetFlinkHomeFolder, 1);
+ LOGGER.info("Unarchive Flink done");
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to download flink", e);
+ }
+
+
+ // download other dependencies for running flink with yarn and hive
+ try {
+ download("https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_"
+ + scalaVersion + "/"
+ + flinkVersion + "/flink-connector-hive_" + scalaVersion + "-" + flinkVersion + ".jar",
+ 3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-connector-hive_"
+ + scalaVersion + "-" + flinkVersion + ".jar"));
+ download("https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_" + scalaVersion + "/"
+ + flinkVersion + "/flink-hadoop-compatibility_" + scalaVersion + "-" + flinkVersion + ".jar",
+ 3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-hadoop-compatibility_"
+ + scalaVersion + "-" + flinkVersion + ".jar"));
+ download("https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.7/hive-exec-2.3.7.jar",
+ 3, new File(targetFlinkHomeFolder, "lib" + File.separator + "hive-exec-2.3.7.jar")); // on-disk name must match the downloaded 2.3.7 artifact
+ download(
+ "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.6/hadoop-client-api-3.3.6.jar",
+ 3, new File(targetFlinkHomeFolder,
+ "lib" + File.separator + "hadoop-client-api-3.3.6.jar"));
+ download(
+ "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.6/hadoop-client-runtime-3.3.6.jar",
+ 3, new File(targetFlinkHomeFolder,
+ "lib" + File.separator + "hadoop-client-runtime-3.3.6.jar"));
+ download("https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_"
+ + scalaVersion + "/"
+ + flinkVersion + "/flink-table-api-scala_" + scalaVersion + "-" + flinkVersion + ".jar",
+ 3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-table-api-scala_"
+ + scalaVersion + "-" + flinkVersion + ".jar"));
+ download("https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_"
+ + scalaVersion + "/"
+ + flinkVersion + "/flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar",
+ 3, new File(targetFlinkHomeFolder, "lib" + File.separator
+ + "flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar"));
+
+ String jarName = "flink-table-planner_" + scalaVersion + "-" + flinkVersion + ".jar";
+ mvFile(
+ targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName,
+ targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName);
+ jarName = "flink-table-planner-loader-" + flinkVersion + ".jar";
+ mvFile(
+ targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName,
+ targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName);
+ if (SemanticVersion.of(flinkVersion).equalsOrNewerThan(SemanticVersion.of("1.16.0"))) {
+ jarName = "flink-sql-client-" + flinkVersion + ".jar";
+ mvFile(targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName,
+ targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to download jar", e);
+ }
+ return targetFlinkHomeFolder.getAbsolutePath();
+ }
+
+ private static void mvFile(String srcPath, String dstPath) throws IOException { // moves srcPath to dstPath, replacing an existing target; missing source is a no-op
+ Path src = Paths.get(srcPath);
+ Path dst = Paths.get(dstPath);
+ if (src.toFile().exists()) {
+ if (dst.toFile().exists()) {
+ LOGGER.warn("{} already exists - replacing", dstPath);
+ FileUtils.deleteQuietly(dst.toFile()); // redundant with REPLACE_EXISTING, but makes the replacement explicit
+ }
+ LOGGER.info("Move file {} to {}", src, dst);
+ Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
+ } else {
+ LOGGER.warn("{} does not exist - skipping", srcPath);
+ }
+ }
+
+ public static String downloadHadoop(String version) { // downloads and unpacks a Hadoop release; returns HADOOP_HOME
+ File hadoopDownloadFolder = new File(downloadFolder, "hadoop");
+ hadoopDownloadFolder.mkdir();
+ File targetHadoopHomeFolder = new File(hadoopDownloadFolder, "hadoop-" + version);
+ if (targetHadoopHomeFolder.exists()) {
+ LOGGER.info("Skip to download Hadoop {} as it is already downloaded.", version);
+ return targetHadoopHomeFolder.getAbsolutePath();
+ }
+ File hadoopTGZ = new File(hadoopDownloadFolder, "hadoop-" + version + ".tar.gz");
+ try {
+ URL mirrorURL = new URL(MIRROR_URL + generateDownloadURL(
+ "hadoop", version, ".tar.gz", "hadoop/core"));
+ URL archiveURL = new URL(ARCHIVE_URL + generateDownloadURL(
+ "hadoop", version, ".tar.gz", "hadoop/core"));
+ LOGGER.info("Download Hadoop {}", version);
+ download(new DownloadRequest(mirrorURL, archiveURL), hadoopTGZ);
+ ProgressBarBuilder pbb = new ProgressBarBuilder()
+ .setTaskName("Unarchive")
+ .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+ .setStyle(ProgressBarStyle.ASCII)
+ .setUpdateIntervalMillis(1000)
+ .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+ try (
+ InputStream fis = Files.newInputStream(hadoopTGZ.toPath());
+ InputStream pbis = ProgressBar.wrap(fis, pbb);
+ InputStream bis = new BufferedInputStream(pbis);
+ InputStream gzis = new GzipCompressorInputStream(bis);
+ ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
+ LOGGER.info("Unarchive Hadoop {} to {}", version, targetHadoopHomeFolder);
+ unarchive(o, targetHadoopHomeFolder, 1);
+ LOGGER.info("Unarchive Hadoop {} done", version);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to download hadoop", e); // preserve the cause for diagnosis
+ }
+ return targetHadoopHomeFolder.getAbsolutePath();
+ }
+
+ private static String generateDownloadURL(String project, String version, String postFix,
+ String projectPath) { // builds "<projectPath>/<project>-<version>/<project>-<version><postFix>" relative to a mirror root
+ return projectPath + "/" + project + "-" + version + "/" + project + "-" + version
+ + postFix;
+ }
+
+ private static String generateSparkDownloadURL(String sparkVersion, String hadoopVersion,
+ String scalaVersion) { // builds the mirror-relative path of a Spark binary distribution
+ final String url;
+ String sparkVersionFolder = "spark/spark-" + sparkVersion;
+ if (StringUtils.isNotBlank(hadoopVersion)) {
+ if (StringUtils.isNotBlank(scalaVersion)) {
+ // spark-3.4.0-bin-hadoop3-scala2.13.tgz
+ url = sparkVersionFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion
+ + "-scala" + scalaVersion + ".tgz";
+ } else {
+ url =
+ sparkVersionFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz";
+ }
+ } else {
+ url = sparkVersionFolder + "/spark-" + sparkVersion + "-bin-without-hadoop.tgz"; // blank hadoopVersion selects the "without hadoop" build
+ }
+ return url;
+ }
+
+ private static String generateLivyDownloadUrl(String livyVersion, String scalaVersion) { // mirror-relative path of a Livy binary zip
+ SemanticVersion livy = SemanticVersion.of(livyVersion.replace("incubating", "")); // NOTE(review): leaves a trailing '-' (e.g. "0.8.0-") - confirm SemanticVersion tolerates it
+ if (livy.equalsOrNewerThan(SemanticVersion.of("0.8.0"))) {
+ return "incubator/livy/" + livyVersion + "/apache-livy-" + livyVersion + "_" + scalaVersion
+ + "-bin.zip"; // Livy >= 0.8.0 publishes per-Scala-version binaries
+ }
+ return "incubator/livy/" + livyVersion + "/apache-livy-" + livyVersion + "-bin.zip";
+ }
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java
similarity index 97%
rename from zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java
rename to zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java
index f9bd771..f25e503 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java
+++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.zeppelin.interpreter.integration;
+package org.apache.zeppelin.test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java b/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java
new file mode 100644
index 0000000..6bed716
--- /dev/null
+++ b/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+
+@Disabled("Takes a long time and depends on external factors.")
+class DownloadUtilsTest {
+
+ @Test
+ void downloadHadoop() {
+ String hadoopHome = DownloadUtils.downloadHadoop("3.4.0");
+ Path hadoopHomePath = Paths.get(hadoopHome);
+ assertTrue(hadoopHomePath.toFile().exists());
+ assertTrue(hadoopHomePath.toFile().isDirectory());
+ }
+
+ @Test
+ void downloadSpark() {
+ String sparkHome = DownloadUtils.downloadSpark();
+ Path sparkHomePath = Paths.get(sparkHome);
+ assertTrue(sparkHomePath.toFile().exists());
+ assertTrue(sparkHomePath.toFile().isDirectory());
+ }
+
+ @Test
+ void downloadSparkWithScala() {
+ String sparkHome = DownloadUtils.downloadSpark(DownloadUtils.DEFAULT_SPARK_VERSION, DownloadUtils.DEFAULT_SPARK_HADOOP_VERSION, "2.13");
+ Path sparkHomePath = Paths.get(sparkHome);
+ assertTrue(sparkHomePath.toFile().exists());
+ assertTrue(sparkHomePath.toFile().isDirectory());
+ }
+
+ @Test
+ void downloadFlink() {
+ String flinkHome = DownloadUtils.downloadFlink("1.16.3", "2.12");
+ Path flinkHomePath = Paths.get(flinkHome);
+ assertTrue(flinkHomePath.toFile().exists());
+ assertTrue(flinkHomePath.toFile().isDirectory());
+ }
+
+ @Test
+ void downloadLivy() {
+ String livyHome = DownloadUtils.downloadLivy("0.7.1-incubating");
+ Path livyHomePath = Paths.get(livyHome);
+ assertTrue(livyHomePath.toFile().exists());
+ assertTrue(livyHomePath.toFile().isDirectory());
+ }
+
+ @Test
+ void downloadLivy080() {
+ String livyHome = DownloadUtils.downloadLivy("0.8.0-incubating", "2.12");
+ Path livyHomePath = Paths.get(livyHome);
+ assertTrue(livyHomePath.toFile().exists());
+ assertTrue(livyHomePath.toFile().isDirectory());
+ }
+
+}
diff --git a/zeppelin-test/src/test/resources/log4j2.properties b/zeppelin-test/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..4935e68
--- /dev/null
+++ b/zeppelin-test/src/test/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger=debug, STDOUT
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %5p [%d] ({%t} %F[%M]:%L) - %m%n
+
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index cabe644..f3def93 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -268,6 +268,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-test</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
deleted file mode 100644
index 6310c1d..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.integration;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Utility class for downloading spark/flink. This is used for spark/flink integration test.
- */
-public class DownloadUtils {
- private static Logger LOGGER = LoggerFactory.getLogger(DownloadUtils.class);
-
- private static String downloadFolder = System.getProperty("user.home") + "/.cache";
-
- static {
- try {
- FileUtils.forceMkdir(new File(downloadFolder));
- } catch (IOException e) {
- throw new RuntimeException("Fail to create download folder: " + downloadFolder, e);
- }
- }
-
- public static String downloadSpark(String sparkVersion, String hadoopVersion) {
- String sparkDownloadFolder = downloadFolder + "/spark";
- File targetSparkHomeFolder =
- new File(sparkDownloadFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
- if (targetSparkHomeFolder.exists()) {
- LOGGER.info("Skip to download spark as it is already downloaded.");
- return targetSparkHomeFolder.getAbsolutePath();
- }
- download("spark", sparkVersion, "-bin-hadoop" + hadoopVersion + ".tgz");
- return targetSparkHomeFolder.getAbsolutePath();
- }
-
- public static String downloadFlink(String flinkVersion, String scalaVersion) {
- String flinkDownloadFolder = downloadFolder + "/flink";
- File targetFlinkHomeFolder = new File(flinkDownloadFolder + "/flink-" + flinkVersion);
- if (targetFlinkHomeFolder.exists()) {
- LOGGER.info("Skip to download flink as it is already downloaded.");
- return targetFlinkHomeFolder.getAbsolutePath();
- }
- download("flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz");
- // download other dependencies for running flink with yarn and hive
- try {
- runShellCommand(new String[]{"wget",
- "https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_" + scalaVersion + "/"
- + flinkVersion + "/flink-connector-hive_" + scalaVersion + "-" + flinkVersion + ".jar",
- "-P", targetFlinkHomeFolder + "/lib"});
- runShellCommand(new String[]{"wget",
- "https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_" + scalaVersion + "/"
- + flinkVersion + "/flink-hadoop-compatibility_" + scalaVersion + "-" + flinkVersion + ".jar",
- "-P", targetFlinkHomeFolder + "/lib"});
- runShellCommand(new String[]{"wget",
- "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.7/hive-exec-2.3.7.jar",
- "-P", targetFlinkHomeFolder + "/lib"});
- runShellCommand(new String[]{"wget",
- "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.6/hadoop-client-api-3.3.6.jar",
- "-P", targetFlinkHomeFolder + "/lib"});
- runShellCommand(new String[]{"wget",
- "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.6/hadoop-client-runtime-3.3.6.jar",
- "-P", targetFlinkHomeFolder + "/lib"});
- runShellCommand(new String[]{"wget",
- "https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_" + scalaVersion + "/"
- + flinkVersion + "/flink-table-api-scala_" + scalaVersion + "-" + flinkVersion + ".jar",
- "-P", targetFlinkHomeFolder + "/lib"});
- runShellCommand(new String[]{"wget",
- "https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_" + scalaVersion + "/"
- + flinkVersion + "/flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar",
- "-P", targetFlinkHomeFolder + "/lib"});
- runShellCommand(new String[]{"mv",
- targetFlinkHomeFolder + "/opt/" + "flink-table-planner_" + scalaVersion + "-" + flinkVersion + ".jar",
- targetFlinkHomeFolder + "/lib"});
- runShellCommand(new String[]{"mv",
- targetFlinkHomeFolder + "/lib/" + "flink-table-planner-loader-" + flinkVersion + ".jar",
- targetFlinkHomeFolder + "/opt"});
- if (SemanticVersion.of(flinkVersion).equalsOrNewerThan(SemanticVersion.of("1.16.0"))) {
- runShellCommand(new String[]{"mv",
- targetFlinkHomeFolder + "/opt/" + "flink-sql-client-" + flinkVersion + ".jar",
- targetFlinkHomeFolder + "/lib"});
- }
- } catch (Exception e) {
- throw new RuntimeException("Fail to download jar", e);
- }
- return targetFlinkHomeFolder.getAbsolutePath();
- }
-
- public static String downloadHadoop(String version) {
- String hadoopDownloadFolder = downloadFolder + "/hadoop";
- File targetHadoopHomeFolder = new File(hadoopDownloadFolder + "/hadoop-" + version);
- if (targetHadoopHomeFolder.exists()) {
- LOGGER.info("Skip to download hadoop as it is already downloaded.");
- return targetHadoopHomeFolder.getAbsolutePath();
- }
- download("hadoop", version, ".tar.gz", "hadoop/core");
- return targetHadoopHomeFolder.getAbsolutePath();
- }
-
- // Try mirrors first, if fails fallback to apache archive
- private static void download(String project, String version, String postFix, String projectPath) {
- String projectDownloadFolder = downloadFolder + "/" + project;
- try {
- String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"), StandardCharsets.UTF_8);
- File downloadFile = new File(projectDownloadFolder + "/" + project + "-" + version + postFix);
- String downloadURL = preferredMirror + "/" + projectPath + "/" + project + "-" + version + "/" + project + "-" + version + postFix;
- runShellCommand(new String[]{"wget", downloadURL, "-P", projectDownloadFolder});
- runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", projectDownloadFolder});
- } catch (Exception e) {
- LOGGER.warn("Failed to download " + project + " from mirror site, fallback to use apache archive", e);
- File downloadFile = new File(projectDownloadFolder + "/" + project + "-" + version + postFix);
- String downloadURL =
- "https://archive.apache.org/dist/" + projectPath + "/" + project +"-"
- + version
- + "/" + project + "-"
- + version
- + postFix;
- try {
- runShellCommand(new String[]{"wget", downloadURL, "-P", projectDownloadFolder});
- runShellCommand(
- new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", projectDownloadFolder});
- } catch (Exception ex) {
- throw new RuntimeException("Fail to download " + project + " " + version, ex);
- }
- }
- }
-
- private static void download(String project, String version, String postFix) {
- download(project, version, postFix, project);
- }
-
- private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
- LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " "));
- Process process = Runtime.getRuntime().exec(commands);
- StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
- StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
- errorGobbler.start();
- outputGobbler.start();
- if (process.waitFor() != 0) {
- throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " "));
- }
- LOGGER.info("Complete shell commands: " + StringUtils.join(commands, " "));
- }
-
- private static class StreamGobbler extends Thread {
- InputStream is;
-
- // reads everything from is until empty.
- StreamGobbler(InputStream is) {
- this.is = is;
- }
-
- @Override
- public void run() {
- try {
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
- String line = null;
- long startTime = System.currentTimeMillis();
- while ((line = br.readLine()) != null) {
- // logging per 5 seconds
- if ((System.currentTimeMillis() - startTime) > 5000) {
- LOGGER.info(line);
- startTime = System.currentTimeMillis();
- }
- }
- } catch (IOException ioe) {
- LOGGER.warn("Fail to print shell output", ioe);
- }
- }
- }
-}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index e8d08e9..7d77823 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -19,9 +19,9 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.test.DownloadUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
import org.apache.zeppelin.util.Util;
import org.junit.jupiter.api.BeforeEach;
@@ -54,7 +54,7 @@
System.clearProperty(confVar.getVarName());
}
- sparkHome = DownloadUtils.downloadSpark("3.4.1", "3");
+ sparkHome = DownloadUtils.downloadSpark();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
new File("..").getAbsolutePath());