This article mainly introduces the installation, use and configuration of the Spark engine plugin in Linkis.

If you wish to use the Spark engine on your server, you need to ensure that the following environment variables are set correctly and are visible to the engine's starting user. It is strongly recommended that you check these environment variables for the executing user before running a Spark job.
| Environment variable name | Environment variable content | Remarks |
|---------------------------|-------------------------------|---------|
| JAVA_HOME | JDK installation path | Required |
| HADOOP_HOME | Hadoop installation path | Required |
| HADOOP_CONF_DIR | Hadoop configuration path | Required |
| HIVE_CONF_DIR | Hive configuration path | Required |
| SPARK_HOME | Spark installation path | Required |
| SPARK_CONF_DIR | Spark configuration path | Required |
| python | python | It is recommended to use anaconda's python as the default python |
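A quick way to confirm that these variables are visible to the engine's starting user is to switch to that user and print them. This is a minimal sketch only; `hadoop` is just an example user.

```bash
# Switch to the engine's starting user (hadoop is only an example) and check the variables from the table above
sudo su - hadoop
echo "JAVA_HOME=$JAVA_HOME"
echo "HADOOP_HOME=$HADOOP_HOME  HADOOP_CONF_DIR=$HADOOP_CONF_DIR"
echo "HIVE_CONF_DIR=$HIVE_CONF_DIR"
echo "SPARK_HOME=$SPARK_HOME  SPARK_CONF_DIR=$SPARK_CONF_DIR"
which python && python --version
```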
Verify that Spark is successfully installed by running pyspark:

```bash
pyspark

# After entering the pyspark virtual environment, the Spark logo appears, indicating that the environment is successfully installed
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Python version 2.7.13 (default, Sep 30 2017 18:12:43)
SparkSession available as 'spark'.
```
The Spark engine plugin is included in the binary installation package released by Linkis by default, and users do not need to install it additionally.
In theory, Linkis supports all versions of Spark 2.x and above. The default supported version is Spark 3.2.1. If you want to use another version of Spark, such as Spark 2.1.0, you just need to modify the Spark version of the plugin and recompile it. Specifically, find the linkis-engineplugin-spark module, change the value of the `<spark.version>` tag in the maven dependency to 2.1.0, and then compile this module separately.
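As a rough sketch of that workflow (the module path and build options below are illustrative and may differ between Linkis versions):

```bash
# Illustrative only: adjust the path to where linkis-engineplugin-spark lives in your Linkis source tree
cd linkis-engineconn-plugins/spark
# Edit the module's pom.xml and change <spark.version>3.2.1</spark.version> to <spark.version>2.1.0</spark.version>,
# then recompile just this engine plugin module
mvn clean install -DskipTests
```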
EngineConnPlugin engine plugin installation
Use of the spark engine

Linkis-cli
```bash
# codeType correspondence: py --> pyspark, sql --> sparkSQL, scala --> Spark scala
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -code "show databases" -submitUser hadoop -proxyUser hadoop

# You can specify the yarn queue in the submission parameter by -confMap wds.linkis.yarnqueue=dws
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -confMap wds.linkis.yarnqueue=dws -code "show databases" -submitUser hadoop -proxyUser hadoop
```
More Linkis-Cli command parameter reference: Linkis-Cli usage
Linkis SDK
Linkis provides SDKs for Java and Scala to submit tasks to the Linkis server. For details, please refer to the JAVA SDK Manual. For Spark tasks, you only need to modify the EngineConnType and CodeType parameters in the Demo:
```java
Map<String, Object> labels = new HashMap<String, Object>();
labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-3.2.1");       // required engineType Label
labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE");  // required execute user and creator
labels.put(LabelKeyConstant.CODE_TYPE_KEY, "sql");                 // required codeType: py, sql, scala
```
You can also submit scala and python code:
```java
// scala
labels.put(LabelKeyConstant.CODE_TYPE_KEY, "scala");
// code:
//   val df = spark.sql("show tables")
//   show(df)

// pyspark
labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py");
// code:
//   df = spark.sql("show tables")
//   show(df)
```
Submit tasks through OnceEngineConn (submitting a jar package via spark-submit). For reference, see org.apache.linkis.computation.client.SparkOnceJobTest.
```java
public class SparkOnceJobTest {

    public static void main(String[] args) {
        LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9001");

        String submitUser = "linkis";
        String engineType = "spark";

        SubmittableSimpleOnceJob onceJob =
                // region
                LinkisJobClient.once().simple().builder()
                        .setCreateService("Spark-Test")
                        .setMaxSubmitTime(300000)
                        .setDescription("SparkTestDescription")
                        .addExecuteUser(submitUser)
                        .addJobContent("runType", "jar")
                        .addJobContent("spark.app.main.class", "org.apache.spark.examples.JavaWordCount")
                        // Parameters obtained from the submitted jar package
                        .addJobContent("spark.app.args", "hdfs:///tmp/test_word_count.txt") // WordCount test file
                        .addLabel("engineType", engineType + "-2.4.7")
                        .addLabel("userCreator", submitUser + "-IDE")
                        .addLabel("engineConnMode", "once")
                        .addStartupParam("spark.app.name", "spark-submit-jar-test-linkis") // Application Name on yarn
                        .addStartupParam("spark.executor.memory", "1g")
                        .addStartupParam("spark.driver.memory", "1g")
                        .addStartupParam("spark.executor.cores", "1")
                        .addStartupParam("spark.executor.instance", "1")
                        .addStartupParam("spark.app.resource", "hdfs:///tmp/spark/spark-examples_2.11-2.3.0.2.6.5.0-292.jar")
                        .addSource("jobName", "OnceJobTest")
                        .build();
        // endregion

        onceJob.submit();
        onceJob.waitForCompleted();
        // A temporary network interruption may cause an exception. It is recommended to improve the SDK in a later version; when using it at this stage, exception handling is required
        onceJob.waitForCompleted();
    }
}
```
Restful API
Script types include sql, scala, python, and data_calc (content type is json).
```
POST /api/rest_j/v1/entrance/submit
Content-Type: application/json
Token-Code: dss-AUTH
Token-User: linkis

{
    "executionContent": {
        // script content, type: sql, python, scala, json
        "code": "show databases",
        // script type: sql, py(pyspark), scala, data_calc(json)
        "runType": "sql"
    },
    "params": {
        "variable": {},
        "configuration": {
            // spark startup parameters, not required
            "startup": {
                "spark.executor.memory": "1g",
                "spark.driver.memory": "1g",
                "spark.executor.cores": "1",
                "spark.executor.instances": 1
            }
        }
    },
    "source": {
        // not required, file:/// or hdfs:///
        "scriptPath": "file:///tmp/hadoop/test.sql"
    },
    "labels": {
        // pattern: engineType-version
        "engineType": "spark-3.2.1",
        // userCreator: linkis is the username; IDE is the system configured in Linkis
        "userCreator": "linkis-IDE"
    }
}
```
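For example, the request above can be sent with curl as follows. This is a minimal sketch; the gateway address and token values are placeholders that must match your own deployment.

```bash
# Placeholders: replace the gateway address and the Token-Code/Token-User values with your own
curl -X POST "http://127.0.0.1:9001/api/rest_j/v1/entrance/submit" \
    -H "Content-Type: application/json" \
    -H "Token-Code: dss-AUTH" \
    -H "Token-User: linkis" \
    -d '{
          "executionContent": {"code": "show databases", "runType": "sql"},
          "labels": {"engineType": "spark-3.2.1", "userCreator": "linkis-IDE"}
        }'
```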
Linkis-cli
Upload the jar package and configuration
```bash
# Upload the jar packages under the lib directory of the linkis spark engine (modify the following paths according to your actual installation directory)
cd /appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib
hdfs dfs -put *.jar hdfs:///spark/cluster

# Upload the linkis configuration files (modify the following paths according to your actual installation directory)
cd /appcom/Install/linkis/conf
hdfs dfs -put * hdfs:///spark/cluster

# Upload hive-site.xml (modify the following paths according to your actual installation directory)
cd $HIVE_CONF_DIR
hdfs dfs -put hive-site.xml hdfs:///spark/cluster
```
The default path `hdfs:///spark/cluster` can be modified via the `linkis.spark.yarn.cluster.jars` parameter.
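As a hedged sketch, the parameter could be overridden at submission time via `-confMap` (the target HDFS path below is only an example; the parameter can also be set in the Linkis configuration):

```bash
# Example only: point the engine to a different HDFS directory containing the uploaded jars and configuration
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql \
    -labelMap engingeConnRuntimeMode=yarnCluster \
    -confMap linkis.spark.yarn.cluster.jars=hdfs:///spark/cluster-custom \
    -submitUser hadoop -proxyUser hadoop -code "select 123"
```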
Execute the test case
```bash
# Use engingeConnRuntimeMode=yarnCluster to specify the yarn cluster mode
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -labelMap engingeConnRuntimeMode=yarnCluster -submitUser hadoop -proxyUser hadoop -code "select 123"
```
Linkis-cli
Before submitting a task, please install the metrics server on Kubernetes, as its APIs will be invoked during the resource validation process.
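If the metrics server is not yet installed, it is commonly deployed with the upstream release manifest, for example as shown below. This is the standard metrics-server install command rather than anything Linkis-specific; verify that it is appropriate for your cluster version.

```bash
# Install metrics-server from the upstream release manifest, then confirm the resource metrics API responds
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml
kubectl top nodes
```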
To submit tasks to a Kubernetes cluster, you need to add the cluster configuration on Linkis Control Panel -> Basic Data Management -> External Resource Provider Manage, as shown in the figure. The Resource Type must be set to Kubernetes, while the Name can be customized. The parameters to be set in the Config are shown in the following table:
| Conf | Desc |
|---|---|
| k8sMasterUrl | Full URL of the API Server, such as `https://xxx.xxx.xxx.xxx:6443`. This parameter must be configured. |
| k8sConfig | Location of the kubeconfig file, such as `/home/hadoop/.kube/config`. If this parameter is configured, the following three parameters do not need to be configured. |
| k8sCaCertData | CA certificate for the cluster in kubeconfig, corresponding to `certificate-authority-data`. If k8sConfig is not configured, you need to configure this parameter. |
| k8sClientCertData | Client certificate in kubeconfig, corresponding to `client-certificate-data`. If k8sConfig is not configured, you need to configure this parameter. |
| k8sClientKeyData | Client private key in kubeconfig, corresponding to `client-key-data`. If k8sConfig is not configured, you need to configure this parameter. |
After configuring the external provider, you need to configure the corresponding cluster label information on ECM Management as shown in the figure. You need to select yarnCluster as the label type and K8S-cluster name as the label value, where the cluster name is the name specified in the External Resource Provider configuration; for example, K8S-default if the name is set to default in the previous step.
Due to compatibility issues with ClusterLabel, the key value has not been changed yet (yarnCluster).
When using linkis-cli to submit a task, the parameters that need to be set are as follows:

- If the Name is set to `default` when configuring the external provider, you need to specify the value of `k8sCluster` as `K8S-default` when submitting the task;
- Different from the `k8s-operator` submitting method, you need to specify the `spark.master` parameter as `k8s-native`;
- To use the `cluster` deploy mode, you need to set `spark.submit.deployMode` to `cluster`.

The corresponding Spark parameters of the Linkis parameters are as follows:
| Linkis Parameters | Spark Parameters | Default Value |
|---|---|---|
| linkis.spark.k8s.master.url | --master | empty string |
| linkis.spark.k8s.serviceAccount | spark.kubernetes.authenticate.driver.serviceAccountName | empty string |
| linkis.spark.k8s.image | spark.kubernetes.container.image | apache/spark:v3.2.1 |
| linkis.spark.k8s.imagePullPolicy | spark.kubernetes.container.image.pullPolicy | Always |
| linkis.spark.k8s.namespace | spark.kubernetes.namespace | default |
| linkis.spark.k8s.ui.port | spark.ui.port | 4040 |
| linkis.spark.k8s.executor.request.cores | spark.kubernetes.executor.request.cores | 1 |
| linkis.spark.k8s.driver.request.cores | spark.kubernetes.driver.request.cores | 1 |
Submitting a task with a jar
```bash
linkis-cli --mode once \
    -engineType spark-3.2.1 \
    -labelMap engineConnMode=once \
    -k8sCluster 'K8S-default' \
    -jobContentMap runType='jar' \
    -jobContentMap spark.app.main.class='org.apache.spark.examples.SparkPi' \
    -confMap spark.master='k8s-native' \
    -confMap spark.app.name='spark-submit-jar-k8s' \
    -confMap spark.app.resource='local:///opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar' \
    -confMap spark.submit.deployMode='cluster' \
    -confMap linkis.spark.k8s.serviceAccount='spark' \
    -confMap linkis.spark.k8s.master.url='k8s://https://xxx.xxx.xxx.xxx:6443' \
    -confMap linkis.spark.k8s.config.file='/home/hadoop/.kube/config' \
    -confMap linkis.spark.k8s.imagePullPolicy='IfNotPresent' \
    -confMap linkis.spark.k8s.namespace='default'
```
Submitting a task with py
```bash
linkis-cli --mode once \
    -engineType spark-3.2.1 \
    -labelMap engineConnMode=once \
    -k8sCluster 'K8S-default' \
    -jobContentMap runType='py' \
    -confMap spark.master='k8s-native' \
    -confMap spark.app.name='spark-submit-py-k8s' \
    -confMap spark.app.resource='local:///opt/spark/examples/src/main/python/pi.py' \
    -confMap spark.submit.deployMode='cluster' \
    -confMap spark.submit.pyFiles='local:///opt/spark/examples/src/main/python/wordcount.py' \
    -confMap linkis.spark.k8s.serviceAccount='spark' \
    -confMap linkis.spark.k8s.master.url='k8s://https://xxx.xxx.xxx.xxx:6443' \
    -confMap linkis.spark.k8s.config.file='/home/hadoop/.kube/config' \
    -confMap linkis.spark.k8s.imagePullPolicy='IfNotPresent' \
    -confMap linkis.spark.k8s.namespace='default' \
    -confMap linkis.spark.k8s.image="apache/spark-py:v3.2.1"
```
Upgrade instructions for old version
You need to use linkis-dist/package/db/upgrade/1.5.0_schema/mysql/linkis_ddl.sql to upgrade the database fields. Specifically, the length of the label_key field of linkis_cg_manager_label needs to be increased from 32 to 50.
```sql
ALTER TABLE `linkis_cg_manager_label` MODIFY COLUMN label_key varchar(50);
```
Prior to version 1.5.0, ClusterLabel was not included when building CombineLabel. To maintain compatibility with older versions, when the submitted ClusterLabel value is 'Yarn-default', ClusterLabel is still not included when building CombineLabel. You can disable this behavior by setting linkis.combined.without.yarn.default to false (the default is true).

The specific reason is that if tasks related to that ClusterLabel were submitted in old versions, corresponding resource records would exist in the database. After upgrading to the new version, since CombineLabel now includes ClusterLabel, the database's resource records would conflict when submitting tasks of this type. Therefore, to maintain compatibility with older versions, the construction of CombineLabel for Yarn-default (the default value of ClusterLabel) still does not include ClusterLabel. If the latest version is installed directly, this issue does not need to be considered because there are no conflicting records in the database, and you can set linkis.combined.without.yarn.default to false to improve readability.
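A minimal sketch of changing this switch, assuming it is read from linkis.properties on the manager side (the file location follows the installation directory used earlier in this article; adjust it for your deployment and restart the related services afterwards):

```bash
# Assumption: the switch is configured in linkis.properties; adjust the path to your installation
echo "linkis.combined.without.yarn.default=false" >> /appcom/Install/linkis/conf/linkis.properties
# Restart the affected Linkis services so the new value takes effect
```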
Validation of submitting tasks
Submitting a Spark Once Job to K8S involves two levels of resource validation, and the task will only be submitted to the K8S cluster after passing both levels of validation:
| Configuration | Default | Required | Description |
|---|---|---|---|
| wds.linkis.rm.instance | 10 | No | Maximum number of concurrent engines |
| spark.executor.cores | 1 | No | Number of spark executor cores |
| spark.executor.instances | 1 | No | Maximum concurrent number of spark executor instances |
| spark.driver.memory | 1g | No | Spark driver memory size |
| spark.executor.memory | 1g | No | Spark executor memory size |
| wds.linkis.engineconn.max.free.time | 1h | No | Engine idle exit time |
| spark.python.version | python2 | No | python version |
Because Spark execution requires queue resources, you need to configure a queue that you are allowed to use.

If the default parameters do not meet your needs, there are the following ways to configure some basic parameters.

Users can customize settings such as the number of Spark executors and the executor memory. These parameters allow users to set their own Spark parameters more freely, and other Spark parameters can also be modified, such as the python version used by pyspark, etc.
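For example, here is a sketch of overriding a few of the parameters above at submission time with linkis-cli, using the `-confMap` mechanism shown earlier (the values are illustrative only):

```bash
# Illustrative values: tune the yarn queue, executor resources and pyspark python version for a single submission
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType py \
    -confMap wds.linkis.yarnqueue=dws \
    -confMap spark.executor.memory=4g \
    -confMap spark.executor.cores=2 \
    -confMap spark.python.version=python3 \
    -code "df = spark.sql('show databases'); df.show()" \
    -submitUser hadoop -proxyUser hadoop
```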