Javier Luraschi is a software engineer at RStudio.
Support for Apache Arrow in Apache Spark with R is currently under active development in the sparklyr and SparkR projects. This post explores early, yet promising, performance improvements achieved when using R with Apache Spark, Arrow and sparklyr.
Since this work is under active development, install sparklyr and arrow from GitHub as follows:
devtools::install_github("apache/arrow", subdir = "r", ref = "apache-arrow-0.12.0") devtools::install_github("rstudio/sparklyr", ref = "apache-arrow-0.12.0")
In this benchmark, we will use dplyr, but similar improvements can be expected from using DBI or Spark DataFrames in sparklyr. The local Spark connection and a data frame with 10M numeric rows were initialized as follows:
library(sparklyr)
library(dplyr)
library(magrittr) # provides the %T>% tee pipe used in the benchmarks below

sc <- spark_connect(master = "local", config = list("sparklyr.shell.driver-memory" = "6g"))
data <- data.frame(y = runif(10^7, 0, 1))
Currently, copying data to Spark using sparklyr is performed by persisting data to disk from R and reading it back from Spark. This approach was meant for small datasets, since there are better tools to transfer data into distributed storage systems. Nevertheless, many users have requested support for transferring more data into Spark at higher speeds.
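For reference, one common workaround for larger datasets has been to write the data to disk from R and have Spark read the files directly, along these lines (a sketch; the file path and CSV format are illustrative):

# Write the data to disk from R, then let Spark read the files directly
tmp_csv <- file.path(tempdir(), "data.csv")
write.csv(data, tmp_csv, row.names = FALSE)
disk_df <- spark_read_csv(sc, name = "data_from_disk", path = tmp_csv)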
Using arrow with sparklyr, we can transfer data directly from R to Spark without having to serialize this data in R or persist it on disk.
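Enabling this path requires no API changes; sparklyr picks up arrow when the package is attached, as the benchmarks below illustrate by attaching and detaching it. A minimal sketch, assuming the connection sc from above:

library(arrow)                             # attaching arrow enables the Arrow transfer path
arrow_df <- copy_to(sc, data, overwrite = TRUE)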
The following example copies 10M rows from R into Spark using sparklyr with and without arrow; there is close to a 16x improvement when using arrow.
This benchmark uses the microbenchmark R package, which runs code multiple times, provides statistics on total execution time, and plots each execution time to show the distribution over iterations.
microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    sparklyr_df <<- copy_to(sc, data, overwrite = TRUE)
    count(sparklyr_df) %>% collect()
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    sparklyr_df <<- copy_to(sc, data, overwrite = TRUE)
    count(sparklyr_df) %>% collect()
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
      expr       min        lq       mean     median         uq       max neval
  arrow_on  3.011515  4.250025   7.257739   7.273011   8.974331  14.23325    10
 arrow_off 50.051947 68.523081 119.946947  71.898908 138.743419 390.44028    10
Similarly, arrow with sparklyr can now avoid deserializing data in R while collecting data from Spark into R. These improvements are not as significant as when copying data, since sparklyr already collects data in columnar format.
The following benchmark collects 10M rows from Spark into R and shows that arrow can bring a 3x improvement.
microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    collect(sparklyr_df)
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    collect(sparklyr_df)
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
      expr      min        lq      mean    median        uq       max neval
  arrow_on 4.520593  5.609812  6.154509  5.928099  6.217447  9.432221    10
 arrow_off 7.882841 13.358113 16.670708 16.127704 21.051382 24.373331    10
Today, custom transformations of data using R functions are performed in sparklyr by moving data in row format from Spark into an R process through a socket connection. Transferring data in row format is inefficient, since multiple data types need to be deserialized for each row; the data then gets converted to columnar format (R was originally designed to use columnar data). Once R finishes this computation, the data is again converted to row format, serialized row-by-row, and sent back to Spark over the socket connection.
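These custom transformations are what spark_apply() performs. For instance, a minimal sketch of applying an arbitrary R function over a sample of the Spark DataFrame (the halving function is illustrative; it is equivalent to the ~ .x / 2 formula used in the benchmark below):

# Each sampled partition round-trips through an R process as described above;
# here the R function simply halves every value
sampled <- sample_n(sparklyr_df, 10^5)
spark_apply(sampled, function(df) df / 2)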
By adding support for arrow in sparklyr, Spark can perform the row-format to column-format conversion in parallel. Data is then transferred through the socket, but no custom serialization takes place. All the R process needs to do is copy this data from the socket into its heap, transform it, and copy it back to the socket connection.
The following example transforms 100K rows with and without arrow enabled; arrow makes transformations with R functions close to 41x faster.
microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
      expr        min         lq       mean     median         uq        max neval
  arrow_on   3.881293   4.038376   5.136604   4.772739   5.759082   7.873711    10
 arrow_off 178.605733 183.654887 213.296238 227.182018 233.601885 238.877341    10
Additional benchmarks and fine-tuning parameters can be found in the sparklyr pull request rstudio/sparklyr/pull/1611 and the SparkR pull request apache/spark/pull/22954. We look forward to bringing this feature to the Spark, Arrow and R communities.