
Apache Spark Connect Client for Rust

This repository hosts the Rust client for Apache Spark Connect.

🚧 This project is under active development and not yet production-ready.

Current State of the Project

Currently, the Spark Connect client for Rust is highly experimental and should not be used in any production setting. It is a "proof of concept" to identify the methods of interacting with a Spark cluster from Rust.

The spark-connect-rs crate aims to provide an entrypoint to Spark Connect, and to provide similar DataFrame API interactions.

Project Layout

├── crates          <- crates for the implementation of the client side spark-connect bindings
│   └─ connect      <- crate for 'spark-connect-rs'
│      └─ protobuf  <- connect protobuf for apache/spark
├── examples        <- examples of using different aspects of the crate
├── datasets        <- sample files from the main spark repo

In the future, additional crates may be added to make it easier to create other language bindings.

Getting Started

This section explains how to run Spark Connect for Rust locally, starting from scratch.

Step 1: Install rust via rustup: https://www.rust-lang.org/tools/install

Step 2: Ensure you have cmake and protobuf installed on your machine

Step 3: Run the following commands to clone and build the repo

git clone https://github.com/sjrusso8/spark-connect-rs.git

cd spark-connect-rs

cargo build

Step 4: Set up the Spark Driver on localhost, either by downloading Spark or with Docker.

With local spark:

  1. Download a Spark distribution (3.5.1 recommended) and unzip the package.

  2. Set your SPARK_HOME environment variable to the location where Spark was extracted.

  3. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution):

$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
      --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

With docker:

  1. Start the Spark Connect server using the docker-compose.yml in this repo. This starts a Spark Connect server on port 15002:

docker compose up --build -d

Step 5: Run an example from the repo under /examples
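With the server running, an end-to-end check can be sketched as follows. This is a sketch only: it assumes the SparkSessionBuilder::remote, sql, and show signatures listed in the tables below, and tokio as the async runtime.

```rust
// Sketch: connect to a local Spark Connect server and run a query.
// Assumes the crate's SparkSessionBuilder::remote, sql, and show APIs
// as listed in the tables below, and tokio as the async runtime.
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect using a Spark Connect connection string (default port 15002)
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Run a simple SQL query and print the result
    let df = spark.sql("SELECT 'hello' AS greeting, 42 AS value").await?;
    df.show(Some(5), None, None).await?;

    Ok(())
}
```

This requires a Spark Connect server listening on localhost:15002, as set up in Step 4.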

Features

The following section outlines the implementation status of the larger pieces of functionality in this Spark Connect client.

  • done — TLS authentication & Databricks compatibility via the feature flag feature = 'tls'
  • open — UDFs or any type of functionality that takes a closure (foreach, foreachBatch, etc.)

SparkSession

Spark Session type object and its implemented traits

| SparkSession | API | Comment |
|---|---|---|
| active | open | |
| addArtifact(s) | open | |
| addTag | done | |
| clearTags | done | |
| copyFromLocalToFs | open | |
| createDataFrame | partial | Only works for RecordBatch |
| getActiveSessions | open | |
| getTags | done | |
| interruptAll | done | |
| interruptOperation | done | |
| interruptTag | done | |
| newSession | open | |
| range | done | |
| removeTag | done | |
| sql | done | |
| stop | open | |
| table | done | |
| catalog | done | Catalog |
| client | done | Unstable developer API, for testing only |
| conf | done | Conf |
| read | done | DataFrameReader |
| readStream | done | DataStreamReader |
| streams | done | Streams |
| udf | open | Udf - may not be possible |
| udtf | open | Udtf - may not be possible |
| version | done | |
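Since createDataFrame only accepts an Arrow RecordBatch, creating a DataFrame from local data goes through the arrow crates. A hedged sketch follows: the RecordBatch construction uses the standard arrow-array API, but the exact createDataFrame signature is an assumption based on the table above.

```rust
// Sketch: build an Arrow RecordBatch and hand it to the session.
// createDataFrame(&RecordBatch) is assumed per the table above.
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Two columns of local data packed into an Arrow RecordBatch
    let name: ArrayRef = Arc::new(StringArray::from(vec!["alice", "bob"]));
    let age: ArrayRef = Arc::new(Int64Array::from(vec![34_i64, 29]));
    let batch = RecordBatch::try_from_iter(vec![("name", name), ("age", age)])?;

    let df = spark.createDataFrame(&batch)?;
    df.show(Some(2), None, None).await?;
    Ok(())
}
```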

SparkSessionBuilder

| SparkSessionBuilder | API | Comment |
|---|---|---|
| appName | done | |
| config | done | |
| master | open | |
| remote | partial | Validated using the Spark connection string |

RuntimeConfig

| RuntimeConfig | API | Comment |
|---|---|---|
| get | done | |
| isModifiable | done | |
| set | done | |
| unset | done | |

Catalog

| Catalog | API | Comment |
|---|---|---|
| cacheTable | done | |
| clearCache | done | |
| createExternalTable | done | |
| createTable | done | |
| currentCatalog | done | |
| currentDatabase | done | |
| databaseExists | done | |
| dropGlobalTempView | done | |
| dropTempView | done | |
| functionExists | done | |
| getDatabase | done | |
| getFunction | done | |
| getTable | done | |
| isCached | done | |
| listCatalogs | done | |
| listDatabases | done | |
| listFunctions | done | |
| listTables | done | |
| recoverPartitions | done | |
| refreshByPath | done | |
| refreshTable | done | |
| registerFunction | open | |
| setCurrentCatalog | done | |
| setCurrentDatabase | done | |
| tableExists | done | |
| uncacheTable | done | |

DataFrameReader

| DataFrameReader | API | Comment |
|---|---|---|
| csv | done | |
| format | done | |
| json | done | |
| load | done | |
| option | done | |
| options | done | |
| orc | done | |
| parquet | done | |
| schema | done | |
| table | done | |
| text | done | |

DataFrameWriter

Spark Connect should respect the format as long as your cluster supports the specified type and has the required jars.

| DataFrameWriter | API | Comment |
|---|---|---|
| bucketBy | done | |
| csv | done | |
| format | done | |
| insertInto | done | |
| jdbc | open | |
| json | done | |
| mode | done | |
| option | done | |
| options | done | |
| orc | done | |
| parquet | done | |
| partitionBy | done | |
| save | done | |
| saveAsTable | done | |
| sortBy | done | |
| text | done | |

DataFrameWriterV2

| DataFrameWriterV2 | API | Comment |
|---|---|---|
| append | done | |
| create | done | |
| createOrReplace | done | |
| option | done | |
| options | done | |
| overwrite | done | |
| overwritePartitions | done | |
| partitionedBy | done | |
| replace | done | |
| tableProperty | done | |
| using | done | |

DataStreamReader

| DataStreamReader | API | Comment |
|---|---|---|
| csv | open | |
| format | done | |
| json | open | |
| load | done | |
| option | done | |
| options | done | |
| orc | open | |
| parquet | open | |
| schema | done | |
| table | open | |
| text | open | |

DataStreamWriter

Start a streaming job and return a StreamingQuery object to handle the stream operations.

| DataStreamWriter | API | Comment |
|---|---|---|
| foreach | open | |
| foreachBatch | open | |
| format | done | |
| option | done | |
| options | done | |
| outputMode | done | Uses an Enum for OutputMode |
| partitionBy | done | |
| queryName | done | |
| start | done | |
| toTable | done | |
| trigger | done | Uses an Enum for TriggerMode |
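The streaming flow can be sketched as below. The builder method names come from the tables in this section; the module path and enum variants for OutputMode and Trigger are assumptions and may differ in the crate.

```rust
// Sketch: read a "rate" stream and write it to the console sink.
// Method names mirror the tables above; the module paths and the
// OutputMode/Trigger enum variants are assumptions.
use spark_connect_rs::streaming::{OutputMode, Trigger};
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .readStream()
        .format("rate")
        .option("rowsPerSecond", "5")
        .load(None)?;

    let query = df
        .writeStream()
        .format("console")
        .queryName("example_stream")
        .outputMode(OutputMode::Append)
        .trigger(Trigger::ProcessingTimeInterval("3 seconds".to_string()))
        .start(None)
        .await?;

    // Let the stream run briefly, then stop it via the StreamingQuery handle
    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
    query.stop().await?;
    Ok(())
}
```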

StreamingQuery

| StreamingQuery | API | Comment |
|---|---|---|
| awaitTermination | done | |
| exception | done | |
| explain | done | |
| processAllAvailable | done | |
| stop | done | |
| id | done | |
| isActive | done | |
| lastProgress | done | |
| name | done | |
| recentProgress | done | |
| runId | done | |
| status | done | |

StreamingQueryManager

| StreamingQueryManager | API | Comment |
|---|---|---|
| awaitAnyTermination | done | |
| get | done | |
| resetTerminated | done | |
| active | done | |

StreamingQueryListener

| StreamingQueryListener | API | Comment |
|---|---|---|
| onQueryIdle | open | |
| onQueryProgress | open | |
| onQueryStarted | open | |
| onQueryTerminated | open | |

DataFrame

Spark DataFrame type object and its implemented traits.

| DataFrame | API | Comment |
|---|---|---|
| agg | done | |
| alias | done | |
| approxQuantile | done | |
| cache | done | |
| checkpoint | open | Not part of Spark Connect |
| coalesce | done | |
| colRegex | done | |
| collect | done | |
| columns | done | |
| corr | done | |
| count | done | |
| cov | done | |
| createGlobalTempView | done | |
| createOrReplaceGlobalTempView | done | |
| createOrReplaceTempView | done | |
| createTempView | done | |
| crossJoin | done | |
| crosstab | done | |
| cube | done | |
| describe | done | |
| distinct | done | |
| drop | done | |
| dropDuplicatesWithinWatermark | done | |
| drop_duplicates | done | |
| dropna | done | |
| dtypes | done | |
| exceptAll | done | |
| explain | done | |
| fillna | done | |
| filter | done | |
| first | done | |
| foreach | open | |
| foreachPartition | open | |
| freqItems | done | |
| groupBy | done | |
| head | done | |
| hint | done | |
| inputFiles | done | |
| intersect | done | |
| intersectAll | done | |
| isEmpty | done | |
| isLocal | done | |
| isStreaming | done | |
| join | done | |
| limit | done | |
| localCheckpoint | open | Not part of Spark Connect |
| mapInPandas | open | TBD on this exact implementation |
| mapInArrow | open | TBD on this exact implementation |
| melt | done | |
| na | done | |
| observe | open | |
| offset | done | |
| orderBy | done | |
| persist | done | |
| printSchema | done | |
| randomSplit | done | |
| registerTempTable | done | |
| repartition | done | |
| repartitionByRange | done | |
| replace | done | |
| rollup | done | |
| sameSemantics | done | |
| sample | done | |
| sampleBy | done | |
| schema | done | |
| select | done | |
| selectExpr | done | |
| semanticHash | done | |
| show | done | |
| sort | done | |
| sortWithinPartitions | done | |
| sparkSession | done | |
| stat | done | |
| storageLevel | done | |
| subtract | done | |
| summary | done | |
| tail | done | |
| take | done | |
| to | done | |
| toDF | done | |
| toJSON | partial | Does not return an RDD but a long JSON formatted String |
| toLocalIterator | open | |
| toPandas (to_polars & toPolars) | partial | Converts to a polars::frame::DataFrame |
| to_datafusion & toDataFusion | done | Converts to a datafusion::dataframe::DataFrame |
| transform | done | |
| union | done | |
| unionAll | done | |
| unionByName | done | |
| unpersist | done | |
| unpivot | done | |
| where | done | Use filter instead; where is a keyword in Rust |
| withColumn | done | |
| withColumns | done | |
| withColumnRenamed | done | |
| withColumnsRenamed | done | |
| withMetadata | done | |
| withWatermark | done | |
| write | done | |
| writeStream | done | |
| writeTo | done | |

Column

Spark Column type object and its implemented traits

| Column | API | Comment |
|---|---|---|
| alias | done | |
| asc | done | |
| asc_nulls_first | done | |
| asc_nulls_last | done | |
| astype | open | |
| between | open | |
| cast | done | |
| contains | done | |
| desc | done | |
| desc_nulls_first | done | |
| desc_nulls_last | done | |
| dropFields | done | |
| endswith | done | |
| eqNullSafe | open | |
| getField | open | This is deprecated but will need to be implemented |
| getItem | open | This is deprecated but will need to be implemented |
| ilike | done | |
| isNotNull | done | |
| isNull | done | |
| isin | done | |
| like | done | |
| name | done | |
| otherwise | open | |
| over | done | Refer to Window for creating window specifications |
| rlike | done | |
| startswith | done | |
| substr | done | |
| when | open | |
| withField | done | |
| eq == | done | Rust does not allow overloading == to return anything other than a bool, so column equality is currently written as col('name').eq(col('id')). Not the best, but it works for now |
| addition + | done | |
| subtraction - | done | |
| OR \| | done | |
| AND & | done | |
| XOR ^ | done | |
| Negate ~ | done | |
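The note on eq above follows from Rust's trait system: PartialEq::eq is fixed to the signature fn eq(&self, other: &Self) -> bool, while the std::ops operator traits (Add, BitOr, and friends) may return any Output type. The following self-contained toy (not the crate's actual types) illustrates why equality needs a named method while arithmetic can use operators:

```rust
// Toy sketch (not the crate's actual types): why == cannot build an
// expression. PartialEq::eq must return bool, so a column type cannot
// overload == to return a new Column expression; std::ops::Add has no
// such restriction, which is why + (and |, &, ^, ~) can return Columns.
#[derive(Debug, Clone, PartialEq)]
struct Column(String);

impl Column {
    // Inherent `eq` takes precedence in method-call syntax, so
    // col.eq(&other) builds an expression instead of returning bool.
    fn eq(&self, other: &Column) -> Column {
        Column(format!("({} = {})", self.0, other.0))
    }
}

impl std::ops::Add for Column {
    type Output = Column;
    fn add(self, rhs: Column) -> Column {
        Column(format!("({} + {})", self.0, rhs.0))
    }
}

fn main() {
    // Expression-building equality via the named method
    let expr = Column("name".into()).eq(&Column("id".into()));
    assert_eq!(expr.0, "(name = id)");

    // Operator overloading works fine for arithmetic
    let sum = Column("a".into()) + Column("b".into());
    assert_eq!(sum.0, "(a + b)");

    // == still compiles, but it can only return bool (derived PartialEq)
    assert!(Column("x".into()) == Column("x".into()));
}
```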

Data Types

Data types are used for creating schemas and for casting columns to specific types

| Data Type | API | Comment |
|---|---|---|
| ArrayType | done | |
| BinaryType | done | |
| BooleanType | done | |
| ByteType | done | |
| DateType | done | |
| DecimalType | done | |
| DoubleType | done | |
| FloatType | done | |
| IntegerType | done | |
| LongType | done | |
| MapType | done | |
| NullType | done | |
| ShortType | done | |
| StringType | done | |
| CharType | done | |
| VarcharType | done | |
| StructField | done | |
| StructType | done | |
| TimestampType | done | |
| TimestampNTZType | done | |
| DayTimeIntervalType | done | |
| YearMonthIntervalType | done | |

Literal Types

Create Spark literal types from these Rust types. E.g., lit(1_i64) would be a LongType() in the schema.

An array can be created with, e.g., lit([1_i16, 2_i16, 3_i16]), which results in an ArrayType(Short) since all the values of the slice can be translated into a literal type.

| Spark Literal Type | Rust Type | Status |
|---|---|---|
| Null | | open |
| Binary | &[u8] | done |
| Boolean | bool | done |
| Byte | | open |
| Short | i16 | done |
| Integer | i32 | done |
| Long | i64 | done |
| Float | f32 | done |
| Double | f64 | done |
| Decimal | | open |
| String | &str / String | done |
| Date | chrono::NaiveDate | done |
| Timestamp | chrono::DateTime<Tz> | done |
| TimestampNtz | chrono::NaiveDateTime | done |
| CalendarInterval | | open |
| YearMonthInterval | | open |
| DayTimeInterval | | open |
| Array | slice / Vec | done |
| Map | Create with the function create_map | done |
| Struct | Create with the function struct_col or named_struct | done |
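The mappings above can be exercised as in the sketch below. The functions::lit module path is an assumption based on the crate layout; the type-to-literal behavior is as described in this section.

```rust
// Sketch: building literals per the mapping table above.
// The spark_connect_rs::functions::lit path is assumed.
use spark_connect_rs::functions::lit;

fn main() {
    let long_col = lit(1_i64);                // Long literal
    let arr_col = lit([1_i16, 2_i16, 3_i16]); // Array(Short) literal
    let str_col = lit("hello");               // String literal
    let _ = (long_col, arr_col, str_col);
}
```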

Window & WindowSpec

For ease of use it's recommended to use Window to create the WindowSpec.

| Window | API | Comment |
|---|---|---|
| currentRow | done | |
| orderBy | done | |
| partitionBy | done | |
| rangeBetween | done | |
| rowsBetween | done | |
| unboundedFollowing | done | |
| unboundedPreceding | done | |
| WindowSpec.orderBy | done | |
| WindowSpec.partitionBy | done | |
| WindowSpec.rangeBetween | done | |
| WindowSpec.rowsBetween | done | |
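A window specification built this way is handed to Column::over, as noted in the Column table. The sketch below uses the method names from the tables above; the exact module paths and argument shapes are assumptions.

```rust
// Sketch: ranking rows within a partition via Window + over.
// Method names mirror the tables above; module paths, argument
// shapes, and the existing DataFrame `df` are assumptions.
use spark_connect_rs::functions::{col, row_number};
use spark_connect_rs::window::Window;

fn rank_by_salary(df: spark_connect_rs::DataFrame) -> spark_connect_rs::DataFrame {
    let window = Window::new()
        .partitionBy([col("department")])
        .orderBy([col("salary")]);

    // Attach a per-partition row number using the window specification
    df.withColumn("rank", row_number().over(window))
}
```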

Functions

Only a few of the functions are covered by unit tests. Functions involving closures or lambdas are not feasible.

| Functions | API | Comments |
|---|---|---|
| abs | done | |
| acos | done | |
| acosh | done | |
| add_months | done | |
| aes_decrypt | done | |
| aes_encrypt | done | |
| aggregate | open | |
| any_value | done | |
| approx_count_distinct | done | |
| approx_percentile | open | |
| array | done | |
| array_agg | done | |
| array_append | done | |
| array_compact | done | |
| array_contains | done | |
| array_distinct | done | |
| array_except | done | |
| array_insert | done | |
| array_intersect | done | |
| array_join | done | |
| array_max | done | |
| array_min | done | |
| array_position | done | |
| array_prepend | done | |
| array_remove | done | |
| array_repeat | done | |
| array_size | done | |
| array_sort | open | |
| array_union | done | |
| arrays_overlap | done | |
| arrays_zip | done | |
| asc | done | |
| asc_nulls_first | done | |
| asc_nulls_last | done | |
| ascii | done | |
| asin | done | |
| asinh | done | |
| assert_true | done | |
| atan | done | |
| atan2 | done | |
| atanh | done | |
| avg | done | |
| base64 | done | |
| bin | done | |
| bit_and | done | |
| bit_count | done | |
| bit_get | done | |
| bit_length | done | |
| bit_or | done | |
| bit_xor | done | |
| bitmap_bit_position | done | |
| bitmap_bucket_number | done | |
| bitmap_construct_agg | done | |
| bitmap_count | done | |
| bitmap_or_agg | done | |
| bitwise_not | done | |
| bool_and | done | |
| bool_or | done | |
| broadcast | done | |
| bround | done | |
| btrim | done | |
| bucket | done | |
| call_function | open | |
| call_udf | open | |
| cardinality | done | |
| cbrt | done | |
| ceil | done | |
| ceiling | done | |
| char | done | |
| char_length | done | |
| character_length | done | |
| coalesce | done | |
| col | done | |
| collect_list | done | |
| collect_set | done | |
| column | done | |
| concat | done | |
| concat_ws | done | |
| contains | done | |
| conv | done | |
| convert_timezone | done | |
| corr | done | |
| cos | done | |
| cosh | done | |
| cot | done | |
| count | done | |
| count_distinct | done | |
| count_if | done | |
| count_min_sketch | done | |
| covar_pop | done | |
| covar_samp | done | |
| crc32 | done | |
| create_map | done | |
| csc | done | |
| cume_dist | done | |
| curdate | done | |
| current_catalog | done | |
| current_database | done | |
| current_date | done | |
| current_schema | done | |
| current_timestamp | done | |
| current_timezone | done | |
| current_user | done | |
| date_add | done | |
| date_diff | done | |
| date_format | done | |
| date_from_unix_date | done | |
| date_part | done | |
| date_sub | done | |
| date_trunc | done | |
| dateadd | done | |
| datediff | done | |
| datepart | open | |
| day | done | |
| dayofmonth | done | |
| dayofweek | done | |
| dayofyear | done | |
| days | done | |
| decode | done | |
| degrees | done | |
| dense_rank | done | |
| desc | done | |
| desc_nulls_first | done | |
| desc_nulls_last | done | |
| e | done | |
| element_at | done | |
| elt | done | |
| encode | done | |
| endswith | done | |
| equal_null | done | |
| every | done | |
| exists | open | |
| exp | done | |
| explode | done | |
| explode_outer | done | |
| expm1 | done | |
| expr | done | |
| extract | done | |
| factorial | done | |
| filter | open | |
| find_in_set | done | |
| first | done | |
| first_value | done | |
| flatten | done | |
| floor | done | |
| forall | open | |
| format_number | done | |
| format_string | done | |
| from_csv | done | |
| from_json | done | |
| from_unixtime | done | |
| from_utc_timestamp | done | |
| get | done | |
| get_json_object | done | |
| getbit | done | |
| greatest | done | |
| grouping | done | |
| grouping_id | done | |
| hash | done | |
| hex | done | |
| histogram_numeric | done | |
| hll_sketch_agg | done | |
| hll_sketch_estimate | done | |
| hll_union | done | |
| hll_union_agg | done | |
| hour | done | |
| hours | done | |
| hypot | done | |
| ifnull | done | |
| ilike | done | |
| initcap | done | |
| inline | done | |
| inline_outer | done | |
| input_file_block_length | done | |
| input_file_block_start | done | |
| input_file_name | done | |
| instr | done | |
| isnan | done | |
| isnotnull | done | |
| isnull | done | |
| java_method | done | |
| json_array_length | done | |
| json_object_keys | done | |
| json_tuple | done | |
| kurtosis | done | |
| lag | done | |
| last | done | |
| last_day | done | |
| last_value | done | |
| lcase | done | |
| lead | done | |
| least | done | |
| left | done | |
| length | done | |
| levenshtein | done | |
| like | done | |
| lit | done | |
| ln | done | |
| localtimestamp | done | |
| locate | done | |
| log | done | |
| log10 | done | |
| log1p | done | |
| log2 | done | |
| lower | done | |
| lpad | done | |
| ltrim | done | |
| make_date | done | |
| make_dt_interval | done | |
| make_interval | done | |
| make_timestamp | done | |
| make_timestamp_ltz | done | |
| make_timestamp_ntz | done | |
| make_ym_interval | done | |
| map_concat | done | |
| map_contains_key | done | |
| map_entries | done | |
| map_filter | open | |
| map_from_arrays | done | |
| map_from_entries | done | |
| map_keys | done | |
| map_values | done | |
| map_zip_with | open | |
| mask | open | |
| max | done | |
| max_by | done | |
| md5 | done | |
| mean | done | |
| median | done | |
| min | done | |
| min_by | done | |
| minute | done | |
| mode | done | |
| monotonically_increasing_id | done | |
| month | done | |
| months | done | |
| months_between | done | |
| named_struct | done | |
| nanvl | done | |
| negate | done | |
| negative | done | |
| next_day | done | |
| now | done | |
| nth_value | done | |
| ntile | done | |
| nullif | done | |
| nvl | done | |
| nvl2 | done | |
| octet_length | done | |
| overlay | done | |
| pandas_udf | open | |
| parse_url | done | |
| percent_rank | done | |
| percentile | done | |
| percentile_approx | done | |
| pi | done | |
| pmod | done | |
| posexplode | done | |
| posexplode_outer | done | |
| position | done | |
| positive | done | |
| pow | done | |
| power | done | |
| printf | done | |
| product | done | |
| quarter | done | |
| radians | done | |
| raise_error | done | |
| rand | done | |
| randn | done | |
| rank | done | |
| reduce | open | |
| reflect | done | |
| regexp | done | |
| regexp_count | done | |
| regexp_extract | done | |
| regexp_extract_all | done | |
| regexp_instr | done | |
| regexp_like | done | |
| regexp_replace | done | |
| regexp_substr | done | |
| regr_avgx | done | |
| regr_avgy | done | |
| regr_count | done | |
| regr_intercept | done | |
| regr_r2 | done | |
| regr_slope | done | |
| regr_sxx | done | |
| regr_sxy | done | |
| regr_syy | done | |
| repeat | done | |
| replace | done | |
| reverse | done | |
| right | done | |
| rint | done | |
| rlike | done | |
| round | done | |
| row_number | done | |
| rpad | done | |
| rtrim | done | |
| schema_of_csv | done | |
| schema_of_json | done | |
| sec | done | |
| second | done | |
| sentences | done | |
| sequence | done | |
| session_window | done | |
| sha | done | |
| sha1 | done | |
| sha2 | done | |
| shiftleft | done | |
| shiftright | done | |
| shiftrightunsigned | done | |
| shuffle | done | |
| sign | done | |
| signum | done | |
| sin | done | |
| sinh | done | |
| size | done | |
| skewness | done | |
| slice | done | |
| some | done | |
| sort_array | done | |
| soundex | done | |
| spark_partition_id | done | |
| split | done | |
| split_part | done | |
| sqrt | done | |
| stack | done | |
| startswith | done | |
| std | done | |
| stddev | done | |
| stddev_pop | done | |
| stddev_samp | done | |
| str_to_map | done | |
| struct | open | |
| substr | done | |
| substring | done | |
| substring_index | done | |
| sum | done | |
| sum_distinct | done | |
| tan | done | |
| tanh | done | |
| timestamp_micros | done | |
| timestamp_millis | done | |
| timestamp_seconds | done | |
| to_binary | done | |
| to_char | done | |
| to_csv | done | |
| to_date | done | |
| to_json | done | |
| to_number | done | |
| to_timestamp | done | |
| to_timestamp_ltz | done | |
| to_timestamp_ntz | done | |
| to_unix_timestamp | done | |
| to_utc_timestamp | done | |
| to_varchar | done | |
| to_degrees | done | |
| to_radians | done | |
| transform | open | |
| transform_keys | open | |
| transform_values | open | |
| translate | done | |
| trim | done | |
| trunc | done | |
| try_add | done | |
| try_aes_decrypt | done | |
| try_avg | done | |
| try_divide | done | |
| try_element_at | done | |
| try_multiply | done | |
| try_subtract | done | |
| try_sum | done | |
| try_to_binary | done | |
| try_to_number | done | |
| try_to_timestamp | done | |
| typeof | open | |
| ucase | done | |
| udf | open | |
| udtf | open | |
| unbase64 | done | |
| unhex | done | |
| unix_date | done | |
| unix_micros | open | |
| unix_millis | done | |
| unix_seconds | done | |
| unix_timestamp | done | |
| unwrap_udt | open | |
| upper | done | |
| url_decode | done | |
| url_encode | done | |
| user | done | |
| var_pop | done | |
| var_samp | done | |
| variance | done | |
| version | done | |
| weekday | done | |
| weekofyear | done | |
| when | open | |
| width_bucket | done | |
| window | done | |
| window_time | done | |
| xpath | done | |
| xpath_boolean | done | |
| xpath_double | done | |
| xpath_float | done | |
| xpath_int | done | |
| xpath_long | done | |
| xpath_number | done | |
| xpath_short | done | |
| xpath_string | done | |
| xxhash64 | done | |
| year | done | |
| years | done | |
| zip_with | open | |

UdfRegistration (may not be possible)

| UDFRegistration | API | Comment |
|---|---|---|
| register | open | |
| registerJavaFunction | open | |
| registerJavaUDAF | open | |

UdtfRegistration (may not be possible)

| UDTFRegistration | API | Comment |
|---|---|---|
| register | open | |