| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.livy.utils |
| |
| import java.io.{File, IOException} |
| |
| import scala.collection.SortedMap |
| import scala.math.Ordering.Implicits._ |
| |
| import org.apache.livy.{LivyConf, Logging} |
| import org.apache.livy.LivyConf.LIVY_SPARK_SCALA_VERSION |
| |
| object LivySparkUtils extends Logging { |
| |
| // For each Spark version we supported, we need to add this mapping relation in case Scala |
| // version cannot be detected from "spark-submit --version". |
| private val _defaultSparkScalaVersion = SortedMap( |
| // Spark 2.4 + Scala 2.11 |
| (2, 4) -> "2.11", |
| // Spark 2.3 + Scala 2.11 |
| (2, 3) -> "2.11", |
| // Spark 2.2 + Scala 2.11 |
| (2, 2) -> "2.11" |
| ) |
| |
| // Supported Spark version |
| private val MIN_VERSION = (2, 2) |
| private val MAX_VERSION = (3, 0) |
| |
| private val sparkVersionRegex = """version (.*)""".r.unanchored |
| private val scalaVersionRegex = """Scala version (.*), Java""".r.unanchored |
| |
| /** |
| * Test that Spark home is configured and configured Spark home is a directory. |
| */ |
| def testSparkHome(livyConf: LivyConf): Unit = { |
| val sparkHome = livyConf.sparkHome().getOrElse { |
| throw new IllegalArgumentException("Livy requires the SPARK_HOME environment variable") |
| } |
| |
| require(new File(sparkHome).isDirectory(), "SPARK_HOME path does not exist") |
| } |
| |
| /** |
| * Test that the configured `spark-submit` executable exists. |
| * |
| * @param livyConf |
| */ |
| def testSparkSubmit(livyConf: LivyConf): Unit = { |
| try { |
| testSparkVersion(sparkSubmitVersion(livyConf)._1) |
| } catch { |
| case e: IOException => |
| throw new IOException("Failed to run spark-submit executable", e) |
| } |
| } |
| |
| /** |
| * Throw an exception if Spark version is not supported. |
| * @param version Spark version |
| */ |
| def testSparkVersion(version: String): Unit = { |
| val v = formatSparkVersion(version) |
| require(v >= MIN_VERSION, s"Unsupported Spark version $v") |
| if (v >= MAX_VERSION) { |
| warn(s"Current Spark $v is not verified in Livy, please use it carefully") |
| } |
| } |
| |
| /** |
| * Call `spark-submit --version` and parse its output for Spark and Scala version. |
| * |
| * @param livyConf |
| * @return Tuple with Spark and Scala version |
| */ |
| def sparkSubmitVersion(livyConf: LivyConf): (String, Option[String]) = { |
| val sparkSubmit = livyConf.sparkSubmit() |
| val pb = new ProcessBuilder(sparkSubmit, "--version") |
| pb.redirectErrorStream(true) |
| pb.redirectInput(ProcessBuilder.Redirect.PIPE) |
| |
| if (LivyConf.TEST_MODE) { |
| pb.environment().put("LIVY_TEST_CLASSPATH", sys.props("java.class.path")) |
| } |
| |
| val process = new LineBufferedProcess(pb.start(), 200) |
| val exitCode = process.waitFor() |
| val output = process.inputIterator.mkString("\n") |
| |
| var sparkVersion = "" |
| output match { |
| case sparkVersionRegex(version) => sparkVersion = version |
| case _ => |
| throw new IOException(f"Unable to determine spark-submit version [$exitCode]:\n$output") |
| } |
| |
| val scalaVersion = output match { |
| case scalaVersionRegex(version) if version.nonEmpty => Some(formatScalaVersion(version)) |
| case _ => None |
| } |
| |
| (sparkVersion, scalaVersion) |
| } |
| |
| def sparkScalaVersion( |
| formattedSparkVersion: (Int, Int), |
| scalaVersionFromSparkSubmit: Option[String], |
| livyConf: LivyConf): String = { |
| val scalaVersionInLivyConf = Option(livyConf.get(LIVY_SPARK_SCALA_VERSION)) |
| .filter(_.nonEmpty) |
| .map(formatScalaVersion) |
| |
| for (vSparkSubmit <- scalaVersionFromSparkSubmit; vLivyConf <- scalaVersionInLivyConf) { |
| require(vSparkSubmit == vLivyConf, |
| s"Scala version detected from spark-submit ($vSparkSubmit) does not match " + |
| s"Scala version configured in livy.conf ($vLivyConf)") |
| } |
| |
| scalaVersionInLivyConf |
| .orElse(scalaVersionFromSparkSubmit) |
| .getOrElse(defaultSparkScalaVersion(formattedSparkVersion)) |
| } |
| |
| /** |
| * Return formatted Spark version. |
| * |
| * @param version Spark version |
| * @return Two element tuple, one is major version and the other is minor version |
| */ |
| def formatSparkVersion(version: String): (Int, Int) = { |
| val versionPattern = """^(\d+)\.(\d+)(\..*)?$""".r |
| versionPattern.findFirstMatchIn(version) match { |
| case Some(m) => |
| (m.group(1).toInt, m.group(2).toInt) |
| case None => |
| throw new IllegalArgumentException(s"Fail to parse Spark version from $version") |
| } |
| } |
| |
| /** |
| * Return Scala binary version. |
| * It strips the patch version if specified. |
| * Throws if it cannot parse the version. |
| * |
| * @param scalaVersion Scala binary version String |
| * @return Scala binary version |
| */ |
| def formatScalaVersion(scalaVersion: String): String = { |
| val versionPattern = """(\d)+\.(\d+)+.*""".r |
| scalaVersion match { |
| case versionPattern(major, minor) => s"$major.$minor" |
| case _ => throw new IllegalArgumentException(s"Unrecognized Scala version: $scalaVersion") |
| } |
| } |
| |
| /** |
| * Return the default Scala version of a Spark version. |
| * |
| * @param sparkVersion formatted Spark version. |
| * @return Scala binary version |
| */ |
| private[utils] def defaultSparkScalaVersion(sparkVersion: (Int, Int)): String = { |
| _defaultSparkScalaVersion.get(sparkVersion) |
| .orElse { |
| if (sparkVersion < _defaultSparkScalaVersion.head._1) { |
| throw new IllegalArgumentException(s"Spark version $sparkVersion is less than the " + |
| s"minimum version ${_defaultSparkScalaVersion.head._1} supported by Livy") |
| } else if (sparkVersion > _defaultSparkScalaVersion.last._1) { |
| val (spark, scala) = _defaultSparkScalaVersion.last |
| warn(s"Spark version $sparkVersion is greater then the maximum version " + |
| s"$spark supported by Livy, will choose Scala version $scala instead, " + |
| s"please specify manually if it is the expected Scala version you want") |
| Some(scala) |
| } else { |
| None |
| } |
| } |
| .getOrElse( |
| throw new IllegalArgumentException(s"Fail to get Scala version from Spark $sparkVersion")) |
| } |
| } |