---
layout: global
title: Application Development with Spark Connect
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0
---
# Spark Connect Overview
In Apache Spark 3.4, Spark Connect introduced a decoupled client-server architecture that allows remote connectivity to Spark clusters using the DataFrame API and unresolved logical plans as the protocol. The separation between client and server allows Spark and its open ecosystem to be leveraged from everywhere. It can be embedded in modern data applications, in IDEs, Notebooks and programming languages.
To learn more about Spark Connect, see Spark Connect Overview.
With its decoupled client-server architecture, Spark Connect simplifies how Spark Applications are developed. The notions of Spark Client Applications and Spark Server Libraries are introduced as follows:
With Spark 3.4 and Spark Connect, the development of Spark Client Applications is simplified, and clear extension points and guidelines are provided on how to build Spark Server Libraries, making it easy for both types of applications to evolve alongside Spark. As illustrated in Fig. 1, Spark Client Applications connect to Spark using the Spark Connect API, which is essentially the DataFrame API and fully declarative.
Spark provides the API mode configuration, `spark.api.mode`, enabling Spark Classic applications to seamlessly switch to Spark Connect. Depending on the value of `spark.api.mode`, the application can run in either Spark Classic or Spark Connect mode. Here is an example:
{% highlight python %}
from pyspark.sql import SparkSession

SparkSession.builder.config("spark.api.mode", "connect").master("...").getOrCreate()
{% endhighlight %}
You can also apply this configuration to both Scala and PySpark applications when submitting them:
{% highlight bash %}
spark-submit --master "..." --conf spark.api.mode=connect
{% endhighlight %}
Additionally, Spark Connect offers convenient options for local testing. By setting `spark.remote` to `local[...]` or `local-cluster[...]`, you can start a local Spark Connect server and access a Spark Connect session.

This is similar to using `--conf spark.api.mode=connect` with `--master ...`. However, note that `spark.remote` and `--remote` are limited to `local*` values, while `--conf spark.api.mode=connect` with `--master ...` supports additional cluster URLs, such as `spark://`, for broader compatibility with Spark Classic.
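As a minimal sketch of such local testing (the application file `my_app.py` is a placeholder), the `--remote` option can point at a local master URL to spin up an in-process Spark Connect server:

{% highlight bash %}
# Run an interactive PySpark shell against a local Spark Connect server.
pyspark --remote "local[*]"

# Submit a client application the same way; my_app.py is a placeholder.
spark-submit --remote "local[*]" my_app.py
{% endhighlight %}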
Spark Client Applications are the regular Spark applications that Spark users develop today, e.g., ETL pipelines, data preparation, or model training and inference. These are typically built using Spark's declarative DataFrame and Dataset APIs. With Spark Connect, the core behaviour remains the same, but there are a few differences.
Client applications based on Spark Connect can be submitted in the same way as any previous job. In addition, Spark Client Applications based on Spark Connect have several benefits compared to classic Spark applications using earlier Spark versions (3.4 and below).
Until Spark 3.4, extensions to Spark (e.g., Spark ML or Spark-NLP) were built and deployed like Spark Client Applications. With Spark 3.4 and Spark Connect, explicit extension points are offered to extend Spark via Spark Server Libraries. These extension points provide functionality that can be exposed to a client, which differs from existing extension points in Spark such as SparkSession extensions or Spark Plugins.
Spark Connect is available and supports PySpark and Scala applications. We will walk through how to run an Apache Spark server with Spark Connect and connect to it from a client application using the Spark Connect client library.
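As a rough sketch of that walkthrough, assuming a Spark distribution that ships `sbin/start-connect-server.sh` and a Spark Connect server listening on the default port 15002:

{% highlight bash %}
# Start a Spark Connect server from the root of the Spark distribution.
./sbin/start-connect-server.sh

# Connect an interactive PySpark client to it via a Spark Connect remote URL.
pyspark --remote "sc://localhost:15002"
{% endhighlight %}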
A Spark Server Library consists of the following components, illustrated in Fig. 2:
To extend Spark with a new Spark Server Library, developers can extend the three main operation types in the Spark Connect protocol: Relation, Expression, and Command.
{% highlight protobuf %}
message Relation {
  oneof rel_type {
    Read read = 1;
    // ...
    google.protobuf.Any extension = 998;
  }
}

message Expression {
  oneof expr_type {
    Literal literal = 1;
    // ...
    google.protobuf.Any extension = 999;
  }
}

message Command {
  oneof command_type {
    WriteCommand write_command = 1;
    // ...
    google.protobuf.Any extension = 999;
  }
}
{% endhighlight %}

Their extension fields allow serializing arbitrary protobuf messages as part of the Spark Connect protocol. These messages represent the parameters or state of the extension implementation. To build a custom expression type, the developer first defines the custom protobuf definition of the expression.
{% highlight protobuf %}
message ExamplePluginExpression {
  Expression child = 1;
  string custom_field = 2;
}
{% endhighlight %}
As a next step, the developer implements the ExpressionPlugin class of Spark Connect with custom application logic based on the input parameters of the protobuf message.

{% highlight scala %}
class ExampleExpressionPlugin extends ExpressionPlugin {
  override def transform(
      relation: protobuf.Any,
      planner: SparkConnectPlanner): Option[Expression] = {
    // Check if the serialized value of protobuf.Any matches the type
    // of our example expression.
    if (!relation.is(classOf[proto.ExamplePluginExpression])) {
      return None
    }
    val exp = relation.unpack(classOf[proto.ExamplePluginExpression])
    Some(Alias(planner.transformExpression(
      exp.getChild), exp.getCustomField)(explicitMetadata = None))
  }
}
{% endhighlight %}
Once the application logic is developed, the code must be packaged as a jar, and Spark must be configured to pick up the additional logic through the Spark Connect extension configuration options on the server.
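As a sketch of what that deployment could look like (the jar path and the plugin class name are placeholders, and the example assumes the expression plugin is registered through a configuration key such as `spark.connect.extensions.expression.classes`):

{% highlight bash %}
# Start the Spark Connect server with the packaged extension jar on the
# classpath and register the plugin class so the server can resolve the
# custom protobuf message.
./sbin/start-connect-server.sh \
  --jars /path/to/example-plugin.jar \
  --conf spark.connect.extensions.expression.classes=org.example.ExampleExpressionPlugin
{% endhighlight %}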
Once the server component is deployed, any client can use it with the right protobuf messages. In the example above, the following message payload sent to the Spark Connect endpoint would be enough to trigger the extension mechanism.

{% highlight json %}
{
  "project": {
    "input": {
      "sql": {
        "query": "select * from samples.nyctaxi.trips"
      }
    },
    "expressions": [
      {
        "extension": {
          "typeUrl": "type.googleapis.com/spark.connect.ExamplePluginExpression",
          "value": "\n\006\022\004\n\002id\022\006testval"
        }
      }
    ]
  }
}
{% endhighlight %}

To make the example available in Python, the application developer provides a Python library that wraps the new expression and embeds it into PySpark. The easiest way to provide a function for any expression is to take a PySpark Column instance as an argument and return a new Column instance with the expression applied.
{% highlight python %}
from pyspark.sql.connect.column import Column, Expression
import pyspark.sql.connect.proto as proto

from myxample.proto import ExamplePluginExpression


class ExampleExpression(Expression):
    def to_plan(self, session) -> proto.Expression:
        fun = proto.Expression()
        plugin = ExamplePluginExpression()
        plugin.child.literal.long = 10
        plugin.custom_field = "example"
        fun.extension.Pack(plugin)
        return fun


def example_expression(col: Column) -> Column:
    return Column(ExampleExpression())


# Assumes an existing Spark Connect session available as `spark`.
df = spark.read.table("samples.nyctaxi.trips")
df.select(example_expression(df["fare_amount"])).collect()
{% endhighlight %}