blob: 53955dbd317486ddd939259e6cfe2717380033d6 [file] [log] [blame]
package org.apache.zeppelin.interpreter;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Utility class for downloading Spark and Flink distributions. This is used for integration
 * tests.
 */
public class SparkDownloadUtils {
  private static final Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class);

  /** Local cache directory that archives are downloaded into and unpacked under. */
  private static final String DOWNLOAD_FOLDER = System.getProperty("user.home") + "/.cache/spark";

  /** Minimum interval, in milliseconds, between two logged lines of child-process output. */
  private static final long LOG_INTERVAL_MS = 5000;

  static {
    try {
      FileUtils.forceMkdir(new File(DOWNLOAD_FOLDER));
    } catch (IOException e) {
      throw new RuntimeException("Fail to create downloadFolder: " + DOWNLOAD_FOLDER, e);
    }
  }

  /**
   * Downloads and unpacks the {@code spark-<version>-bin-hadoop2.6} distribution from
   * archive.apache.org, unless its folder already exists in the cache directory.
   *
   * @param version Spark version, used verbatim in the download URL and folder name
   * @return absolute path of the unpacked Spark folder
   * @throws RuntimeException if downloading or unpacking fails
   */
  public static String downloadSpark(String version) {
    String distribution = "spark-" + version + "-bin-hadoop2.6";
    String downloadURL =
        "https://archive.apache.org/dist/spark/spark-" + version + "/" + distribution + ".tgz";
    return downloadAndExtract(
        "spark", version, new File(DOWNLOAD_FOLDER, distribution), distribution + ".tgz",
        downloadURL);
  }

  /**
   * Downloads and unpacks the {@code flink-<version>-bin-hadoop27-scala_2.11} archive from
   * archive.apache.org, unless the {@code flink-<version>} folder already exists in the cache
   * directory.
   *
   * @param version Flink version, used verbatim in the download URL and folder name
   * @return absolute path of the unpacked Flink folder
   * @throws RuntimeException if downloading or unpacking fails
   */
  public static String downloadFlink(String version) {
    String archiveName = "flink-" + version + "-bin-hadoop27-scala_2.11.tgz";
    String downloadURL =
        "https://archive.apache.org/dist/flink/flink-" + version + "/" + archiveName;
    return downloadAndExtract(
        "flink", version, new File(DOWNLOAD_FOLDER, "flink-" + version), archiveName, downloadURL);
  }

  /**
   * Shared download-then-unpack flow behind {@link #downloadSpark} and {@link #downloadFlink}.
   *
   * @param product short product name used in log and error messages
   * @param version product version used in error messages
   * @param homeFolder folder the archive unpacks to; if it exists the download is skipped
   * @param archiveName file name of the archive inside the cache directory
   * @param downloadURL URL the archive is fetched from
   * @return absolute path of {@code homeFolder}
   */
  private static String downloadAndExtract(
      String product, String version, File homeFolder, String archiveName, String downloadURL) {
    if (homeFolder.exists()) {
      LOGGER.info("Skip to download {} as it is already downloaded.", product);
      return homeFolder.getAbsolutePath();
    }

    File archive = new File(DOWNLOAD_FOLDER, archiveName);
    try {
      // wget never overwrites an existing file: it would store the new copy as "<name>.1" and
      // tar would then unpack the stale (possibly truncated) archive left by an earlier run.
      FileUtils.deleteQuietly(archive);
      runShellCommand(new String[] {"wget", downloadURL, "-P", DOWNLOAD_FOLDER});
      runShellCommand(
          new String[] {"tar", "-xvf", archive.getAbsolutePath(), "-C", DOWNLOAD_FOLDER});
    } catch (InterruptedException e) {
      // Restore the flag so callers further up can still observe the interruption.
      Thread.currentThread().interrupt();
      FileUtils.deleteQuietly(homeFolder);
      throw new RuntimeException("Fail to download " + product + " " + version, e);
    } catch (Exception e) {
      // The folder did not exist on entry, so whatever is there now is a partial extraction
      // that a later call would otherwise mistake for a complete installation.
      FileUtils.deleteQuietly(homeFolder);
      throw new RuntimeException("Fail to download " + product + " " + version, e);
    }
    return homeFolder.getAbsolutePath();
  }

  /**
   * Runs an external command, draining its stdout and stderr on background threads, and waits
   * for it to finish.
   *
   * @param commands program name followed by its arguments
   * @throws IOException if the process cannot be started or exits with a non-zero code
   * @throws InterruptedException if the calling thread is interrupted while waiting; the child
   *     process is destroyed before the exception propagates
   */
  private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
    String commandLine = StringUtils.join(commands, " ");
    LOGGER.info("Starting shell commands: {}", commandLine);

    Process process = Runtime.getRuntime().exec(commands);
    StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
    StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
    errorGobbler.start();
    outputGobbler.start();

    int exitCode;
    try {
      exitCode = process.waitFor();
    } catch (InterruptedException e) {
      // Do not leave an orphaned wget/tar running once the caller has given up.
      process.destroy();
      throw e;
    }
    // The process has exited, so both streams reach EOF; wait for the readers to finish.
    errorGobbler.join();
    outputGobbler.join();

    if (exitCode != 0) {
      throw new IOException(
          "Fail to run shell commands: " + commandLine + " (exit code " + exitCode + ")");
    }
    LOGGER.info("Complete shell commands: {}", commandLine);
  }

  /**
   * Thread that reads a stream line by line until end of stream, logging at most one line per
   * {@link #LOG_INTERVAL_MS} so verbose tools do not flood the log.
   */
  private static class StreamGobbler extends Thread {
    private final InputStream is;

    StreamGobbler(InputStream is) {
      this.is = is;
    }

    @Override
    public void run() {
      try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
        long lastLogTime = System.currentTimeMillis();
        String line;
        while ((line = br.readLine()) != null) {
          // Every line is consumed; only a sample is logged.
          if ((System.currentTimeMillis() - lastLogTime) > LOG_INTERVAL_MS) {
            LOGGER.info(line);
            lastLogTime = System.currentTimeMillis();
          }
        }
      } catch (IOException ioe) {
        LOGGER.warn("Fail to read output of shell command", ioe);
      }
    }
  }
}