| |
| |
| |
| |
| <!DOCTYPE html> |
| <html class="no-js"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>Spark Declarative Pipelines Programming Guide - Spark 4.1.0-preview1 Documentation</title> |
| |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <link rel="preconnect" href="https://fonts.googleapis.com"> |
| <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> |
| <link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&Courier+Prime:wght@400;700&display=swap" rel="stylesheet"> |
| <link href="css/custom.css" rel="stylesheet"> |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| <link rel="stylesheet" href="css/docsearch.min.css" /> |
| <link rel="stylesheet" href="css/docsearch.css"> |
| |
| |
| <!-- Matomo --> |
<script>
// Matomo command queue: commands pushed here are replayed by matomo.js once it loads.
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
// Disable tracking cookies before the first tracked event is recorded.
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
// Base URL of the Apache-hosted Matomo instance.
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
// Load the tracker asynchronously by inserting it before the first <script> on the page.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
| <!-- End Matomo Code --> |
| |
| |
| </head> |
| <body class="global"> |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| <nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar"> |
| <div class="navbar-brand"><a href="index.html"> |
<img src="https://spark.apache.org/images/spark-logo-rev.svg" width="141" height="72" alt="Apache Spark logo"></a><span class="version">4.1.0-preview1</span>
| </div> |
| <button class="navbar-toggler" type="button" data-toggle="collapse" |
| data-target="#navbarCollapse" aria-controls="navbarCollapse" |
| aria-expanded="false" aria-label="Toggle navigation"> |
| <span class="navbar-toggler-icon"></span> |
| </button> |
| <div class="collapse navbar-collapse" id="navbarCollapse"> |
| <ul class="navbar-nav me-auto"> |
| <li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a> |
| <div class="dropdown-menu" aria-labelledby="navbarQuickStart"> |
| <a class="dropdown-item" href="quick-start.html">Quick Start</a> |
| <a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a> |
| <a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a> |
| <a class="dropdown-item" href="streaming/index.html">Structured Streaming</a> |
| <a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a> |
| <a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a> |
| <a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a> |
| <a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a> |
| <a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a> |
| <a class="dropdown-item" href="declarative-pipelines-programming-guide.html">Declarative Pipelines</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a> |
| <div class="dropdown-menu" aria-labelledby="navbarAPIDocs"> |
| <a class="dropdown-item" href="api/python/index.html">Python</a> |
| <a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a> |
| <a class="dropdown-item" href="api/java/index.html">Java</a> |
| <a class="dropdown-item" href="api/R/index.html">R</a> |
| <a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a> |
| <div class="dropdown-menu" aria-labelledby="navbarDeploying"> |
| <a class="dropdown-item" href="cluster-overview.html">Overview</a> |
| <a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a> |
| <a class="dropdown-item" href="running-on-yarn.html">YARN</a> |
| <a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a> |
| <div class="dropdown-menu" aria-labelledby="navbarMore"> |
| <a class="dropdown-item" href="configuration.html">Configuration</a> |
| <a class="dropdown-item" href="monitoring.html">Monitoring</a> |
| <a class="dropdown-item" href="tuning.html">Tuning Guide</a> |
| <a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a> |
| <a class="dropdown-item" href="security.html">Security</a> |
| <a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a> |
| <a class="dropdown-item" href="migration-guide.html">Migration Guide</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="building-spark.html">Building Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a> |
| </div> |
| </li> |
| |
| <li class="nav-item"> |
| <input type="text" id="docsearch-input" placeholder="Search the docs…"> |
| </li> |
| </ul> |
| <!--<span class="navbar-text navbar-right"><span class="version-text">v4.1.0-preview1</span></span>--> |
| </div> |
| </nav> |
| |
| |
| |
| <div class="container"> |
| |
| <div class="content mr-3" id="content"> |
| |
| |
| <h1 class="title">Spark Declarative Pipelines Programming Guide</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#what-is-spark-declarative-pipelines-sdp" id="markdown-toc-what-is-spark-declarative-pipelines-sdp">What is Spark Declarative Pipelines (SDP)?</a></li> |
| <li><a href="#key-concepts" id="markdown-toc-key-concepts">Key Concepts</a> <ul> |
| <li><a href="#flows" id="markdown-toc-flows">Flows</a></li> |
| <li><a href="#datasets" id="markdown-toc-datasets">Datasets</a></li> |
| <li><a href="#pipelines" id="markdown-toc-pipelines">Pipelines</a></li> |
| <li><a href="#pipeline-projects" id="markdown-toc-pipeline-projects">Pipeline Projects</a></li> |
| </ul> |
| </li> |
| <li><a href="#the-spark-pipelines-command-line-interface" id="markdown-toc-the-spark-pipelines-command-line-interface">The <code class="language-plaintext highlighter-rouge">spark-pipelines</code> Command Line Interface</a> <ul> |
| <li><a href="#spark-pipelines-init" id="markdown-toc-spark-pipelines-init"><code class="language-plaintext highlighter-rouge">spark-pipelines init</code></a></li> |
| <li><a href="#spark-pipelines-run" id="markdown-toc-spark-pipelines-run"><code class="language-plaintext highlighter-rouge">spark-pipelines run</code></a></li> |
| </ul> |
| </li> |
| <li><a href="#programming-with-sdp-in-python" id="markdown-toc-programming-with-sdp-in-python">Programming with SDP in Python</a> <ul> |
| <li><a href="#creating-a-materialized-view-with-python" id="markdown-toc-creating-a-materialized-view-with-python">Creating a Materialized View with Python</a></li> |
| <li><a href="#creating-a-temporary-view-with-python" id="markdown-toc-creating-a-temporary-view-with-python">Creating a Temporary View with Python</a></li> |
| <li><a href="#creating-a-streaming-table-with-python" id="markdown-toc-creating-a-streaming-table-with-python">Creating a Streaming Table with Python</a></li> |
| <li><a href="#loading-data-from-a-streaming-source" id="markdown-toc-loading-data-from-a-streaming-source">Loading Data from a Streaming Source</a></li> |
| <li><a href="#querying-tables-defined-in-your-pipeline" id="markdown-toc-querying-tables-defined-in-your-pipeline">Querying Tables Defined in Your Pipeline</a></li> |
| <li><a href="#creating-tables-in-a-for-loop" id="markdown-toc-creating-tables-in-a-for-loop">Creating Tables in a For Loop</a></li> |
| <li><a href="#using-multiple-flows-to-write-to-a-single-target" id="markdown-toc-using-multiple-flows-to-write-to-a-single-target">Using Multiple Flows to Write to a Single Target</a></li> |
| </ul> |
| </li> |
| <li><a href="#programming-with-sdp-in-sql" id="markdown-toc-programming-with-sdp-in-sql">Programming with SDP in SQL</a> <ul> |
| <li><a href="#creating-a-materialized-view-with-sql" id="markdown-toc-creating-a-materialized-view-with-sql">Creating a Materialized View with SQL</a></li> |
| <li><a href="#creating-a-temporary-view-with-sql" id="markdown-toc-creating-a-temporary-view-with-sql">Creating a Temporary View with SQL</a></li> |
| <li><a href="#creating-a-streaming-table-with-sql" id="markdown-toc-creating-a-streaming-table-with-sql">Creating a Streaming Table with SQL</a></li> |
| <li><a href="#querying-tables-defined-in-your-pipeline-1" id="markdown-toc-querying-tables-defined-in-your-pipeline-1">Querying Tables Defined in Your Pipeline</a></li> |
| <li><a href="#using-multiple-flows-to-write-to-a-single-target-1" id="markdown-toc-using-multiple-flows-to-write-to-a-single-target-1">Using Multiple Flows to Write to a Single Target</a></li> |
| </ul> |
| </li> |
| <li><a href="#important-considerations" id="markdown-toc-important-considerations">Important Considerations</a> <ul> |
| <li><a href="#python-considerations" id="markdown-toc-python-considerations">Python Considerations</a></li> |
| <li><a href="#sql-considerations" id="markdown-toc-sql-considerations">SQL Considerations</a></li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h2 id="what-is-spark-declarative-pipelines-sdp">What is Spark Declarative Pipelines (SDP)?</h2> |
| |
| <p>Spark Declarative Pipelines (SDP) is a declarative framework for building reliable, maintainable, and testable data pipelines on Spark. SDP simplifies ETL development by allowing you to focus on the transformations you want to apply to your data, rather than the mechanics of pipeline execution.</p> |
| |
| <p>SDP is designed for both batch and streaming data processing, supporting common use cases such as:</p> |
| <ul> |
| <li>Data ingestion from cloud storage (Amazon S3, Azure ADLS Gen2, Google Cloud Storage)</li> |
| <li>Data ingestion from message buses (Apache Kafka, Amazon Kinesis, Google Pub/Sub, Azure EventHub)</li> |
| <li>Incremental batch and streaming transformations</li> |
| </ul> |
| |
| <p>The key advantage of SDP is its declarative approach - you define what tables should exist and what their contents should be, and SDP handles the orchestration, compute management, and error handling automatically.</p> |
| |
<p><img src="img/declarative-pipelines-dataflow-graph.png" alt="Dataflow Graph" /></p>
| |
| <h2 id="key-concepts">Key Concepts</h2> |
| |
| <h3 id="flows">Flows</h3> |
| |
| <p>A flow is the foundational data processing concept in SDP which supports both streaming and batch semantics. A flow reads data from a source, applies user-defined processing logic, and writes the result into a target dataset.</p> |
| |
| <p>For example, when you author a query like:</p> |
| |
| <div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">CREATE</span> <span class="n">STREAMING</span> <span class="k">TABLE</span> <span class="n">target_table</span> <span class="k">AS</span> |
| <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">STREAM</span> <span class="n">source_table</span> |
| </code></pre></div></div> |
| |
| <p>SDP creates the table named <code class="language-plaintext highlighter-rouge">target_table</code> along with a flow that reads new data from <code class="language-plaintext highlighter-rouge">source_table</code> and writes it to <code class="language-plaintext highlighter-rouge">target_table</code>.</p> |
| |
| <h3 id="datasets">Datasets</h3> |
| |
<p>A dataset is a queryable object that’s the output of one or more flows within a pipeline. Flows in the pipeline can also read from datasets produced in the pipeline.</p>
| |
| <ul> |
| <li><strong>Streaming Table</strong> – a definition of a table and one or more streaming flows written into it. Streaming tables support incremental processing of data, allowing you to process only new data as it arrives.</li> |
<li><strong>Materialized View</strong> – a view that is precomputed into a table. A materialized view always has exactly one batch flow writing to it.</li>
| <li><strong>Temporary View</strong> – a view that is scoped to an execution of the pipeline. It can be referenced from flows within the pipeline. It’s useful for encapsulating transformations and intermediate logical entities that multiple other elements of the pipeline depend on.</li> |
| </ul> |
| |
| <h3 id="pipelines">Pipelines</h3> |
| |
| <p>A pipeline is the primary unit of development and execution in SDP. A pipeline can contain one or more flows, streaming tables, and materialized views. While your pipeline runs, it analyzes the dependencies of your defined objects and orchestrates their order of execution and parallelization automatically.</p> |
| |
| <h3 id="pipeline-projects">Pipeline Projects</h3> |
| |
<p>A pipeline project is a set of source files that contain code that defines the datasets and flows that make up a pipeline. These source files can be <code class="language-plaintext highlighter-rouge">.py</code> or <code class="language-plaintext highlighter-rouge">.sql</code> files.</p>
| |
| <p>A YAML-formatted pipeline spec file contains the top-level configuration for the pipeline project. It supports the following fields:</p> |
| <ul> |
| <li><strong>definitions</strong> (Required) - Paths where definition files can be found.</li> |
| <li><strong>database</strong> (Optional) - The default target database for pipeline outputs.</li> |
| <li><strong>catalog</strong> (Optional) - The default target catalog for pipeline outputs.</li> |
| <li><strong>configuration</strong> (Optional) - Map of Spark configuration properties.</li> |
| </ul> |
| |
| <p>An example pipeline spec file:</p> |
| |
| <div class="language-yaml highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="na">name</span><span class="pi">:</span> <span class="s">my_pipeline</span> |
| <span class="na">definitions</span><span class="pi">:</span> |
| <span class="pi">-</span> <span class="na">glob</span><span class="pi">:</span> |
| <span class="na">include</span><span class="pi">:</span> <span class="s">transformations/**/*.py</span> |
| <span class="pi">-</span> <span class="na">glob</span><span class="pi">:</span> |
| <span class="na">include</span><span class="pi">:</span> <span class="s">transformations/**/*.sql</span> |
| <span class="na">catalog</span><span class="pi">:</span> <span class="s">my_catalog</span> |
| <span class="na">database</span><span class="pi">:</span> <span class="s">my_db</span> |
| <span class="na">configuration</span><span class="pi">:</span> |
| <span class="na">spark.sql.shuffle.partitions</span><span class="pi">:</span> <span class="s2">"</span><span class="s">1000"</span> |
| </code></pre></div></div> |
| |
| <p>It’s conventional to name pipeline spec files <code class="language-plaintext highlighter-rouge">pipeline.yml</code>.</p> |
| |
| <p>The <code class="language-plaintext highlighter-rouge">spark-pipelines init</code> command, described below, makes it easy to generate a pipeline project with default configuration and directory structure.</p> |
| |
| <h2 id="the-spark-pipelines-command-line-interface">The <code class="language-plaintext highlighter-rouge">spark-pipelines</code> Command Line Interface</h2> |
| |
| <p>The <code class="language-plaintext highlighter-rouge">spark-pipelines</code> command line interface (CLI) is the primary way to execute a pipeline. It also contains an <code class="language-plaintext highlighter-rouge">init</code> subcommand for generating a pipeline project.</p> |
| |
| <p><code class="language-plaintext highlighter-rouge">spark-pipelines</code> is built on top of <code class="language-plaintext highlighter-rouge">spark-submit</code>, meaning that it supports all cluster managers supported by <code class="language-plaintext highlighter-rouge">spark-submit</code>. It supports all <code class="language-plaintext highlighter-rouge">spark-submit</code> arguments except for <code class="language-plaintext highlighter-rouge">--class</code>.</p> |
| |
| <h3 id="spark-pipelines-init"><code class="language-plaintext highlighter-rouge">spark-pipelines init</code></h3> |
| |
| <p><code class="language-plaintext highlighter-rouge">spark-pipelines init --name my_pipeline</code> generates a simple pipeline project, inside a directory named “my_pipeline”, including a spec file and example definitions.</p> |
| |
| <h3 id="spark-pipelines-run"><code class="language-plaintext highlighter-rouge">spark-pipelines run</code></h3> |
| |
| <p><code class="language-plaintext highlighter-rouge">spark-pipelines run</code> launches an execution of a pipeline and monitors its progress until it completes. The <code class="language-plaintext highlighter-rouge">--spec</code> parameter allows selecting the pipeline spec file. If not provided, the CLI will look in the current directory and parent directories for a file named <code class="language-plaintext highlighter-rouge">pipeline.yml</code> or <code class="language-plaintext highlighter-rouge">pipeline.yaml</code>.</p> |
| |
| <h2 id="programming-with-sdp-in-python">Programming with SDP in Python</h2> |
| |
| <p>SDP Python functions are defined in the <code class="language-plaintext highlighter-rouge">pyspark.pipelines</code> module. Your pipelines implemented with the Python API must import this module. It’s common to alias the module to <code class="language-plaintext highlighter-rouge">sdp</code> to limit the number of characters you need to type when using its APIs.</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| </code></pre></div></div> |
| |
| <h3 id="creating-a-materialized-view-with-python">Creating a Materialized View with Python</h3> |
| |
| <p>The <code class="language-plaintext highlighter-rouge">@sdp.materialized_view</code> decorator tells SDP to create a materialized view based on the results returned by a function that performs a batch read:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| |
| <span class="nd">@sdp.materialized_view</span> |
| <span class="k">def</span> <span class="nf">basic_mv</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.nyctaxi.trips</span><span class="sh">"</span><span class="p">)</span> |
| </code></pre></div></div> |
| |
| <p>Optionally, you can specify the table name using the <code class="language-plaintext highlighter-rouge">name</code> argument:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| |
| <span class="nd">@sdp.materialized_view</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="sh">"</span><span class="s">trips_mv</span><span class="sh">"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">basic_mv</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.nyctaxi.trips</span><span class="sh">"</span><span class="p">)</span> |
| </code></pre></div></div> |
| |
| <h3 id="creating-a-temporary-view-with-python">Creating a Temporary View with Python</h3> |
| |
| <p>The <code class="language-plaintext highlighter-rouge">@sdp.temporary_view</code> decorator tells SDP to create a temporary view based on the results returned by a function that performs a batch read:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| |
| <span class="nd">@sdp.temporary_view</span> |
| <span class="k">def</span> <span class="nf">basic_tv</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.nyctaxi.trips</span><span class="sh">"</span><span class="p">)</span> |
| </code></pre></div></div> |
| |
| <p>This temporary view can be read by other queries within the pipeline, but can’t be read outside the scope of the pipeline.</p> |
| |
| <h3 id="creating-a-streaming-table-with-python">Creating a Streaming Table with Python</h3> |
| |
| <p>Similarly, you can create a streaming table by using the <code class="language-plaintext highlighter-rouge">@sdp.table</code> decorator with a function that performs a streaming read:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| |
| <span class="nd">@sdp.table</span> |
| <span class="k">def</span> <span class="nf">basic_st</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">spark</span><span class="p">.</span><span class="n">readStream</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.nyctaxi.trips</span><span class="sh">"</span><span class="p">)</span> |
| </code></pre></div></div> |
| |
| <h3 id="loading-data-from-a-streaming-source">Loading Data from a Streaming Source</h3> |
| |
| <p>SDP supports loading data from all formats supported by Spark. For example, you can create a streaming table whose query reads from a Kafka topic:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| |
| <span class="nd">@sdp.table</span> |
| <span class="k">def</span> <span class="nf">ingestion_st</span><span class="p">():</span> |
| <span class="nf">return </span><span class="p">(</span> |
| <span class="n">spark</span><span class="p">.</span><span class="n">readStream</span><span class="p">.</span><span class="nf">format</span><span class="p">(</span><span class="sh">"</span><span class="s">kafka</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">kafka.bootstrap.servers</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">localhost:9092</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">subscribe</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">orders</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">load</span><span class="p">()</span> |
| <span class="p">)</span> |
| </code></pre></div></div> |
| |
| <p>For batch reads:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| |
| <span class="nd">@sdp.materialized_view</span> |
| <span class="k">def</span> <span class="nf">batch_mv</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">spark</span><span class="p">.</span><span class="n">read</span><span class="p">.</span><span class="nf">format</span><span class="p">(</span><span class="sh">"</span><span class="s">json</span><span class="sh">"</span><span class="p">).</span><span class="nf">load</span><span class="p">(</span><span class="sh">"</span><span class="s">/datasets/retail-org/sales_orders</span><span class="sh">"</span><span class="p">)</span> |
| </code></pre></div></div> |
| |
| <h3 id="querying-tables-defined-in-your-pipeline">Querying Tables Defined in Your Pipeline</h3> |
| |
| <p>You can reference other tables defined in your pipeline in the same way you’d reference tables defined outside your pipeline:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| <span class="kn">from</span> <span class="n">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">col</span> |
| |
| <span class="nd">@sdp.table</span> |
| <span class="k">def</span> <span class="nf">orders</span><span class="p">():</span> |
| <span class="nf">return </span><span class="p">(</span> |
| <span class="n">spark</span><span class="p">.</span><span class="n">readStream</span><span class="p">.</span><span class="nf">format</span><span class="p">(</span><span class="sh">"</span><span class="s">kafka</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">kafka.bootstrap.servers</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">localhost:9092</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">subscribe</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">orders</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">load</span><span class="p">()</span> |
| <span class="p">)</span> |
| |
| <span class="nd">@sdp.materialized_view</span> |
| <span class="k">def</span> <span class="nf">customers</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">spark</span><span class="p">.</span><span class="n">read</span><span class="p">.</span><span class="nf">format</span><span class="p">(</span><span class="sh">"</span><span class="s">csv</span><span class="sh">"</span><span class="p">).</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">header</span><span class="sh">"</span><span class="p">,</span> <span class="bp">True</span><span class="p">).</span><span class="nf">load</span><span class="p">(</span><span class="sh">"</span><span class="s">/datasets/retail-org/customers</span><span class="sh">"</span><span class="p">)</span> |
| |
| <span class="nd">@sdp.materialized_view</span> |
| <span class="k">def</span> <span class="nf">customer_orders</span><span class="p">():</span> |
| <span class="nf">return </span><span class="p">(</span><span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">orders</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">join</span><span class="p">(</span><span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">customers</span><span class="sh">"</span><span class="p">),</span> <span class="sh">"</span><span class="s">customer_id</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">select</span><span class="p">(</span><span class="sh">"</span><span class="s">customer_id</span><span class="sh">"</span><span class="p">,</span> |
| <span class="sh">"</span><span class="s">order_number</span><span class="sh">"</span><span class="p">,</span> |
| <span class="sh">"</span><span class="s">state</span><span class="sh">"</span><span class="p">,</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">order_datetime</span><span class="sh">"</span><span class="p">).</span><span class="nf">cast</span><span class="p">(</span><span class="sh">"</span><span class="s">int</span><span class="sh">"</span><span class="p">).</span><span class="nf">cast</span><span class="p">(</span><span class="sh">"</span><span class="s">timestamp</span><span class="sh">"</span><span class="p">).</span><span class="nf">cast</span><span class="p">(</span><span class="sh">"</span><span class="s">date</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">order_date</span><span class="sh">"</span><span class="p">),</span> |
| <span class="p">)</span> |
| <span class="p">)</span> |
| |
| <span class="nd">@sdp.materialized_view</span> |
| <span class="k">def</span> <span class="nf">daily_orders_by_state</span><span class="p">():</span> |
| <span class="nf">return </span><span class="p">(</span><span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">customer_orders</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">groupBy</span><span class="p">(</span><span class="sh">"</span><span class="s">state</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">order_date</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">count</span><span class="p">().</span><span class="nf">withColumnRenamed</span><span class="p">(</span><span class="sh">"</span><span class="s">count</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">order_count</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">)</span> |
| </code></pre></div></div> |
| |
| <h3 id="creating-tables-in-a-for-loop">Creating Tables in a For Loop</h3> |
| |
| <p>You can use Python <code class="language-plaintext highlighter-rouge">for</code> loops to create multiple tables programmatically:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| <span class="kn">from</span> <span class="n">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">collect_list</span><span class="p">,</span> <span class="n">col</span> |
| |
| <span class="nd">@sdp.temporary_view</span><span class="p">()</span> |
| <span class="k">def</span> <span class="nf">customer_orders</span><span class="p">():</span> |
| <span class="n">orders</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.tpch.orders</span><span class="sh">"</span><span class="p">)</span> |
| <span class="n">customer</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.tpch.customer</span><span class="sh">"</span><span class="p">)</span> |
| |
| <span class="nf">return </span><span class="p">(</span><span class="n">orders</span><span class="p">.</span><span class="nf">join</span><span class="p">(</span><span class="n">customer</span><span class="p">,</span> <span class="n">orders</span><span class="p">.</span><span class="n">o_custkey</span> <span class="o">==</span> <span class="n">customer</span><span class="p">.</span><span class="n">c_custkey</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">select</span><span class="p">(</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">c_custkey</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">custkey</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">c_name</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">name</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">c_nationkey</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">nationkey</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">c_phone</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">phone</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">o_orderkey</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">orderkey</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">o_orderstatus</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">orderstatus</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">o_totalprice</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">totalprice</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">o_orderdate</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">orderdate</span><span class="sh">"</span><span class="p">))</span> |
| <span class="p">)</span> |
| |
| <span class="nd">@sdp.temporary_view</span><span class="p">()</span> |
| <span class="k">def</span> <span class="nf">nation_region</span><span class="p">():</span> |
| <span class="n">nation</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.tpch.nation</span><span class="sh">"</span><span class="p">)</span> |
| <span class="n">region</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.tpch.region</span><span class="sh">"</span><span class="p">)</span> |
| |
| <span class="nf">return </span><span class="p">(</span><span class="n">nation</span><span class="p">.</span><span class="nf">join</span><span class="p">(</span><span class="n">region</span><span class="p">,</span> <span class="n">nation</span><span class="p">.</span><span class="n">n_regionkey</span> <span class="o">==</span> <span class="n">region</span><span class="p">.</span><span class="n">r_regionkey</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">select</span><span class="p">(</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">n_name</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">nation</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">r_name</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">region</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">n_nationkey</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">nationkey</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">)</span> |
| <span class="p">)</span> |
| |
| <span class="c1"># Extract region names from region table |
| </span><span class="n">region_list</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">samples.tpch.region</span><span class="sh">"</span><span class="p">).</span><span class="nf">select</span><span class="p">(</span><span class="nf">collect_list</span><span class="p">(</span><span class="sh">"</span><span class="s">r_name</span><span class="sh">"</span><span class="p">)).</span><span class="nf">collect</span><span class="p">()[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> |
| |
| <span class="c1"># Iterate through region names to create new region-specific materialized views |
| </span><span class="k">for</span> <span class="n">region</span> <span class="ow">in</span> <span class="n">region_list</span><span class="p">:</span> |
| <span class="nd">@sdp.table</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="sa">f</span><span class="sh">"</span><span class="si">{</span><span class="n">region</span><span class="p">.</span><span class="nf">lower</span><span class="p">().</span><span class="nf">replace</span><span class="p">(</span><span class="sh">'</span><span class="s"> </span><span class="sh">'</span><span class="p">,</span> <span class="sh">'</span><span class="s">_</span><span class="sh">'</span><span class="p">)</span><span class="si">}</span><span class="s">_customer_orders</span><span class="sh">"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">regional_customer_orders</span><span class="p">(</span><span class="n">region_filter</span><span class="o">=</span><span class="n">region</span><span class="p">):</span> |
| <span class="n">customer_orders</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">customer_orders</span><span class="sh">"</span><span class="p">)</span> |
| <span class="n">nation_region</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">nation_region</span><span class="sh">"</span><span class="p">)</span> |
| |
| <span class="nf">return </span><span class="p">(</span><span class="n">customer_orders</span><span class="p">.</span><span class="nf">join</span><span class="p">(</span><span class="n">nation_region</span><span class="p">,</span> <span class="n">customer_orders</span><span class="p">.</span><span class="n">nationkey</span> <span class="o">==</span> <span class="n">nation_region</span><span class="p">.</span><span class="n">nationkey</span><span class="p">)</span> |
| <span class="p">.</span><span class="nf">select</span><span class="p">(</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">custkey</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">name</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">phone</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">nation</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">region</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">orderkey</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">orderstatus</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">totalprice</span><span class="sh">"</span><span class="p">),</span> |
| <span class="nf">col</span><span class="p">(</span><span class="sh">"</span><span class="s">orderdate</span><span class="sh">"</span><span class="p">)</span> |
| <span class="p">).</span><span class="nf">filter</span><span class="p">(</span><span class="sa">f</span><span class="sh">"</span><span class="s">region = </span><span class="sh">'</span><span class="si">{</span><span class="n">region_filter</span><span class="si">}</span><span class="sh">'"</span><span class="p">)</span> |
| <span class="p">)</span> |
| </code></pre></div></div> |
| |
| <h3 id="using-multiple-flows-to-write-to-a-single-target">Using Multiple Flows to Write to a Single Target</h3> |
| |
| <p>You can create multiple flows that append data to the same target:</p> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="n">pyspark</span> <span class="kn">import</span> <span class="n">pipelines</span> <span class="k">as</span> <span class="n">sdp</span> |
| |
| <span class="c1"># create a streaming table |
| </span><span class="n">sdp</span><span class="p">.</span><span class="nf">create_streaming_table</span><span class="p">(</span><span class="sh">"</span><span class="s">customers_us</span><span class="sh">"</span><span class="p">)</span> |
| |
| <span class="c1"># add the first append flow |
| </span><span class="nd">@sdp.append_flow</span><span class="p">(</span><span class="n">target</span> <span class="o">=</span> <span class="sh">"</span><span class="s">customers_us</span><span class="sh">"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">append1</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">spark</span><span class="p">.</span><span class="n">readStream</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">customers_us_west</span><span class="sh">"</span><span class="p">)</span> |
| |
| <span class="c1"># add the second append flow |
| </span><span class="nd">@sdp.append_flow</span><span class="p">(</span><span class="n">target</span> <span class="o">=</span> <span class="sh">"</span><span class="s">customers_us</span><span class="sh">"</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">append2</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">spark</span><span class="p">.</span><span class="n">readStream</span><span class="p">.</span><span class="nf">table</span><span class="p">(</span><span class="sh">"</span><span class="s">customers_us_east</span><span class="sh">"</span><span class="p">)</span> |
| </code></pre></div></div> |
| |
| <h2 id="programming-with-sdp-in-sql">Programming with SDP in SQL</h2> |
| |
| <h3 id="creating-a-materialized-view-with-sql">Creating a Materialized View with SQL</h3> |
| |
| <p>The basic syntax for creating a materialized view with SQL is:</p> |
| |
| <div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">CREATE</span> <span class="n">MATERIALIZED</span> <span class="k">VIEW</span> <span class="n">basic_mv</span> |
| <span class="k">AS</span> <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">samples</span><span class="p">.</span><span class="n">nyctaxi</span><span class="p">.</span><span class="n">trips</span><span class="p">;</span> |
| </code></pre></div></div> |
| |
| <h3 id="creating-a-temporary-view-with-sql">Creating a Temporary View with SQL</h3> |
| |
| <p>The basic syntax for creating a temporary view with SQL is:</p> |
| |
| <div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">CREATE</span> <span class="k">TEMPORARY</span> <span class="k">VIEW</span> <span class="n">basic_tv</span> |
| <span class="k">AS</span> <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">samples</span><span class="p">.</span><span class="n">nyctaxi</span><span class="p">.</span><span class="n">trips</span><span class="p">;</span> |
| </code></pre></div></div> |
| |
| <h3 id="creating-a-streaming-table-with-sql">Creating a Streaming Table with SQL</h3> |
| |
| <p>When creating a streaming table, use the <code class="language-plaintext highlighter-rouge">STREAM</code> keyword to indicate streaming semantics for the source:</p> |
| |
| <div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">CREATE</span> <span class="n">STREAMING</span> <span class="k">TABLE</span> <span class="n">basic_st</span> |
| <span class="k">AS</span> <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">STREAM</span> <span class="n">samples</span><span class="p">.</span><span class="n">nyctaxi</span><span class="p">.</span><span class="n">trips</span><span class="p">;</span> |
| </code></pre></div></div> |
| |
| <h3 id="querying-tables-defined-in-your-pipeline-1">Querying Tables Defined in Your Pipeline</h3> |
| |
| <p>You can reference other tables defined in your pipeline:</p> |
| |
| <div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">CREATE</span> <span class="n">STREAMING</span> <span class="k">TABLE</span> <span class="n">orders</span> |
| <span class="k">AS</span> <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">STREAM</span> <span class="n">orders_source</span><span class="p">;</span> |
| |
| <span class="k">CREATE</span> <span class="n">MATERIALIZED</span> <span class="k">VIEW</span> <span class="n">customers</span> |
| <span class="k">AS</span> <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">customers_source</span><span class="p">;</span> |
| |
| <span class="k">CREATE</span> <span class="n">MATERIALIZED</span> <span class="k">VIEW</span> <span class="n">customer_orders</span> |
| <span class="k">AS</span> <span class="k">SELECT</span> |
| <span class="k">c</span><span class="p">.</span><span class="n">customer_id</span><span class="p">,</span> |
| <span class="n">o</span><span class="p">.</span><span class="n">order_number</span><span class="p">,</span> |
| <span class="k">c</span><span class="p">.</span><span class="k">state</span><span class="p">,</span> |
| <span class="nb">date</span><span class="p">(</span><span class="nb">timestamp</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">o</span><span class="p">.</span><span class="n">order_datetime</span><span class="p">)))</span> <span class="n">order_date</span> |
| <span class="k">FROM</span> <span class="n">orders</span> <span class="n">o</span> |
| <span class="k">INNER</span> <span class="k">JOIN</span> <span class="n">customers</span> <span class="k">c</span> |
| <span class="k">ON</span> <span class="n">o</span><span class="p">.</span><span class="n">customer_id</span> <span class="o">=</span> <span class="k">c</span><span class="p">.</span><span class="n">customer_id</span><span class="p">;</span> |
| |
| <span class="k">CREATE</span> <span class="n">MATERIALIZED</span> <span class="k">VIEW</span> <span class="n">daily_orders_by_state</span> |
| <span class="k">AS</span> <span class="k">SELECT</span> <span class="k">state</span><span class="p">,</span> <span class="n">order_date</span><span class="p">,</span> <span class="k">count</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="n">order_count</span> |
| <span class="k">FROM</span> <span class="n">customer_orders</span> |
| <span class="k">GROUP</span> <span class="k">BY</span> <span class="k">state</span><span class="p">,</span> <span class="n">order_date</span><span class="p">;</span> |
| </code></pre></div></div> |
| |
| <h3 id="using-multiple-flows-to-write-to-a-single-target-1">Using Multiple Flows to Write to a Single Target</h3> |
| |
| <p>You can create multiple flows that append data to the same target:</p> |
| |
| <div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="c1">-- create a streaming table</span> |
| <span class="k">CREATE</span> <span class="n">STREAMING</span> <span class="k">TABLE</span> <span class="n">customers_us</span><span class="p">;</span> |
| |
| <span class="c1">-- add the first append flow</span> |
| <span class="k">CREATE</span> <span class="n">FLOW</span> <span class="n">append1</span> |
| <span class="k">AS</span> <span class="k">INSERT</span> <span class="k">INTO</span> <span class="n">customers_us</span> |
| <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">STREAM</span><span class="p">(</span><span class="n">customers_us_west</span><span class="p">);</span> |
| |
| <span class="c1">-- add the second append flow</span> |
| <span class="k">CREATE</span> <span class="n">FLOW</span> <span class="n">append2</span> |
| <span class="k">AS</span> <span class="k">INSERT</span> <span class="k">INTO</span> <span class="n">customers_us</span> |
| <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">STREAM</span><span class="p">(</span><span class="n">customers_us_east</span><span class="p">);</span> |
| </code></pre></div></div> |
| |
| <h2 id="important-considerations">Important Considerations</h2> |
| |
| <h3 id="python-considerations">Python Considerations</h3> |
| |
| <ul> |
| <li>SDP evaluates the code that defines a pipeline multiple times during planning and pipeline runs. Python functions that define datasets should include only the code required to define the table or view.</li> |
| <li>The function used to define a dataset must return a Spark DataFrame.</li> |
| <li>Never use methods that save or write to files or tables as part of your SDP dataset code.</li> |
| </ul> |
| |
| <p>Examples of Apache Spark operations that should never be used in SDP code:</p> |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">collect()</code></li> |
| <li><code class="language-plaintext highlighter-rouge">count()</code></li> |
| <li><code class="language-plaintext highlighter-rouge">toPandas()</code></li> |
| <li><code class="language-plaintext highlighter-rouge">save()</code></li> |
| <li><code class="language-plaintext highlighter-rouge">saveAsTable()</code></li> |
| <li><code class="language-plaintext highlighter-rouge">start()</code></li> |
| <li><code class="language-plaintext highlighter-rouge">toTable()</code></li> |
| </ul> |
| |
| <h3 id="sql-considerations">SQL Considerations</h3> |
| |
| <ul> |
| <li>The <code class="language-plaintext highlighter-rouge">PIVOT</code> clause is not supported in SDP SQL.</li> |
| <li>When using the <code class="language-plaintext highlighter-rouge">for</code> loop pattern to define datasets in Python, ensure that the list of values passed to the <code class="language-plaintext highlighter-rouge">for</code> loop is always additive.</li> |
| </ul> |
| |
| </div> |
| |
| <!-- /container --> |
| </div> |
| |
| <script src="js/vendor/jquery-3.5.1.min.js"></script> |
| <script src="js/vendor/bootstrap.bundle.min.js"></script> |
| |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <script type="text/javascript" src="js/vendor/docsearch.min.js"></script> |
| <script type="text/javascript"> |
// DocSearch is entirely free and automated. DocSearch is built in two parts:
// 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link
//    in your website and extracts content from every page it traverses. It then pushes this
//    content to an Algolia index.
// 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index
//    to your search input and display its results in a dropdown UI. If you want to find more
//    details on how DocSearch works, check the DocSearch documentation.
docsearch({
  apiKey: 'd62f962a82bc9abb53471cb7b89da35e',
  appId: 'RAI69RXRSK',
  indexName: 'apache_spark',
  inputSelector: '#docsearch-input',
  enhancedSearchInput: true,
  algoliaOptions: {
    'facetFilters': ["version:4.1.0-preview1"]
  },
  debug: false // Set debug to true if you want to inspect the dropdown
});
| |
| </script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
// Number displayed equations automatically, following AMS conventions.
MathJax.Hub.Config({
  TeX: { equationNumbers: { autoNumber: "AMS" } }
});
| </script> |
| <script> |
// Load MathJax from the CDN with an explicit "https://" URL. This works for pages
// served over HTTP, HTTPS, and local files (file://) alike — unlike a protocol-relative
// "//cdn.mathjax..." URL, which breaks under "file://" — and, unlike the previous
// protocol-sniffing approach, it never downgrades the request to insecure "http://".
(function(d, script) {
  // Inject an async <script> element so the MathJax download does not block rendering.
  script = d.createElement('script');
  script.type = 'text/javascript';
  script.async = true;
  script.onload = function(){
    // Configure math delimiters once the library has finished loading.
    MathJax.Hub.Config({
      tex2jax: {
        inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
        displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
        processEscapes: true,
        // Never typeset inside these tags (code samples use $ and \ literally).
        skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
      }
    });
  };
  script.src = 'https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
               '?config=TeX-AMS-MML_HTMLorMML';
  d.getElementsByTagName('head')[0].appendChild(script);
}(document));
| </script> |
| </body> |
| </html> |