/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.test;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import me.tongfei.progressbar.DelegatingProgressBarConsumer;
import me.tongfei.progressbar.ProgressBar;
import me.tongfei.progressbar.ProgressBarBuilder;
import me.tongfei.progressbar.ProgressBarStyle;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/**
 * Utility class for downloading Spark, Flink, Livy and Hadoop distributions. It is used by the
 * Spark, Flink and Livy integration tests.
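 *
 * <p>A minimal usage sketch (the versions shown are illustrative):
 * <pre>{@code
 * String sparkHome = DownloadUtils.downloadSpark("3.4.2", "3");
 * String flinkHome = DownloadUtils.downloadFlink("1.17.1", "2.12");
 * }</pre>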
*/
public class DownloadUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(DownloadUtils.class);
public static final String APACHE_MIRROR_ENV_KEY = "APACHE_MIRROR";
public static final String PROGRESS_BAR_UPDATE_INTERVAL_ENV_KEY = "PROGRESS_BAR_UPDATE_INTERVAL";
private static final String MIRROR_URL;
private static final String ARCHIVE_URL = "https://archive.apache.org/dist/";
private static final int PROGRESS_BAR_UPDATE_INTERVAL;
private static String downloadFolder = System.getProperty("user.home") + "/.cache";
public static final String DEFAULT_SPARK_VERSION = "3.4.2";
public static final String DEFAULT_SPARK_HADOOP_VERSION = "3";
private DownloadUtils() {
throw new IllegalStateException("Utility class");
}
static {
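    // Resolve the preferred Apache mirror once: closer.lua?preferred=true returns the closest
    // mirror's base URL as plain text, unless APACHE_MIRROR overrides it.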
try {
FileUtils.forceMkdir(new File(downloadFolder));
String envUrl = System.getenv(APACHE_MIRROR_ENV_KEY);
if (StringUtils.isNotBlank(envUrl)) {
MIRROR_URL = envUrl;
} else {
MIRROR_URL =
IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"),
StandardCharsets.UTF_8);
}
String envProgressUpdateInterval = System.getenv(PROGRESS_BAR_UPDATE_INTERVAL_ENV_KEY);
if (StringUtils.isNotBlank(envProgressUpdateInterval)) {
        PROGRESS_BAR_UPDATE_INTERVAL = Integer.parseInt(envProgressUpdateInterval);
} else {
PROGRESS_BAR_UPDATE_INTERVAL = 1000;
}
} catch (IOException e) {
throw new RuntimeException("Fail to create download folder: " + downloadFolder, e);
}
}
/**
   * Downloads Spark with the default Spark and Hadoop versions.
*
* @return home of Spark installation
*/
public static String downloadSpark() {
return downloadSpark(DEFAULT_SPARK_VERSION, DEFAULT_SPARK_HADOOP_VERSION, null);
}
/**
   * Downloads a Spark distribution with a specific Hadoop version.
   *
   * @param sparkVersion Spark version, e.g. "3.4.2"
   * @param hadoopVersion Hadoop profile of the Spark binary, e.g. "3"
   * @return home of Spark installation
*/
public static String downloadSpark(String sparkVersion, String hadoopVersion) {
return downloadSpark(sparkVersion, hadoopVersion, null);
}
/**
   * Downloads a Spark distribution with specific Hadoop and Scala versions.
   *
   * @param sparkVersion Spark version, e.g. "3.4.2"
   * @param hadoopVersion Hadoop profile of the Spark binary, e.g. "3"
   * @param scalaVersion Scala version suffix, e.g. "2.13"; may be null or blank for the default
   * @return home of Spark installation
*/
public static String downloadSpark(String sparkVersion, String hadoopVersion,
String scalaVersion) {
File sparkFolder = new File(downloadFolder, "spark");
final File targetSparkHomeFolder;
if (StringUtils.isNotBlank(scalaVersion)) {
targetSparkHomeFolder = new File(sparkFolder,
"spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + "-scala" + scalaVersion);
} else {
targetSparkHomeFolder = new File(sparkFolder,
"spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
}
return downloadSpark(sparkVersion, hadoopVersion, scalaVersion, targetSparkHomeFolder);
}
/**
   * Downloads a Spark distribution.
   *
   * @param sparkVersion Spark version, e.g. "3.4.2"
   * @param hadoopVersion Hadoop profile of the Spark binary, e.g. "3"
   * @param scalaVersion Scala version suffix, e.g. "2.13"; may be null or blank for the default
   * @param targetSparkHomeFolder where the Spark archive should be extracted
   * @return home of Spark installation
*/
public static String downloadSpark(String sparkVersion, String hadoopVersion, String scalaVersion,
File targetSparkHomeFolder) {
File sparkFolder = new File(downloadFolder, "spark");
sparkFolder.mkdir();
final String sparkVersionLog;
if (StringUtils.isBlank(scalaVersion)) {
sparkVersionLog = "Spark " + sparkVersion + "-" + hadoopVersion;
} else {
sparkVersionLog = "Spark " + sparkVersion + "-" + hadoopVersion + "-" + scalaVersion;
}
if (targetSparkHomeFolder.exists()) {
LOGGER.info("Skip to download {} as it is already downloaded.", sparkVersionLog);
return targetSparkHomeFolder.getAbsolutePath();
}
final File sparkTarGZ;
if (StringUtils.isBlank(scalaVersion)) {
sparkTarGZ =
new File(sparkFolder, "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz");
} else {
sparkTarGZ = new File(sparkFolder, "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion
+ "-scala" + scalaVersion + ".tgz");
}
try {
URL mirrorURL = new URL(MIRROR_URL +
generateSparkDownloadURL(sparkVersion, hadoopVersion, scalaVersion));
URL archiveURL = new URL(ARCHIVE_URL +
generateSparkDownloadURL(sparkVersion, hadoopVersion, scalaVersion));
LOGGER.info("Download {}", sparkVersionLog);
download(new DownloadRequest(mirrorURL, archiveURL), sparkTarGZ);
ProgressBarBuilder pbb = new ProgressBarBuilder()
.setTaskName("Unarchiv")
.setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
.setStyle(ProgressBarStyle.ASCII)
.setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
.setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
try (
InputStream fis = Files.newInputStream(sparkTarGZ.toPath());
InputStream pbis = ProgressBar.wrap(fis, pbb);
InputStream bis = new BufferedInputStream(pbis);
InputStream gzis = new GzipCompressorInputStream(bis);
ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
LOGGER.info("Unarchive {} to {}", sparkVersionLog, targetSparkHomeFolder);
unarchive(o, targetSparkHomeFolder, 1);
LOGGER.info("Unarchive {} done", sparkVersionLog);
}
} catch (IOException e) {
throw new RuntimeException("Unable to download spark", e);
}
return targetSparkHomeFolder.getAbsolutePath();
}
public static void download(String url, int retries, File dst) throws IOException {
download(new URL(url), retries, dst);
}
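  /**
   * Downloads to {@code dst} from the request's primary URL (typically the resolved Apache
   * mirror), falling back to the alternative URL (typically the Apache archive) if every retry
   * against the primary fails.
   */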
public static void download(DownloadRequest downloadRequest, File dst) throws IOException {
if (dst.exists()) {
LOGGER.info("Skip Download of {}, because it exists", dst);
} else {
boolean urlDownload = download(downloadRequest.getUrl(), downloadRequest.getRetries(), dst);
if (urlDownload) {
LOGGER.info("Download successfully");
return;
}
Optional<URL> alternativeURL = downloadRequest.getAlternativeUrl();
if (alternativeURL.isPresent()) {
urlDownload = download(alternativeURL.get(), downloadRequest.getRetries(), dst);
if (urlDownload) {
LOGGER.info("Download from alternative successfully");
return;
}
}
throw new IOException("Unable to download from " + downloadRequest.getUrl());
}
}
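  /**
   * Attempts the download up to {@code retries} times, streaming through a progress bar;
   * returns true on success, false if every attempt failed.
   */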
private static boolean download(URL url, int retries, File dst) {
int retry = 0;
while (retry < retries) {
try {
HttpURLConnection httpConnection = (HttpURLConnection) (url.openConnection());
        long completeFileSize = httpConnection.getContentLengthLong();
ProgressBarBuilder pbb = new ProgressBarBuilder()
.setTaskName("Download " + dst.getName())
.setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
.setStyle(ProgressBarStyle.ASCII)
.setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
.setInitialMax(completeFileSize)
.setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
try (
OutputStream fileOS = Files.newOutputStream(dst.toPath());
            // Reuse the already-opened connection instead of issuing a second request
            InputStream is = httpConnection.getInputStream();
InputStream pbis = ProgressBar.wrap(is, pbb);
InputStream bis = new BufferedInputStream(pbis)) {
IOUtils.copyLarge(bis, fileOS);
return true;
}
} catch (IOException e) {
LOGGER.info("Unable to download from {}", url, e);
++retry;
}
}
return false;
}
/**
   * @param livyVersion Livy version, e.g. "0.8.0-incubating"
   * @param scalaVersion Scala version suffix, e.g. "2.12"; may be null or blank for releases
   *        without Scala-specific artifacts
   * @param targetLivyHomeFolder where the Livy archive should be extracted
   * @return livyHome
*/
public static String downloadLivy(String livyVersion, String scalaVersion,
File targetLivyHomeFolder) {
File livyDownloadFolder = new File(downloadFolder, "livy");
livyDownloadFolder.mkdir();
    final String livyLog = StringUtils.isBlank(scalaVersion)
        ? "Livy " + livyVersion
        : "Livy " + livyVersion + "_" + scalaVersion;
if (targetLivyHomeFolder.exists()) {
LOGGER.info("Skip to download {} as it is already downloaded.", livyLog);
return targetLivyHomeFolder.getAbsolutePath();
}
final File livyZip;
if (StringUtils.isBlank(scalaVersion)) {
// e.g. apache-livy-0.7.1-incubating-bin.zip
livyZip = new File(livyDownloadFolder, "apache-livy-" + livyVersion + "-bin.zip");
} else {
      // e.g. apache-livy-0.8.0-incubating_2.12-bin.zip
      livyZip = new File(livyDownloadFolder,
          "apache-livy-" + livyVersion + "_" + scalaVersion + "-bin.zip");
}
try {
URL mirrorURL = new URL(MIRROR_URL + generateLivyDownloadUrl(livyVersion, scalaVersion));
URL archiveURL = new URL(ARCHIVE_URL + generateLivyDownloadUrl(livyVersion, scalaVersion));
LOGGER.info("Download {}", livyLog);
download(new DownloadRequest(mirrorURL, archiveURL), livyZip);
LOGGER.info("Unzip {} to {}", livyLog, targetLivyHomeFolder);
ProgressBarBuilder pbb = new ProgressBarBuilder()
.setTaskName("Unarchiv Livy")
.setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
.setStyle(ProgressBarStyle.ASCII)
.setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
.setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
try (InputStream fis = Files.newInputStream(livyZip.toPath());
InputStream pbis = ProgressBar.wrap(fis, pbb);
InputStream bis = new BufferedInputStream(pbis);
ZipInputStream zis = new ZipInputStream(bis)) {
unzip(zis, targetLivyHomeFolder, 1);
}
LOGGER.info("Unzip {} done", livyLog);
// Create logs directory
File logs = new File(targetLivyHomeFolder, "logs");
logs.mkdir();
} catch (MalformedURLException e) {
LOGGER.error("invalid URL", e);
} catch (IOException e) {
throw new RuntimeException("Unable to download livy", e);
}
return targetLivyHomeFolder.getAbsolutePath();
}
/**
   * @param livyVersion Livy version, e.g. "0.7.1-incubating"
   * @return livyHome
*/
public static String downloadLivy(String livyVersion) {
File livyDownloadFolder = new File(downloadFolder, "livy");
File targetLivyHomeFolder = new File(livyDownloadFolder, "livy-" + livyVersion);
return downloadLivy(livyVersion, null, targetLivyHomeFolder);
}
public static String downloadLivy(String livyVersion, String scalaVersion) {
File livyDownloadFolder = new File(downloadFolder, "livy");
File targetLivyHomeFolder =
new File(livyDownloadFolder, "livy-" + livyVersion + "_" + scalaVersion);
return downloadLivy(livyVersion, scalaVersion, targetLivyHomeFolder);
}
private static File newFile(File destinationDir, ZipEntry zipEntry, int strip)
throws IOException {
String filename = zipEntry.getName();
for (int i = 0; i < strip; ++i) {
      // Zip entry names use '/' as separator regardless of platform
      if (filename.contains("/")) {
        filename = filename.substring(filename.indexOf('/') + 1);
}
}
File destFile = new File(destinationDir, filename);
String destDirPath = destinationDir.getCanonicalPath();
String destFilePath = destFile.getCanonicalPath();
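    // Guard against Zip Slip: reject entries whose resolved path escapes the destination dir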
if (!destFilePath.startsWith(destDirPath + File.separator)) {
throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
}
return destFile;
}
private static File newFile(File destDir, ArchiveEntry archiveEntry, int strip)
throws IOException {
String filename = archiveEntry.getName();
for (int i = 0; i < strip; ++i) {
      // Tar entry names use '/' as separator regardless of platform
      if (filename.contains("/")) {
        filename = filename.substring(filename.indexOf('/') + 1);
}
}
File destFile = new File(destDir, filename);
String destDirPath = destDir.getCanonicalPath();
String destFilePath = destFile.getCanonicalPath();
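    // Guard against Zip Slip: reject entries whose resolved path escapes the destination dir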
if (!destFilePath.startsWith(destDirPath + File.separator)) {
throw new IOException("Entry is outside of the target dir: " + archiveEntry.getName());
}
return destFile;
}
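  /**
   * Extracts a tar archive into {@code destDir}, stripping the first {@code strip} leading path
   * components of every entry name (e.g. with strip=1, "spark-x/bin/spark-submit" is written to
   * "bin/spark-submit").
   */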
private static void unarchive(ArchiveInputStream<? extends ArchiveEntry> ais, File destDir,
int strip) throws IOException {
byte[] buffer = new byte[1024];
ArchiveEntry archiveEntry = ais.getNextEntry();
while (archiveEntry != null) {
File newFile;
try {
newFile = newFile(destDir, archiveEntry, strip);
} catch (IOException e) {
LOGGER.info("Skip {}", archiveEntry.getName());
archiveEntry = ais.getNextEntry();
continue;
}
if (archiveEntry.isDirectory()) {
if (!newFile.isDirectory() && !newFile.mkdirs()) {
throw new IOException("Failed to create directory " + newFile);
}
} else {
File parent = newFile.getParentFile();
if (!parent.isDirectory() && !parent.mkdirs()) {
throw new IOException("Failed to create directory " + parent);
}
// write file content
try (FileOutputStream fos = new FileOutputStream(newFile)) {
int len;
while ((len = ais.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
}
// Change permissions and metadata
if (newFile.getParentFile().getName().contains("bin")
&& !newFile.setExecutable(true, false)) {
LOGGER.info("Setting file {} to executable failed", newFile);
}
if (!newFile.setLastModified(archiveEntry.getLastModifiedDate().getTime())) {
LOGGER.info("Setting last modified date to file {} failed", newFile);
}
}
archiveEntry = ais.getNextEntry();
}
}
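  /**
   * Extracts a zip archive into {@code destDir}, stripping the first {@code strip} leading path
   * components of every entry name.
   */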
private static void unzip(ZipInputStream zis, File destDir, int strip) throws IOException {
byte[] buffer = new byte[1024];
ZipEntry zipEntry = zis.getNextEntry();
while (zipEntry != null) {
File newFile;
try {
newFile = newFile(destDir, zipEntry, strip);
} catch (IOException e) {
LOGGER.info("Skip {}", zipEntry.getName());
zipEntry = zis.getNextEntry();
continue;
}
if (zipEntry.isDirectory()) {
if (!newFile.isDirectory() && !newFile.mkdirs()) {
throw new IOException("Failed to create directory " + newFile);
}
} else {
File parent = newFile.getParentFile();
if (!parent.isDirectory() && !parent.mkdirs()) {
throw new IOException("Failed to create directory " + parent);
}
// write file content
try (FileOutputStream fos = new FileOutputStream(newFile)) {
int len;
while ((len = zis.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
}
// Change permissions and metadata
if (newFile.getParentFile().getName().contains("bin")
&& !newFile.setExecutable(true, false)) {
LOGGER.info("Setting file {} to executable failed", newFile);
}
if (!newFile.setLastModified(zipEntry.getLastModifiedTime().toMillis())) {
LOGGER.info("Setting last modified date to file {} failed", newFile);
}
}
zipEntry = zis.getNextEntry();
}
zis.closeEntry();
}
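  /**
   * Downloads a Flink binary distribution plus the extra connector and Hadoop jars required by
   * the Flink integration tests (YARN and Hive support).
   *
   * @param flinkVersion Flink version, e.g. "1.17.1"
   * @param scalaVersion Scala version suffix, e.g. "2.12"
   * @return home of the Flink installation
   */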
public static String downloadFlink(String flinkVersion, String scalaVersion) {
File flinkDownloadFolder = new File(downloadFolder, "flink");
flinkDownloadFolder.mkdir();
File targetFlinkHomeFolder = new File(flinkDownloadFolder, "flink-" + flinkVersion);
if (targetFlinkHomeFolder.exists()) {
LOGGER.info("Skip to download Flink {}_{} as it is already downloaded.", flinkVersion,
scalaVersion);
return targetFlinkHomeFolder.getAbsolutePath();
}
File flinkTGZ = new File(flinkDownloadFolder,
"flink-" + flinkVersion + "-bin-scala_" + scalaVersion + ".tgz");
try {
URL mirrorURL = new URL(MIRROR_URL + generateDownloadURL(
"flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz", "flink"));
URL archiveURL = new URL(ARCHIVE_URL + generateDownloadURL(
"flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz", "flink"));
LOGGER.info("Download Flink {}_{}", flinkVersion, scalaVersion);
download(new DownloadRequest(mirrorURL, archiveURL), flinkTGZ);
ProgressBarBuilder pbb = new ProgressBarBuilder()
.setTaskName("Unarchiv Flink")
.setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
.setStyle(ProgressBarStyle.ASCII)
          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
.setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
try (
InputStream fis = Files.newInputStream(flinkTGZ.toPath());
InputStream pbis = ProgressBar.wrap(fis, pbb);
InputStream bis = new BufferedInputStream(pbis);
InputStream gzis = new GzipCompressorInputStream(bis);
ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
LOGGER.info("Unarchive Flink {}_{} to {}", flinkVersion, scalaVersion,
targetFlinkHomeFolder);
unarchive(o, targetFlinkHomeFolder, 1);
LOGGER.info("Unarchive Flink done");
}
} catch (IOException e) {
throw new RuntimeException("Unable to download flink", e);
}
    // Download the extra jars needed to run Flink with YARN and Hive
try {
download("https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_"
+ scalaVersion + "/"
+ flinkVersion + "/flink-connector-hive_" + scalaVersion + "-" + flinkVersion + ".jar",
3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-connector-hive_"
+ scalaVersion + "-" + flinkVersion + ".jar"));
download("https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_" + scalaVersion + "/"
+ flinkVersion + "/flink-hadoop-compatibility_" + scalaVersion + "-" + flinkVersion + ".jar",
3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-hadoop-compatibility_"
+ scalaVersion + "-" + flinkVersion + ".jar"));
download("https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.7/hive-exec-2.3.7.jar",
3, new File(targetFlinkHomeFolder, "lib" + File.separator + "hive-exec-2.3.4.jar"));
download(
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.6/hadoop-client-api-3.3.6.jar",
3, new File(targetFlinkHomeFolder,
"lib" + File.separator + "hadoop-client-api-3.3.6.jar"));
download(
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.6/hadoop-client-runtime-3.3.6.jar",
3, new File(targetFlinkHomeFolder,
"lib" + File.separator + "hadoop-client-runtime-3.3.6.jar"));
download("https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_"
+ scalaVersion + "/"
+ flinkVersion + "/flink-table-api-scala_" + scalaVersion + "-" + flinkVersion + ".jar",
3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-table-api-scala_"
+ scalaVersion + "-" + flinkVersion + ".jar"));
download("https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_"
+ scalaVersion + "/"
+ flinkVersion + "/flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar",
3, new File(targetFlinkHomeFolder, "lib" + File.separator
+ "flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar"));
String jarName = "flink-table-planner_" + scalaVersion + "-" + flinkVersion + ".jar";
mvFile(
targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName,
targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName);
jarName = "flink-table-planner-loader-" + flinkVersion + ".jar";
mvFile(
targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName,
targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName);
if (SemanticVersion.of(flinkVersion).equalsOrNewerThan(SemanticVersion.of("1.16.0"))) {
jarName = "flink-sql-client-" + flinkVersion + ".jar";
mvFile(targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName,
targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName);
}
} catch (Exception e) {
throw new RuntimeException("Fail to download jar", e);
}
return targetFlinkHomeFolder.getAbsolutePath();
}
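  /** Moves {@code srcPath} to {@code dstPath}, replacing the destination if it already exists. */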
private static void mvFile(String srcPath, String dstPath) throws IOException {
Path src = Paths.get(srcPath);
Path dst = Paths.get(dstPath);
if (src.toFile().exists()) {
if (dst.toFile().exists()) {
LOGGER.warn("{} does exits - replacing", dstPath);
FileUtils.deleteQuietly(dst.toFile());
}
LOGGER.info("Copy file {} to {}", src, dst);
Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
} else {
LOGGER.warn("{} does not exits - skipping", srcPath);
}
}
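  /**
   * Downloads a Hadoop binary distribution.
   *
   * @param version Hadoop version, e.g. "3.3.6"
   * @return home of the Hadoop installation
   */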
public static String downloadHadoop(String version) {
File hadoopDownloadFolder = new File(downloadFolder, "hadoop");
hadoopDownloadFolder.mkdir();
File targetHadoopHomeFolder = new File(hadoopDownloadFolder, "hadoop-" + version);
if (targetHadoopHomeFolder.exists()) {
LOGGER.info("Skip to download Hadoop {} as it is already downloaded.", version);
return targetHadoopHomeFolder.getAbsolutePath();
}
File hadoopTGZ = new File(hadoopDownloadFolder, "hadoop-" + version + ".tar.gz");
try {
URL mirrorURL = new URL(MIRROR_URL + generateDownloadURL(
"hadoop", version, ".tar.gz", "hadoop/core"));
URL archiveURL = new URL(ARCHIVE_URL + generateDownloadURL(
"hadoop", version, ".tar.gz", "hadoop/core"));
LOGGER.info("Download Hadoop {}", version);
download(new DownloadRequest(mirrorURL, archiveURL), hadoopTGZ);
ProgressBarBuilder pbb = new ProgressBarBuilder()
.setTaskName("Unarchiv")
.setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
.setStyle(ProgressBarStyle.ASCII)
          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
.setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
try (
InputStream fis = Files.newInputStream(hadoopTGZ.toPath());
InputStream pbis = ProgressBar.wrap(fis, pbb);
InputStream bis = new BufferedInputStream(pbis);
InputStream gzis = new GzipCompressorInputStream(bis);
ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
LOGGER.info("Unarchive Hadoop {} to {}", version, targetHadoopHomeFolder);
unarchive(o, targetHadoopHomeFolder, 1);
LOGGER.info("Unarchive Hadoop {} done", version);
}
} catch (IOException e) {
throw new RuntimeException("Unable to download hadoop");
}
return targetHadoopHomeFolder.getAbsolutePath();
}
private static String generateDownloadURL(String project, String version, String postFix,
String projectPath) {
return projectPath + "/" + project + "-" + version + "/" + project + "-" + version
+ postFix;
}
private static String generateSparkDownloadURL(String sparkVersion, String hadoopVersion,
String scalaVersion) {
final String url;
String sparkVersionFolder = "spark/spark-" + sparkVersion;
if (StringUtils.isNotBlank(hadoopVersion)) {
if (StringUtils.isNotBlank(scalaVersion)) {
// spark-3.4.0-bin-hadoop3-scala2.13.tgz
url = sparkVersionFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion
+ "-scala" + scalaVersion + ".tgz";
} else {
url =
sparkVersionFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz";
}
} else {
url = sparkVersionFolder + "/spark-" + sparkVersion + "-bin-without-hadoop.tgz";
}
return url;
}
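  // Livy 0.8.0-incubating and later publish Scala-suffixed artifacts
  // (apache-livy-<version>_<scala>-bin.zip); earlier releases ship a single artifact.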
private static String generateLivyDownloadUrl(String livyVersion, String scalaVersion) {
SemanticVersion livy = SemanticVersion.of(livyVersion.replace("incubating", ""));
if (livy.equalsOrNewerThan(SemanticVersion.of("0.8.0"))) {
return "incubator/livy/" + livyVersion + "/apache-livy-" + livyVersion + "_" + scalaVersion
+ "-bin.zip";
}
return "incubator/livy/" + livyVersion + "/apache-livy-" + livyVersion + "-bin.zip";
}
}