blob: f8e7d8c59d075d87dd4cacd0d82d0a00a7b7b416 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Properties;
/**
* Abstract base class for everything that is API-incompatible between Spark 1 and Spark 2. The
* matching SparkShims implementation is loaded based on the Spark version in use.
*/
public abstract class SparkShims {

  // Hadoop version boundaries used by supportYarn6615(String).
  private static final String HADOOP_VERSION_2_6_6 = "2.6.6";
  private static final String HADOOP_VERSION_2_7_0 = "2.7.0";
  private static final String HADOOP_VERSION_2_7_4 = "2.7.4";
  private static final String HADOOP_VERSION_2_8_0 = "2.8.0";
  private static final String HADOOP_VERSION_2_8_2 = "2.8.2";
  private static final String HADOOP_VERSION_2_9_0 = "2.9.0";
  private static final String HADOOP_VERSION_3_0_0 = "3.0.0";
  private static final String HADOOP_VERSION_3_0_0_ALPHA4 = "3.0.0-alpha4";

  /** Name of the job property the note id and paragraph id are parsed from. */
  private static final String JOB_GROUP_ID_PROPERTY = "spark.jobGroup.id";

  /** Number of "-" separated tokens a job group id needs so that tokens[2] and tokens[3] exist. */
  private static final int MIN_JOB_GROUP_TOKENS = 4;

  private static final Logger LOGGER = LoggerFactory.getLogger(SparkShims.class);

  /** Lazily created shared instance; cleared again by {@link #reset()}. */
  private static SparkShims sparkShims;

  protected Properties properties;

  /**
   * Creates the shims with the given properties.
   *
   * @param properties properties kept in {@link #properties} for use by subclasses
   */
  public SparkShims(Properties properties) {
    this.properties = properties;
  }

  /**
   * Reflectively instantiates the shims implementation for the given Spark major version.
   *
   * @param sparkVersion Spark major version; "2" selects Spark2Shims, anything else Spark1Shims
   * @param properties properties passed to the implementation's constructor
   * @return the newly created shims instance
   * @throws ReflectiveOperationException if the class or its {@code (Properties)} constructor
   *     cannot be found or the instance cannot be created
   */
  private static SparkShims loadShims(String sparkVersion, Properties properties)
      throws ReflectiveOperationException {
    Class<?> sparkShimsClass;
    if ("2".equals(sparkVersion)) {
      LOGGER.info("Initializing shims for Spark 2.x");
      sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims");
    } else {
      LOGGER.info("Initializing shims for Spark 1.x");
      sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims");
    }
    // Wildcard instead of the raw type avoids an unchecked/rawtypes warning.
    Constructor<?> c = sparkShimsClass.getConstructor(Properties.class);
    return (SparkShims) c.newInstance(properties);
  }

  /**
   * Returns the shared shims instance, creating it on the first call.
   *
   * <p>Once an instance exists, {@code sparkVersion} and {@code properties} are ignored until
   * {@link #reset()} is called. This method is not synchronized.
   *
   * @param sparkVersion full Spark version string, e.g. "2.3.0"
   * @param properties properties handed to a newly created instance
   * @return the shared shims instance
   * @throws RuntimeException wrapping the {@link ReflectiveOperationException} if the
   *     implementation cannot be loaded
   */
  public static SparkShims getInstance(String sparkVersion, Properties properties) {
    if (sparkShims == null) {
      String sparkMajorVersion = getSparkMajorVersion(sparkVersion);
      try {
        sparkShims = loadShims(sparkMajorVersion, properties);
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException(e);
      }
    }
    return sparkShims;
  }

  /**
   * Maps a full Spark version string to its major version.
   *
   * @param sparkVersion full Spark version string
   * @return "2" if the version starts with "2", otherwise "1"
   */
  private static String getSparkMajorVersion(String sparkVersion) {
    return sparkVersion.startsWith("2") ? "2" : "1";
  }

  /**
   * This is due to SparkListener api change between spark1 and spark2. SparkListener is trait in
   * spark1 while it is abstract class in spark2.
   */
  public abstract void setupSparkListener(String master, String sparkWebUrl);

  /**
   * Extracts the note id from a job group id.
   *
   * @param jobgroupId job group id made of "-" separated tokens
   * @return the third token of {@code jobgroupId}
   * @throws ArrayIndexOutOfBoundsException if the id has fewer than three tokens
   */
  protected String getNoteId(String jobgroupId) {
    String[] tokens = jobgroupId.split("-");
    return tokens[2];
  }

  /**
   * Extracts the paragraph id from a job group id.
   *
   * @param jobgroupId job group id made of "-" separated tokens
   * @return the fourth token of {@code jobgroupId}
   * @throws ArrayIndexOutOfBoundsException if the id has fewer than four tokens
   */
  protected String getParagraphId(String jobgroupId) {
    String[] tokens = jobgroupId.split("-");
    return tokens[3];
  }

  /**
   * Builds the Spark web UI url of a job and reports it to the event client of the paragraph
   * that is identified by the job's {@code spark.jobGroup.id} property.
   *
   * <p>Jobs without that property, or whose job group id does not contain both a note id and a
   * paragraph id, are skipped instead of failing the caller.
   *
   * @param master Spark master; when it contains "yarn" and the Hadoop version does not pass
   *     {@link #supportYarn6615(String)}, the url points to the job list instead of the job
   * @param sparkWebUrl base url of the Spark web UI
   * @param jobId id of the Spark job
   * @param jobProperties properties of the Spark job; may be {@code null}
   */
  protected void buildSparkJobUrl(
      String master, String sparkWebUrl, int jobId, Properties jobProperties) {
    String jobGroupId =
        jobProperties == null ? null : jobProperties.getProperty(JOB_GROUP_ID_PROPERTY);
    if (jobGroupId == null) {
      LOGGER.debug("Spark job {} has no {} property, skip reporting its url",
          jobId, JOB_GROUP_ID_PROPERTY);
      return;
    }
    if (jobGroupId.split("-").length < MIN_JOB_GROUP_TOKENS) {
      LOGGER.debug("Job group id '{}' of Spark job {} has no note/paragraph id, skip reporting"
          + " its url", jobGroupId, jobId);
      return;
    }

    String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
    String version = VersionInfo.getVersion();
    // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
    if (master.toLowerCase(java.util.Locale.ROOT).contains("yarn") && !supportYarn6615(version)) {
      jobUrl = sparkWebUrl + "/jobs";
    }

    String noteId = getNoteId(jobGroupId);
    String paragraphId = getParagraphId(jobGroupId);
    RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
    if (eventClient == null) {
      // Nobody to notify, so there is no need to build the info map.
      return;
    }
    Map<String, String> infos = new java.util.HashMap<>();
    infos.put("jobUrl", jobUrl);
    infos.put("label", "SPARK JOB");
    infos.put("tooltip", "View in Spark web UI");
    eventClient.onParaInfosReceived(noteId, paragraphId, infos);
  }

  /**
   * Temporary patch to support old versions of YARN that do not include YARN-6615.
   *
   * <p>Accepted versions are [2.6.6, 2.7.0), [2.7.4, 2.8.0), [2.8.2, 2.9.0), [2.9.0, 3.0.0),
   * anything from 3.0.0-alpha4 on and anything from 3.0.0 on, as ordered by
   * {@link VersionUtil#compareVersions(String, String)}.
   *
   * @param version Hadoop version to check
   * @return true if YARN-6615 is patched, false otherwise
   */
  protected boolean supportYarn6615(String version) {
    return isWithin(HADOOP_VERSION_2_6_6, HADOOP_VERSION_2_7_0, version)
        || isWithin(HADOOP_VERSION_2_7_4, HADOOP_VERSION_2_8_0, version)
        || isWithin(HADOOP_VERSION_2_8_2, HADOOP_VERSION_2_9_0, version)
        || isWithin(HADOOP_VERSION_2_9_0, HADOOP_VERSION_3_0_0, version)
        || isAtLeast(HADOOP_VERSION_3_0_0_ALPHA4, version)
        || isAtLeast(HADOOP_VERSION_3_0_0, version);
  }

  /** Returns true if {@code version} is not lower than {@code minInclusive}. */
  private static boolean isAtLeast(String minInclusive, String version) {
    return VersionUtil.compareVersions(minInclusive, version) <= 0;
  }

  /** Returns true if {@code minInclusive <= version < maxExclusive}. */
  private static boolean isWithin(String minInclusive, String maxExclusive, String version) {
    return isAtLeast(minInclusive, version)
        && VersionUtil.compareVersions(maxExclusive, version) > 0;
  }

  /** Drops the shared instance so that the next {@code getInstance} call creates a new one. */
  @VisibleForTesting
  public static void reset() {
    sparkShims = null;
  }
}