---
title: "Introduction to Apache Sedona for R"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Introduction to Apache Sedona for R}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  eval = FALSE,
  comment = "#>"
)
```

# Introduction

`{apache.sedona}` is a `{sparklyr}`-based R interface for
[Apache Sedona](https://sedona.apache.org). It presents what Apache
Sedona has to offer through idiomatic frameworks and constructs in R
(e.g., one can build spatial Spark SQL queries using Sedona UDFs in
conjunction with a wide range of dplyr expressions), hence making Apache
Sedona highly friendly for R users.
Generally speaking, when working with Apache Sedona, one chooses between
the following two modes:

- Manipulating Sedona [Spatial Resilient Distributed
  Datasets](../../../tutorial/rdd)
  with spatial-RDD-related routines
- Querying geometric columns within [Spatial DataFrames](../../../tutorial/sql) with Sedona
  spatial UDFs

While the former option enables more fine-grained control over low-level
implementation details (e.g., which index to build for spatial queries or
which data structure to use for spatial partitioning), the latter is
simpler and leads to a straightforward integration with `{dplyr}`,
`{sparklyr}`, and other `sparklyr` extensions (e.g., one can build ML
feature extractors with Sedona UDFs and connect them to ML pipelines
using the `ml_*()` family of functions in `sparklyr`, hence creating ML
workflows capable of understanding spatial data).
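
For instance, here is a minimal, hypothetical sketch (not evaluated) of such a
workflow: it assumes a Spark table `parcels_tbl` with a geometry column
`geometry` and a numeric label `price`, neither of which is created elsewhere
in this vignette.

```{r eval=FALSE}
library(sparklyr)
library(apache.sedona)
library(dplyr)

# Derive a numeric feature from the geometry column with a Sedona spatial UDF
# (parcels_tbl and its columns are hypothetical)
features_tbl <- parcels_tbl %>%
  mutate(area = st_area(geometry))

# Assemble the feature and fit a model using sparklyr's ml_*() functions
pipeline <- ml_pipeline(sc) %>%
  ft_vector_assembler(input_cols = "area", output_col = "features") %>%
  ml_linear_regression(features_col = "features", label_col = "price")

fitted <- ml_fit(pipeline, features_tbl)
```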

Because data from spatial RDDs can be imported into Spark DataFrames as
geometry columns and vice versa, one can switch between the two
aforementioned modes fairly easily.

At the moment `apache.sedona` consists of the following components:

- R interface for Spatial-RDD-related functionalities
  - Reading/writing spatial data in WKT, WKB, and GeoJSON formats
  - Shapefile reader
  - Spatial partition, index, join, KNN query, and range query
    operations
  - Visualization routines
- `dplyr`-integration for Sedona spatial UDTs and UDFs
  - See [SQL APIs](../../../api/sql/Overview/) for the list
    of available UDFs
- Functions importing data from spatial RDDs to Spark DataFrames and
  vice versa

# Connect to Spark

To ensure Sedona serialization routines, UDTs, and UDFs are properly
registered when creating a Spark session, one simply needs to attach
`apache.sedona` before instantiating a Spark connection; `apache.sedona`
will take care of the rest. For example,

```{r eval=FALSE, echo=TRUE}
library(sparklyr)
library(apache.sedona)

spark_home <- "/usr/lib/spark" # NOTE: replace this with your $SPARK_HOME directory
sc <- spark_connect(master = "yarn", spark_home = spark_home)
```

will create a Sedona-capable Spark connection in YARN client mode, and

```{r include=FALSE}
Sys.setenv("SEDONA_JAR_FILES" = "~/WORK/MISC_CODE/sedona/spark-shaded/target/sedona-spark-shaded-3.0_2.12-1.4.0-SNAPSHOT.jar")
```

```{r message=FALSE, warning=FALSE}
library(sparklyr)
library(apache.sedona)

sc <- spark_connect(master = "local")
```

will create a Sedona-capable Spark connection to an Apache Spark
instance running locally.

In `sparklyr`, one can easily inspect the Spark connection object to
sanity-check that it has been properly initialized with all
Sedona-related dependencies, e.g.,

```{r}
print(sc$extensions$packages)
```

and

```{r}
spark_session(sc) %>%
  invoke("%>%", list("conf"), list("get", "spark.kryo.registrator")) %>%
  print()
```

For more information about connecting to Spark with `sparklyr`, see
<https://therinspark.com/connections.html> and `?sparklyr::spark_connect`. Also see
[Initiate Spark Context](../../../tutorial/rdd/#initiate-sparkcontext) and [Initiate Spark Session](../../../tutorial/sql/#initiate-sparksession) for minimum and recommended dependencies for Apache Sedona.

# dplyr workflows

`apache.sedona` extends `sparklyr` and integrates with `dplyr` workflows. See the [sparklyr cheatsheet](https://spark.rstudio.com/images/homepage/sparklyr.pdf).

```{r message=FALSE, warning=FALSE}
library(dplyr)
```

## Loading data

### Copying from R

Data loaded in R can be copied to Spark using `copy_to`. Columns containing spatial information can be converted to the geometry type in Spark DataFrames (GeometryUDT) with Spark SQL functions such as `ST_GeomFromText` or `ST_GeomFromWKB`; see [Vector constructors](../../../api/sql/Constructor/) and [Raster input and output](../../../api/sql/Raster-loader/).

```{r}
data <- readr::read_csv(here::here("../spark/common/src/test/resources/arealm.csv"), col_names = FALSE, show_col_types = FALSE)
data %>% glimpse()

data_tbl <- copy_to(sc, data)

data_tbl

data_tbl %>%
  transmute(geometry = st_geomfromtext(X1)) %>%
  sdf_schema()
```
No automatic translation of `{sf}` objects is provided; they need to be converted to a text (or binary) format before copying to Spark.

```{r}
data <- sf::st_read(here::here("../spark/common/src/test/resources/testPolygon.json"))

data %>% glimpse()

data_tbl <-
  copy_to(
    sc,
    data %>%
      mutate(geometry_wkt = geometry %>% sf::st_as_text()) %>%
      sf::st_drop_geometry(),
    name = "data",
    overwrite = TRUE
  )

data_tbl %>%
  transmute(geometry = st_geomfromtext(geometry_wkt)) %>%
  sdf_schema()
```

### Loading directly in Spark

Loading data in R and then copying it to Spark will most likely not be the optimal way to prepare data for analysis; loading data directly into Spark will often be best. The `spark_read_*` functions are made for this purpose (and extend the `spark_read_*` functions in `sparklyr`).

```{r}
data_tbl <- spark_read_geojson(sc, path = here::here("../spark/common/src/test/resources/testPolygon.json"), name = "data")

data_tbl %>%
  glimpse()

data_tbl %>%
  # select(geometry) %>%
  sdf_schema() %>%
  lobstr::tree()
```

## Manipulating

The `dbplyr` interface transparently translates `dplyr` workflows into Spark SQL and gives access to all Apache Sedona SQL functions:

* [Vector functions](../../../api/sql/Function/)
* [Vector predicates](../../../api/sql/Predicate/)
* [Vector aggregate functions](../../../api/sql/AggregateFunction/)
* [Raster operators](../../../api/sql/Raster-operators/)

Results are then collected back into R with `collect`.

```{r}
## ! ST_Transform uses lon/lat order since v1.5.0. Before, it used lat/lon order.
data_tbl %>%
  mutate(
    ALAND = ALAND %>% as.numeric(),
    AWATER = AWATER %>% as.numeric(),
    area = ALAND + AWATER,
    geometry_proj = st_transform(geometry, "epsg:4326", "epsg:5070", TRUE),
    area_geom = st_area(geometry_proj)
  ) %>%
  select(STATEFP, COUNTYFP, area, area_geom) %>%
  head() %>%
  collect()
```

Geometries need to be converted to a serializable (text or binary) format before `collect` is called:

```{r}
## Setting the CRS in R post-collect
data_tbl %>%
  mutate(
    area = st_area(st_transform(geometry, "epsg:4326", "epsg:5070", TRUE)),
    geometry_wkb = geometry %>% st_asBinary()
  ) %>%
  select(COUNTYFP, geometry_wkb) %>%
  head() %>%
  collect() %>%
  sf::st_as_sf(crs = 4326)
```

```{r}
## Setting the CRS in Spark (and using EWKT to keep it)
data_tbl %>%
  mutate(
    area = st_area(st_transform(geometry, "epsg:4326", "epsg:5070", TRUE)),
    geometry_ewkt = geometry %>% st_setsrid(4326) %>% st_asewkt()
  ) %>%
  select(COUNTYFP, geometry_ewkt) %>%
  head() %>%
  collect() %>%
  sf::st_as_sf(wkt = "geometry_ewkt")
```

## Writing

Collected results can be saved from R, but in many cases it will be more efficient to write results directly from Spark. The `spark_write_*` functions (see [docs](?apache.sedona::spark_write_geojson)) are made for this purpose (and extend the `spark_write_*` functions in `sparklyr`).

```{r}
dest_file <- tempfile() ## Destination folder
data_tbl %>%
  filter(str_sub(COUNTYFP, 1, 2) == "00") %>%
  spark_write_geoparquet(path = dest_file)

dest_file %>% dir(recursive = TRUE)
```

The output can be partitioned by the columns present in the data:

```{r}
dest_file <- tempfile() ## Destination folder
data_tbl %>%
  filter(str_sub(COUNTYFP, 1, 2) == "00") %>%
  spark_write_geoparquet(path = dest_file, partition_by = "COUNTYFP")

dest_file %>% dir(recursive = TRUE)
```

## Spark DataFrames

[Spark DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html) provide a higher level API than RDDs, and can be used with SQL queries. `sparklyr` and `apache.sedona` automatically wrap Spark DataFrames in `dplyr` `tbl`s to work in dplyr workflows.

```{r}
data_tbl %>% class()
```

You can get to the underlying Spark DataFrame (SDF) with `sparklyr::spark_dataframe`, to be used, for example, with `sparklyr::invoke` to call SDF methods within Spark.

```{r}
sdf <- data_tbl %>% spark_dataframe()
sdf
```
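
For example, the SDF's own methods can then be called through `invoke()`; the
two calls below (a small sketch) return the row count and print the schema tree.

```{r}
# Call Spark DataFrame methods directly on the SDF via invoke()
sdf %>% invoke("count")

sdf %>%
  invoke("schema") %>%
  invoke("treeString") %>%
  cat()
```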


# RDD workflows

## What are `SpatialRDD`s?

[SpatialRDDs](../../../tutorial/rdd/) are basic building blocks of distributed spatial data in Apache Sedona.
A `SpatialRDD` can be partitioned and indexed using well-known spatial
data structures to facilitate range queries, KNN queries, and other
low-level operations. One can also export records from `SpatialRDD`s
into regular Spark DataFrames, making them accessible through Spark SQL
and through the `dplyr` interface of `sparklyr`.

## Creating a SpatialRDD

NOTE: this section is largely based on the
[Spatial RDD Scala tutorial](../../../tutorial/rdd/#create-a-spatialrdd), except
that the examples have been written in R instead of Scala to reflect the
usage of `apache.sedona`.

Currently `SpatialRDD`s can be created in `apache.sedona` by reading a
file in a supported geospatial format (`sedona_read_*` functions), or by
extracting data from a Spark SQL query.

For example, the following code will import data from
[arealm-small.csv](https://github.com/apache/sedona/blob/master/docs/usecases/data/arealm-small.csv)
into a `SpatialRDD`:

```{r}
pt_rdd <- sedona_read_dsv_to_typed_rdd(
  sc,
  location = here::here("../spark/common/src/test/resources/arealm.csv"),
  delimiter = ",",
  type = "point",
  first_spatial_col_index = 1,
  has_non_spatial_attrs = TRUE
)
```

Records from the example
[arealm-small.csv](https://github.com/apache/sedona/blob/master/docs/usecases/data/arealm-small.csv)
file look like the following:

    testattribute0,-88.331492,32.324142,testattribute1,testattribute2
    testattribute0,-88.175933,32.360763,testattribute1,testattribute2
    testattribute0,-88.388954,32.357073,testattribute1,testattribute2

As one can see from the above, each record is comma-separated and
contains a 2-dimensional coordinate starting at the 2nd column and
ending at the 3rd column. All other columns contain non-spatial
attributes. Because column indexes are 0-based, we need to specify
`first_spatial_col_index = 1` in the example above to ensure each record
is parsed correctly.

In addition to formats such as CSV and TSV, currently `apache.sedona`
also supports reading files in WKT (Well-Known Text), WKB (Well-Known
Binary), Shapefile and GeoJSON formats. See `sedona_read_wkt()` for details.
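
As a minimal sketch, reading a WKT file into a `SpatialRDD` looks like the
following; the file path is hypothetical, and further arguments (such as the
index of the WKT column) are described in `?apache.sedona::sedona_read_wkt`.

```{r eval=FALSE}
# Hypothetical path; see the help page for the remaining arguments
wkt_rdd <- sedona_read_wkt(
  sc,
  location = "/path/to/geometries.wkt"
)
```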

## Conversion to and from SpatialRDD

One can also run `to_spatial_rdd()` to extract a `SpatialRDD` from a Spark
SQL query; e.g., the code below extracts a spatial column named `"geom"`
from a Sedona spatial SQL query and stores it in a `SpatialRDD` object.

```{r}
library(dplyr)

sdf <- tbl(
  sc,
  sql("SELECT ST_GeomFromText('POINT(-71.064544 42.28787)') AS `geom`, \"point\" AS `type`")
)

spatial_rdd <- sdf %>% to_spatial_rdd(spatial_col = "geom")
spatial_rdd
```

A `SpatialRDD` can be converted into a Spark DataFrame with `sdf_register` (generic method from `sparklyr`).

```{r}
spatial_sdf <- spatial_rdd %>% sdf_register(name = "my_table")
spatial_sdf
```

# Visualization

An important part of `apache.sedona` is its collection of R interfaces
to Sedona visualization routines. For example, the following is
essentially the R equivalent of [this example in
Scala](https://github.com/apache/sedona/blob/f6b1c5e24bdb67d2c8d701a9b2af1fb5658fdc4d/viz/src/main/scala/org/apache/sedona/viz/showcase/ScalaExample.scala#L142-L160).

```{r}
resolution_x <- 1000
resolution_y <- 600
boundary <- c(-126.790180, -64.630926, 24.863836, 50.000)

pt_rdd <- sedona_read_dsv_to_typed_rdd(
  sc,
  location = here::here("../spark/common/src/test/resources/arealm.csv"),
  type = "point"
)
polygon_rdd <- sedona_read_dsv_to_typed_rdd(
  sc,
  location = here::here("../spark/common/src/test/resources/primaryroads-polygon.csv"),
  type = "polygon"
)
pair_rdd <- sedona_spatial_join_count_by_key(
  pt_rdd,
  polygon_rdd,
  join_type = "intersect"
)

overlay <- sedona_render_scatter_plot(
  polygon_rdd,
  resolution_x,
  resolution_y,
  output_location = tempfile("scatter-plot-"),
  boundary = boundary,
  base_color = c(255, 0, 0),
  browse = FALSE
)

sedona_render_choropleth_map(
  pair_rdd,
  resolution_x,
  resolution_y,
  output_location = "./choropleth-map",
  boundary = boundary,
  overlay = overlay,
  # vary the green color channel according to relative magnitudes of data points so
  # that the resulting map will show light blue, light purple, and light gray pixels
  color_of_variation = "green",
  base_color = c(225, 225, 255)
)
```

It will create a scatter plot, and then overlay it on top of a
choropleth map, as shown below:

<img src="choropleth-map.png" width=800 />

See `?apache.sedona::sedona_render_scatter_plot`,
`?apache.sedona::sedona_render_heatmap`, and
`?apache.sedona::sedona_render_choropleth_map` for more details on
visualization-related R interfaces currently implemented by
`apache.sedona`.

# Advanced parameters

Various advanced parameters can be set in Apache Sedona; see [parameters](../../../api/sql/Parameter/).

Setting these parameters through the `sparklyr` config does not currently work:

```{r include=FALSE}
## Disconnect first
sc %>% spark_disconnect()
```

```{r}
config <- spark_config()
config[["sedona.global.index"]] <- FALSE

sc <- spark_connect(master = "local", config = config)
```

Check the resulting Sedona configuration:

```{r}
invoke_new(sc, "org.apache.sedona.core.utils.SedonaConf", invoke(spark_session(sc), "conf"))
```

(`sedona.global.index` is still `true`, so the setting was not picked up.)

The parameter can instead be changed at runtime:

```{r}
spark_session(sc) %>%
  invoke("conf") %>%
  invoke("set", "sedona.global.index", "false")

invoke_new(sc, "org.apache.sedona.core.utils.SedonaConf", invoke(spark_session(sc), "conf"))
```

```{r include=FALSE}
## Clean-up
sc %>% spark_disconnect()
spark_disconnect_all()
rm(sc)
```