Source code for pyspark.sql.streaming

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import json
from collections.abc import Iterator
from typing import cast, overload, Any, Callable, Dict, List, Optional, TYPE_CHECKING, Union

from py4j.java_gateway import java_import, JavaObject

from pyspark import since
from pyspark.sql.column import _to_seq
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import Row, StructType, StructField, StringType
from pyspark.sql.utils import ForeachBatchFunction, StreamingQueryException

if TYPE_CHECKING:
    from pyspark.sql.session import SparkSession
    from pyspark.sql._typing import SupportsProcess, OptionalPrimitiveType
    from pyspark.sql.dataframe import DataFrame

__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]


class StreamingQuery:
    """
    A handle to a query that is executing continuously in the background as new data arrives.
    All these methods are thread-safe.

    .. versionadded:: 2.0.0

    Notes
    -----
    This API is evolving.
    """

    def __init__(self, jsq: JavaObject) -> None:
        self._jsq = jsq

    @property  # type: ignore[misc]
    @since(2.0)
    def id(self) -> str:
        """Returns the unique id of this query that persists across restarts from checkpoint data.
        That is, this id is generated when a query is started for the first time, and
        will be the same every time it is restarted from checkpoint data.
        There can only be one query with the same id active in a Spark cluster.
        Also see `runId`.
        """
        return self._jsq.id().toString()

    @property  # type: ignore[misc]
    @since(2.1)
    def runId(self) -> str:
        """Returns the unique id of this query that does not persist across restarts. That is,
        every query that is started (or restarted from checkpoint) will have a different runId.
        """
        return self._jsq.runId().toString()

    @property  # type: ignore[misc]
    @since(2.0)
    def name(self) -> str:
        """Returns the user-specified name of the query, or None if not specified.
        This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
        as `dataframe.writeStream.queryName("query").start()`.
        This name, if set, must be unique across all active queries.
        """
        return self._jsq.name()

    @property  # type: ignore[misc]
    @since(2.0)
    def isActive(self) -> bool:
        """Whether this streaming query is currently active or not."""
        return self._jsq.isActive()

| <div class="viewcode-block" id="StreamingQuery.awaitTermination"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.awaitTermination.html#pyspark.sql.streaming.StreamingQuery.awaitTermination">[docs]</a> <span class="nd">@since</span><span class="p">(</span><span class="mf">2.0</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">awaitTermination</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timeout</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-></span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]:</span> |
| <span class="sd">"""Waits for the termination of `this` query, either by :func:`query.stop()` or by an</span> |
| <span class="sd"> exception. If the query has terminated with an exception, then the exception will be thrown.</span> |
| <span class="sd"> If `timeout` is set, it returns whether the query has terminated or not within the</span> |
| <span class="sd"> `timeout` seconds.</span> |
| |
| <span class="sd"> If the query has terminated, then all subsequent calls to this method will either return</span> |
| <span class="sd"> immediately (if the query was terminated by :func:`stop()`), or throw the exception</span> |
| <span class="sd"> immediately (if the query has terminated with exception).</span> |
| |
| <span class="sd"> throws :class:`StreamingQueryException`, if `this` query has terminated with an exception</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">timeout</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">timeout</span><span class="p">,</span> <span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="nb">float</span><span class="p">))</span> <span class="ow">or</span> <span class="n">timeout</span> <span class="o"><</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"timeout must be a positive integer or float. Got </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">timeout</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">awaitTermination</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">timeout</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">awaitTermination</span><span class="p">()</span></div> |

    @property  # type: ignore[misc]
    @since(2.1)
    def status(self) -> Dict[str, Any]:
        """
        Returns the current status of the query.
        """
        return json.loads(self._jsq.status().json())

    @property  # type: ignore[misc]
    @since(2.1)
    def recentProgress(self) -> List[Dict[str, Any]]:
        """Returns an array of the most recent :class:`StreamingQueryProgress` updates for this
        query. The number of progress updates retained for each stream is configured by Spark
        session configuration `spark.sql.streaming.numRecentProgressUpdates`.
        """
        return [json.loads(p.json()) for p in self._jsq.recentProgress()]

    @property
    def lastProgress(self) -> Optional[Dict[str, Any]]:
        """
        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query,
        or None if there were no progress updates.

        .. versionadded:: 2.1.0

        Returns
        -------
        dict or None
        """
        lastProgress = self._jsq.lastProgress()
        if lastProgress:
            return json.loads(lastProgress.json())
        else:
            return None

    def processAllAvailable(self) -> None:
        """Blocks until all available data in the source has been processed and committed to the
        sink. This method is intended for testing.

        .. versionadded:: 2.0.0

        Notes
        -----
        In the case of continually arriving data, this method may block forever.
        Additionally, this method is only guaranteed to block until data that has been
        synchronously appended to a stream source prior to invocation has been processed
        (i.e. `getOffset` must immediately reflect the addition).
        """
        return self._jsq.processAllAvailable()

    @since(2.0)
    def stop(self) -> None:
        """Stop this streaming query."""
        self._jsq.stop()

    def explain(self, extended: bool = False) -> None:
        """Prints the (logical and physical) plans to the console for debugging purposes.

        .. versionadded:: 2.1.0

        Parameters
        ----------
        extended : bool, optional
            default ``False``. If ``False``, prints only the physical plan.

        Examples
        --------
        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
        >>> sq.processAllAvailable()  # Wait a bit to generate the runtime plans.
        >>> sq.explain()
        == Physical Plan ==
        ...
        >>> sq.explain(True)
        == Parsed Logical Plan ==
        ...
        == Analyzed Logical Plan ==
        ...
        == Optimized Logical Plan ==
        ...
        == Physical Plan ==
        ...
        >>> sq.stop()
        """
        # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
        # We should print it in the Python process.
        print(self._jsq.explainInternal(extended))

    def exception(self) -> Optional[StreamingQueryException]:
        """
        .. versionadded:: 2.1.0

        Returns
        -------
        :class:`StreamingQueryException`
            the StreamingQueryException if the query was terminated by an exception, or None.
        """
        if self._jsq.exception().isDefined():
            je = self._jsq.exception().get()
            msg = je.toString().split(": ", 1)[1]  # Drop the Java StreamingQueryException type info
            stackTrace = "\n\t at ".join(map(lambda x: x.toString(), je.getStackTrace()))
            return StreamingQueryException(msg, stackTrace, je.getCause())
        else:
            return None


class StreamingQueryManager:
    """A class to manage all the active :class:`StreamingQuery` instances.

    .. versionadded:: 2.0.0

    Notes
    -----
    This API is evolving.
    """

    def __init__(self, jsqm: JavaObject) -> None:
        self._jsqm = jsqm

    @property
    def active(self) -> List[StreamingQuery]:
        """Returns a list of active queries associated with this SQLContext.

        .. versionadded:: 2.0.0

        Examples
        --------
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
        >>> sqm = spark.streams
        >>> # get the list of active streaming queries
        >>> [q.name for q in sqm.active]
        ['this_query']
        >>> sq.stop()
        """
        return [StreamingQuery(jsq) for jsq in self._jsqm.active()]

    def get(self, id: str) -> StreamingQuery:
        """Returns an active query from this SQLContext, or throws an exception if an active
        query with this id doesn't exist.

        .. versionadded:: 2.0.0

        Examples
        --------
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
        >>> sq.name
        'this_query'
        >>> sq = spark.streams.get(sq.id)
        >>> sq.isActive
        True
        >>> sq = sqlContext.streams.get(sq.id)
        >>> sq.isActive
        True
        >>> sq.stop()
        """
        return StreamingQuery(self._jsqm.get(id))

    @since(2.0)
    def awaitAnyTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
        """Wait until any of the queries on the associated SQLContext has terminated since the
        creation of the context, or since :func:`resetTerminated()` was called. If any query
        was terminated with an exception, then the exception will be thrown.
        If `timeout` is set, it returns whether any query has terminated or not within
        `timeout` seconds.

        If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
        either return immediately (if the query was terminated by :func:`query.stop()`),
        or throw the exception immediately (if the query was terminated with an exception). Use
        :func:`resetTerminated()` to clear past terminations and wait for new terminations.

        In the case where multiple queries have terminated since :func:`resetTerminated()`
        was called, if any query has terminated with an exception, then
        :func:`awaitAnyTermination()` will throw any of the exceptions. To correctly handle
        exceptions across multiple queries, users need to stop all of them after any of them
        terminates with an exception, and then check `query.exception()` for each query.

        throws :class:`StreamingQueryException`, if any query has terminated with an exception
        """
        if timeout is not None:
            if not isinstance(timeout, (int, float)) or timeout < 0:
                raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
            return self._jsqm.awaitAnyTermination(int(timeout * 1000))  # the JVM API takes milliseconds
        else:
            return self._jsqm.awaitAnyTermination()

    def resetTerminated(self) -> None:
        """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
        again to wait for new terminations.

        .. versionadded:: 2.0.0

        Examples
        --------
        >>> spark.streams.resetTerminated()
        """
        self._jsqm.resetTerminated()


class DataStreamReader(OptionUtils):
    """
    Interface used to load a streaming :class:`DataFrame <pyspark.sql.DataFrame>` from external
    storage systems (e.g. file systems, key-value stores, etc.).
    Use :attr:`SparkSession.readStream <pyspark.sql.SparkSession.readStream>` to access this.

    .. versionadded:: 2.0.0

    Notes
    -----
    This API is evolving.
    """

    def __init__(self, spark: "SparkSession") -> None:
        self._jreader = spark._jsparkSession.readStream()
        self._spark = spark

    def _df(self, jdf: JavaObject) -> "DataFrame":
        from pyspark.sql.dataframe import DataFrame

        return DataFrame(jdf, self._spark)

    def format(self, source: str) -> "DataStreamReader":
        """Specifies the input data source format.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        source : str
            name of the data source, e.g. 'json', 'parquet'.

        Notes
        -----
        This API is evolving.

        Examples
        --------
        >>> s = spark.readStream.format("text")
        """
        self._jreader = self._jreader.format(source)
        return self

    def schema(self, schema: Union[StructType, str]) -> "DataStreamReader":
        """Specifies the input schema.

        Some data sources (e.g. JSON) can infer the input schema automatically from data.
        By specifying the schema here, the underlying data source can skip the schema
        inference step, and thus speed up data loading.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        schema : :class:`pyspark.sql.types.StructType` or str
            a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string
            (For example ``col0 INT, col1 DOUBLE``).

        Notes
        -----
        This API is evolving.

        Examples
        --------
        >>> s = spark.readStream.schema(sdf_schema)
        >>> s = spark.readStream.schema("col0 INT, col1 DOUBLE")
        """
        from pyspark.sql import SparkSession

        spark = SparkSession._getActiveSessionOrCreate()
        if isinstance(schema, StructType):
            jschema = spark._jsparkSession.parseDataType(schema.json())
            self._jreader = self._jreader.schema(jschema)
        elif isinstance(schema, str):
            self._jreader = self._jreader.schema(schema)
        else:
            raise TypeError("schema should be StructType or string")
        return self

    def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamReader":
        """Adds an input option for the underlying data source.

        .. versionadded:: 2.0.0

        Notes
        -----
        This API is evolving.

        Examples
        --------
        >>> s = spark.readStream.option("x", 1)
        """
        self._jreader = self._jreader.option(key, to_str(value))
        return self

    def options(self, **options: "OptionalPrimitiveType") -> "DataStreamReader":
        """Adds input options for the underlying data source.

        .. versionadded:: 2.0.0

        Notes
        -----
        This API is evolving.

        Examples
        --------
        >>> s = spark.readStream.options(x="1", y=2)
        """
        for k in options:
            self._jreader = self._jreader.option(k, to_str(options[k]))
        return self

    def load(
        self,
        path: Optional[str] = None,
        format: Optional[str] = None,
        schema: Optional[Union[StructType, str]] = None,
        **options: "OptionalPrimitiveType",
    ) -> "DataFrame":
        """Loads a data stream from a data source and returns it as a
        :class:`DataFrame <pyspark.sql.DataFrame>`.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        path : str, optional
            optional string for file-system backed data sources.
        format : str, optional
            optional string for the format of the data source. Defaults to 'parquet'.
        schema : :class:`pyspark.sql.types.StructType` or str, optional
            optional :class:`pyspark.sql.types.StructType` for the input schema
            or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
        **options : dict
            all other string options

        Notes
        -----
        This API is evolving.

        Examples
        --------
        >>> json_sdf = spark.readStream.format("json") \\
        ...     .schema(sdf_schema) \\
        ...     .load(tempfile.mkdtemp())
        >>> json_sdf.isStreaming
        True
        >>> json_sdf.schema == sdf_schema
        True
        """
        if format is not None:
            self.format(format)
        if schema is not None:
            self.schema(schema)
        self.options(**options)
        if path is not None:
            if type(path) != str or len(path.strip()) == 0:
                raise ValueError(
                    "If the path is provided for stream, it needs to be a "
                    + "non-empty string. List of paths are not supported."
                )
            return self._df(self._jreader.load(path))
        else:
            return self._df(self._jreader.load())

    def json(
        self,
        path: str,
        schema: Optional[Union[StructType, str]] = None,
        primitivesAsString: Optional[Union[bool, str]] = None,
        prefersDecimal: Optional[Union[bool, str]] = None,
        allowComments: Optional[Union[bool, str]] = None,
        allowUnquotedFieldNames: Optional[Union[bool, str]] = None,
        allowSingleQuotes: Optional[Union[bool, str]] = None,
        allowNumericLeadingZero: Optional[Union[bool, str]] = None,
        allowBackslashEscapingAnyCharacter: Optional[Union[bool, str]] = None,
        mode: Optional[str] = None,
        columnNameOfCorruptRecord: Optional[str] = None,
        dateFormat: Optional[str] = None,
        timestampFormat: Optional[str] = None,
        multiLine: Optional[Union[bool, str]] = None,
        allowUnquotedControlChars: Optional[Union[bool, str]] = None,
        lineSep: Optional[str] = None,
        locale: Optional[str] = None,
| <span class="n">dropFieldIfAllNull</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">encoding</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">allowNonNumericNumbers</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"DataFrame"</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Loads a JSON file stream and returns the results as a :class:`DataFrame`.</span> |
| |
| <span class="sd"> `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.</span> |
| <span class="sd"> For JSON (one record per file), set the ``multiLine`` parameter to ``true``.</span> |
| |
| <span class="sd"> If the ``schema`` parameter is not specified, this function goes</span> |
| <span class="sd"> through the input once to determine the input schema.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> string represents path to the JSON dataset,</span> |
| <span class="sd"> or RDD of Strings storing JSON objects.</span> |
| <span class="sd"> schema : :class:`pyspark.sql.types.StructType` or str, optional</span> |
| <span class="sd"> an optional :class:`pyspark.sql.types.StructType` for the input schema</span> |
| <span class="sd"> or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).</span> |
| |
| <span class="sd"> Other Parameters</span> |
| <span class="sd"> ----------------</span> |
| <span class="sd"> Extra options</span> |
| <span class="sd"> For the extra options, refer to</span> |
| <span class="sd"> `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option>`_</span> |
| <span class="sd"> in the version you use.</span> |
| |
| <span class="sd"> .. # noqa</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)</span> |
| <span class="sd"> >>> json_sdf.isStreaming</span> |
| <span class="sd"> True</span> |
| <span class="sd"> >>> json_sdf.schema == sdf_schema</span> |
| <span class="sd"> True</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span> |
| <span class="n">schema</span><span class="o">=</span><span class="n">schema</span><span class="p">,</span> |
| <span class="n">primitivesAsString</span><span class="o">=</span><span class="n">primitivesAsString</span><span class="p">,</span> |
| <span class="n">prefersDecimal</span><span class="o">=</span><span class="n">prefersDecimal</span><span class="p">,</span> |
| <span class="n">allowComments</span><span class="o">=</span><span class="n">allowComments</span><span class="p">,</span> |
| <span class="n">allowUnquotedFieldNames</span><span class="o">=</span><span class="n">allowUnquotedFieldNames</span><span class="p">,</span> |
| <span class="n">allowSingleQuotes</span><span class="o">=</span><span class="n">allowSingleQuotes</span><span class="p">,</span> |
| <span class="n">allowNumericLeadingZero</span><span class="o">=</span><span class="n">allowNumericLeadingZero</span><span class="p">,</span> |
| <span class="n">allowBackslashEscapingAnyCharacter</span><span class="o">=</span><span class="n">allowBackslashEscapingAnyCharacter</span><span class="p">,</span> |
| <span class="n">mode</span><span class="o">=</span><span class="n">mode</span><span class="p">,</span> |
| <span class="n">columnNameOfCorruptRecord</span><span class="o">=</span><span class="n">columnNameOfCorruptRecord</span><span class="p">,</span> |
| <span class="n">dateFormat</span><span class="o">=</span><span class="n">dateFormat</span><span class="p">,</span> |
| <span class="n">timestampFormat</span><span class="o">=</span><span class="n">timestampFormat</span><span class="p">,</span> |
| <span class="n">multiLine</span><span class="o">=</span><span class="n">multiLine</span><span class="p">,</span> |
| <span class="n">allowUnquotedControlChars</span><span class="o">=</span><span class="n">allowUnquotedControlChars</span><span class="p">,</span> |
| <span class="n">lineSep</span><span class="o">=</span><span class="n">lineSep</span><span class="p">,</span> |
| <span class="n">locale</span><span class="o">=</span><span class="n">locale</span><span class="p">,</span> |
| <span class="n">dropFieldIfAllNull</span><span class="o">=</span><span class="n">dropFieldIfAllNull</span><span class="p">,</span> |
| <span class="n">encoding</span><span class="o">=</span><span class="n">encoding</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span> |
| <span class="n">allowNonNumericNumbers</span><span class="o">=</span><span class="n">allowNonNumericNumbers</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">json</span><span class="p">(</span><span class="n">path</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"path can be only a single string"</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="DataStreamReader.orc"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.orc.html#pyspark.sql.streaming.DataStreamReader.orc">[docs]</a> <span class="k">def</span> <span class="nf">orc</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">mergeSchema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"DataFrame"</span><span class="p">:</span> |
| <span class="sd">"""Loads a ORC file stream, returning the result as a :class:`DataFrame`.</span> |
| |
| <span class="sd"> .. versionadded:: 2.3.0</span> |
| |
| <span class="sd"> Other Parameters</span> |
| <span class="sd"> ----------------</span> |
| <span class="sd"> Extra options</span> |
| <span class="sd"> For the extra options, refer to</span> |
| <span class="sd"> `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option>`_</span> |
| <span class="sd"> in the version you use.</span> |
| |
| <span class="sd"> .. # noqa</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())</span> |
| <span class="sd"> >>> orc_sdf.isStreaming</span> |
| <span class="sd"> True</span> |
| <span class="sd"> >>> orc_sdf.schema == sdf_schema</span> |
| <span class="sd"> True</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span> |
| <span class="n">mergeSchema</span><span class="o">=</span><span class="n">mergeSchema</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">orc</span><span class="p">(</span><span class="n">path</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"path can be only a single string"</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="DataStreamReader.parquet"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.parquet.html#pyspark.sql.streaming.DataStreamReader.parquet">[docs]</a> <span class="k">def</span> <span class="nf">parquet</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">mergeSchema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">datetimeRebaseMode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">int96RebaseMode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"DataFrame"</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Loads a Parquet file stream, returning the result as a :class:`DataFrame`.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str</span> |
| <span class="sd"> the path in any Hadoop supported file system</span> |
| |
| <span class="sd"> Other Parameters</span> |
| <span class="sd"> ----------------</span> |
| <span class="sd"> Extra options</span> |
| <span class="sd"> For the extra options, refer to</span> |
| <span class="sd"> `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option>`_.</span> |
| <span class="sd"> in the version you use.</span> |
| |
| <span class="sd"> .. # noqa</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())</span> |
| <span class="sd"> >>> parquet_sdf.isStreaming</span> |
| <span class="sd"> True</span> |
| <span class="sd"> >>> parquet_sdf.schema == sdf_schema</span> |
| <span class="sd"> True</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span> |
| <span class="n">mergeSchema</span><span class="o">=</span><span class="n">mergeSchema</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span> |
| <span class="n">datetimeRebaseMode</span><span class="o">=</span><span class="n">datetimeRebaseMode</span><span class="p">,</span> |
| <span class="n">int96RebaseMode</span><span class="o">=</span><span class="n">int96RebaseMode</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="n">path</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"path can be only a single string"</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="DataStreamReader.text"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.text.html#pyspark.sql.streaming.DataStreamReader.text">[docs]</a> <span class="k">def</span> <span class="nf">text</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">wholetext</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span> |
| <span class="n">lineSep</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"DataFrame"</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a</span> |
| <span class="sd"> string column named "value", and followed by partitioned columns if there</span> |
| <span class="sd"> are any.</span> |
| <span class="sd"> The text files must be encoded as UTF-8.</span> |
| |
| <span class="sd"> By default, each line in the text file is a new row in the resulting DataFrame.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str or list</span> |
| <span class="sd"> string, or list of strings, for input path(s).</span> |
| |
| <span class="sd"> Other Parameters</span> |
| <span class="sd"> ----------------</span> |
| <span class="sd"> Extra options</span> |
| <span class="sd"> For the extra options, refer to</span> |
| <span class="sd"> `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option>`_</span> |
| <span class="sd"> in the version you use.</span> |
| |
| <span class="sd"> .. # noqa</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> text_sdf = spark.readStream.text(tempfile.mkdtemp())</span> |
| <span class="sd"> >>> text_sdf.isStreaming</span> |
| <span class="sd"> True</span> |
| <span class="sd"> >>> "value" in str(text_sdf.schema)</span> |
| <span class="sd"> True</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span> |
| <span class="n">wholetext</span><span class="o">=</span><span class="n">wholetext</span><span class="p">,</span> |
| <span class="n">lineSep</span><span class="o">=</span><span class="n">lineSep</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">text</span><span class="p">(</span><span class="n">path</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"path can be only a single string"</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="DataStreamReader.csv"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.csv.html#pyspark.sql.streaming.DataStreamReader.csv">[docs]</a> <span class="k">def</span> <span class="nf">csv</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="n">schema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="n">StructType</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">sep</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">encoding</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">quote</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">escape</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">comment</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">header</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">inferSchema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">ignoreLeadingWhiteSpace</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">ignoreTrailingWhiteSpace</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">nullValue</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">nanValue</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">positiveInf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">negativeInf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">dateFormat</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">timestampFormat</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">maxColumns</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">maxCharsPerColumn</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">maxMalformedLogPerPartition</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">mode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">columnNameOfCorruptRecord</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">multiLine</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">charToEscapeQuoteEscaping</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">enforceSchema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">emptyValue</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">locale</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">lineSep</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">unescapedQuoteHandling</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"DataFrame"</span><span class="p">:</span> |
| <span class="sa">r</span><span class="sd">"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.</span> |
| |
| <span class="sd"> This function will go through the input once to determine the input schema if</span> |
| <span class="sd"> ``inferSchema`` is enabled. To avoid going through the entire data once, disable</span> |
| <span class="sd"> ``inferSchema`` option or specify the schema explicitly using ``schema``.</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str or list</span> |
| <span class="sd"> string, or list of strings, for input path(s).</span> |
| <span class="sd"> schema : :class:`pyspark.sql.types.StructType` or str, optional</span> |
| <span class="sd"> an optional :class:`pyspark.sql.types.StructType` for the input schema</span> |
| <span class="sd"> or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Other Parameters</span> |
| <span class="sd"> ----------------</span> |
| <span class="sd"> Extra options</span> |
| <span class="sd"> For the extra options, refer to</span> |
| <span class="sd"> `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option>`_</span> |
| <span class="sd"> in the version you use.</span> |
| |
| <span class="sd"> .. # noqa</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)</span> |
| <span class="sd"> >>> csv_sdf.isStreaming</span> |
| <span class="sd"> True</span> |
| <span class="sd"> >>> csv_sdf.schema == sdf_schema</span> |
| <span class="sd"> True</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span> |
| <span class="n">schema</span><span class="o">=</span><span class="n">schema</span><span class="p">,</span> |
| <span class="n">sep</span><span class="o">=</span><span class="n">sep</span><span class="p">,</span> |
| <span class="n">encoding</span><span class="o">=</span><span class="n">encoding</span><span class="p">,</span> |
| <span class="n">quote</span><span class="o">=</span><span class="n">quote</span><span class="p">,</span> |
| <span class="n">escape</span><span class="o">=</span><span class="n">escape</span><span class="p">,</span> |
| <span class="n">comment</span><span class="o">=</span><span class="n">comment</span><span class="p">,</span> |
| <span class="n">header</span><span class="o">=</span><span class="n">header</span><span class="p">,</span> |
| <span class="n">inferSchema</span><span class="o">=</span><span class="n">inferSchema</span><span class="p">,</span> |
| <span class="n">ignoreLeadingWhiteSpace</span><span class="o">=</span><span class="n">ignoreLeadingWhiteSpace</span><span class="p">,</span> |
| <span class="n">ignoreTrailingWhiteSpace</span><span class="o">=</span><span class="n">ignoreTrailingWhiteSpace</span><span class="p">,</span> |
| <span class="n">nullValue</span><span class="o">=</span><span class="n">nullValue</span><span class="p">,</span> |
| <span class="n">nanValue</span><span class="o">=</span><span class="n">nanValue</span><span class="p">,</span> |
| <span class="n">positiveInf</span><span class="o">=</span><span class="n">positiveInf</span><span class="p">,</span> |
| <span class="n">negativeInf</span><span class="o">=</span><span class="n">negativeInf</span><span class="p">,</span> |
| <span class="n">dateFormat</span><span class="o">=</span><span class="n">dateFormat</span><span class="p">,</span> |
| <span class="n">timestampFormat</span><span class="o">=</span><span class="n">timestampFormat</span><span class="p">,</span> |
| <span class="n">maxColumns</span><span class="o">=</span><span class="n">maxColumns</span><span class="p">,</span> |
| <span class="n">maxCharsPerColumn</span><span class="o">=</span><span class="n">maxCharsPerColumn</span><span class="p">,</span> |
| <span class="n">maxMalformedLogPerPartition</span><span class="o">=</span><span class="n">maxMalformedLogPerPartition</span><span class="p">,</span> |
| <span class="n">mode</span><span class="o">=</span><span class="n">mode</span><span class="p">,</span> |
| <span class="n">columnNameOfCorruptRecord</span><span class="o">=</span><span class="n">columnNameOfCorruptRecord</span><span class="p">,</span> |
| <span class="n">multiLine</span><span class="o">=</span><span class="n">multiLine</span><span class="p">,</span> |
| <span class="n">charToEscapeQuoteEscaping</span><span class="o">=</span><span class="n">charToEscapeQuoteEscaping</span><span class="p">,</span> |
| <span class="n">enforceSchema</span><span class="o">=</span><span class="n">enforceSchema</span><span class="p">,</span> |
| <span class="n">emptyValue</span><span class="o">=</span><span class="n">emptyValue</span><span class="p">,</span> |
| <span class="n">locale</span><span class="o">=</span><span class="n">locale</span><span class="p">,</span> |
| <span class="n">lineSep</span><span class="o">=</span><span class="n">lineSep</span><span class="p">,</span> |
| <span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span> |
| <span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span> |
| <span class="n">unescapedQuoteHandling</span><span class="o">=</span><span class="n">unescapedQuoteHandling</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">csv</span><span class="p">(</span><span class="n">path</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"path can be only a single string"</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="DataStreamReader.table"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.table.html#pyspark.sql.streaming.DataStreamReader.table">[docs]</a> <span class="k">def</span> <span class="nf">table</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tableName</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataFrame"</span><span class="p">:</span> |
| <span class="sd">"""Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should</span> |
| <span class="sd"> support streaming mode.</span> |
| |
| <span class="sd"> .. versionadded:: 3.1.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> tableName : str</span> |
| <span class="sd"> string, for the name of the table.</span> |
| |
| <span class="sd"> Returns</span> |
| <span class="sd"> -------</span> |
| <span class="sd"> :class:`DataFrame`</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> spark.readStream.table('input_table') # doctest: +SKIP</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">tableName</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">table</span><span class="p">(</span><span class="n">tableName</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"tableName can be only a single string"</span><span class="p">)</span></div></div> |
| |
| |
| <div class="viewcode-block" id="DataStreamWriter"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter">[docs]</a><span class="k">class</span> <span class="nc">DataStreamWriter</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Interface used to write a streaming :class:`DataFrame <pyspark.sql.DataFrame>` to external</span> |
| <span class="sd"> storage systems (e.g. file systems, key-value stores, etc).</span> |
| <span class="sd"> Use :attr:`DataFrame.writeStream <pyspark.sql.DataFrame.writeStream>`</span> |
| <span class="sd"> to access this.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">df</span><span class="p">:</span> <span class="s2">"DataFrame"</span><span class="p">)</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_df</span> <span class="o">=</span> <span class="n">df</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">sparkSession</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">_jdf</span><span class="o">.</span><span class="n">writeStream</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">_sq</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">jsq</span><span class="p">:</span> <span class="n">JavaObject</span><span class="p">)</span> <span class="o">-></span> <span class="n">StreamingQuery</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql.streaming</span> <span class="kn">import</span> <span class="n">StreamingQuery</span> |
| |
| <span class="k">return</span> <span class="n">StreamingQuery</span><span class="p">(</span><span class="n">jsq</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="DataStreamWriter.outputMode"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.outputMode.html#pyspark.sql.streaming.DataStreamWriter.outputMode">[docs]</a> <span class="k">def</span> <span class="nf">outputMode</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">outputMode</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="sd">"""Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Options include:</span> |
| |
| <span class="sd"> * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to</span> |
| <span class="sd"> the sink</span> |
| <span class="sd"> * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink</span> |
| <span class="sd"> every time these are some updates</span> |
| <span class="sd"> * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be</span> |
| <span class="sd"> written to the sink every time there are some updates. If the query doesn't contain</span> |
| <span class="sd"> aggregations, it will be equivalent to `append` mode.</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> writer = sdf.writeStream.outputMode('append')</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">outputMode</span> <span class="ow">or</span> <span class="nb">type</span><span class="p">(</span><span class="n">outputMode</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">outputMode</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"The output mode must be a non-empty string. Got: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">outputMode</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">outputMode</span><span class="p">(</span><span class="n">outputMode</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <div class="viewcode-block" id="DataStreamWriter.format"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.format.html#pyspark.sql.streaming.DataStreamWriter.format">[docs]</a> <span class="k">def</span> <span class="nf">format</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="sd">"""Specifies the underlying output data source.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> source : str</span> |
| <span class="sd"> string, name of the data source, which for now can be 'parquet'.</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> writer = sdf.writeStream.format('json')</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">source</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <div class="viewcode-block" id="DataStreamWriter.option"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.option.html#pyspark.sql.streaming.DataStreamWriter.option">[docs]</a> <span class="k">def</span> <span class="nf">option</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="s2">"OptionalPrimitiveType"</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="sd">"""Adds an output option for the underlying data source.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">option</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">to_str</span><span class="p">(</span><span class="n">value</span><span class="p">))</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <div class="viewcode-block" id="DataStreamWriter.options"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.options.html#pyspark.sql.streaming.DataStreamWriter.options">[docs]</a> <span class="k">def</span> <span class="nf">options</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">options</span><span class="p">:</span> <span class="s2">"OptionalPrimitiveType"</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="sd">"""Adds output options for the underlying data source.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| <span class="sd"> """</span> |
| <span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">options</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">option</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">to_str</span><span class="p">(</span><span class="n">options</span><span class="p">[</span><span class="n">k</span><span class="p">]))</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <span class="nd">@overload</span> |
| <span class="k">def</span> <span class="nf">partitionBy</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">cols</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="o">...</span> |
| |
| <span class="nd">@overload</span> |
| <span class="k">def</span> <span class="nf">partitionBy</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">__cols</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">])</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="o">...</span> |
| |
| <div class="viewcode-block" id="DataStreamWriter.partitionBy"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.partitionBy.html#pyspark.sql.streaming.DataStreamWriter.partitionBy">[docs]</a> <span class="k">def</span> <span class="nf">partitionBy</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">cols</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> <span class="c1"># type: ignore[misc]</span> |
| <span class="sd">"""Partitions the output by the given columns on the file system.</span> |
| |
| <span class="sd"> If specified, the output is laid out on the file system similar</span> |
| <span class="sd"> to Hive's partitioning scheme.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> cols : str or list</span> |
| <span class="sd"> name of columns</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">cols</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">cols</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="p">(</span><span class="nb">list</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">)):</span> |
| <span class="n">cols</span> <span class="o">=</span> <span class="n">cols</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="n">_to_seq</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="n">cols</span><span class="p">))</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <div class="viewcode-block" id="DataStreamWriter.queryName"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.queryName.html#pyspark.sql.streaming.DataStreamWriter.queryName">[docs]</a> <span class="k">def</span> <span class="nf">queryName</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">queryName</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="sd">"""Specifies the name of the :class:`StreamingQuery` that can be started with</span> |
| <span class="sd"> :func:`start`. This name must be unique among all the currently active queries</span> |
| <span class="sd"> in the associated SparkSession.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> queryName : str</span> |
| <span class="sd"> unique name for the query</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> writer = sdf.writeStream.queryName('streaming_query')</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">queryName</span> <span class="ow">or</span> <span class="nb">type</span><span class="p">(</span><span class="n">queryName</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">queryName</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"The queryName must be a non-empty string. Got: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">queryName</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">queryName</span><span class="p">(</span><span class="n">queryName</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <span class="nd">@overload</span> |
| <span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">processingTime</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="o">...</span> |
| |
| <span class="nd">@overload</span> |
| <span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">once</span><span class="p">:</span> <span class="nb">bool</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="o">...</span> |
| |
| <span class="nd">@overload</span> |
| <span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">continuous</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="o">...</span> |
| |
| <span class="nd">@overload</span> |
| <span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">availableNow</span><span class="p">:</span> <span class="nb">bool</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="o">...</span> |
| |
| <div class="viewcode-block" id="DataStreamWriter.trigger"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.trigger.html#pyspark.sql.streaming.DataStreamWriter.trigger">[docs]</a> <span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="o">*</span><span class="p">,</span> |
| <span class="n">processingTime</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">once</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">continuous</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">availableNow</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="sd">"""Set the trigger for the stream query. If this is not set it will run the query as fast</span> |
| <span class="sd"> as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> processingTime : str, optional</span> |
| <span class="sd"> a processing time interval as a string, e.g. '5 seconds', '1 minute'.</span> |
| <span class="sd"> Set a trigger that runs a microbatch query periodically based on the</span> |
| <span class="sd"> processing time. Only one trigger can be set.</span> |
| <span class="sd"> once : bool, optional</span> |
| <span class="sd"> if set to True, set a trigger that processes only one batch of data in a</span> |
| <span class="sd"> streaming query then terminates the query. Only one trigger can be set.</span> |
| <span class="sd"> continuous : str, optional</span> |
| <span class="sd"> a time interval as a string, e.g. '5 seconds', '1 minute'.</span> |
| <span class="sd"> Set a trigger that runs a continuous query with a given checkpoint</span> |
| <span class="sd"> interval. Only one trigger can be set.</span> |
| <span class="sd"> availableNow : bool, optional</span> |
| <span class="sd"> if set to True, set a trigger that processes all available data in multiple</span> |
| <span class="sd"> batches then terminates the query. Only one trigger can be set.</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> # trigger the query for execution every 5 seconds</span> |
| <span class="sd"> >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')</span> |
| <span class="sd"> >>> # trigger the query for just once batch of data</span> |
| <span class="sd"> >>> writer = sdf.writeStream.trigger(once=True)</span> |
| <span class="sd"> >>> # trigger the query for execution every 5 seconds</span> |
| <span class="sd"> >>> writer = sdf.writeStream.trigger(continuous='5 seconds')</span> |
| <span class="sd"> >>> # trigger the query for reading all available data with multiple batches</span> |
| <span class="sd"> >>> writer = sdf.writeStream.trigger(availableNow=True)</span> |
| <span class="sd"> """</span> |
| <span class="n">params</span> <span class="o">=</span> <span class="p">[</span><span class="n">processingTime</span><span class="p">,</span> <span class="n">once</span><span class="p">,</span> <span class="n">continuous</span><span class="p">,</span> <span class="n">availableNow</span><span class="p">]</span> |
| |
| <span class="k">if</span> <span class="n">params</span><span class="o">.</span><span class="n">count</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span> <span class="o">==</span> <span class="mi">4</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"No trigger provided"</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">params</span><span class="o">.</span><span class="n">count</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span> <span class="o"><</span> <span class="mi">3</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Multiple triggers not allowed."</span><span class="p">)</span> |
| |
| <span class="n">jTrigger</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="n">processingTime</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="n">processingTime</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">processingTime</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"Value for processingTime must be a non empty string. Got: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">processingTime</span> |
| <span class="p">)</span> |
| <span class="n">interval</span> <span class="o">=</span> <span class="n">processingTime</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span> |
| <span class="n">jTrigger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">Trigger</span><span class="o">.</span><span class="n">ProcessingTime</span><span class="p">(</span> |
| <span class="n">interval</span> |
| <span class="p">)</span> |
| |
| <span class="k">elif</span> <span class="n">once</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">once</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Value for once must be True. Got: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">once</span><span class="p">)</span> |
| <span class="n">jTrigger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">Trigger</span><span class="o">.</span><span class="n">Once</span><span class="p">()</span> |
| |
| <span class="k">elif</span> <span class="n">continuous</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="n">continuous</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">continuous</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s2">"Value for continuous must be a non empty string. Got: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">continuous</span> |
| <span class="p">)</span> |
| <span class="n">interval</span> <span class="o">=</span> <span class="n">continuous</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span> |
| <span class="n">jTrigger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">Trigger</span><span class="o">.</span><span class="n">Continuous</span><span class="p">(</span> |
| <span class="n">interval</span> |
| <span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">availableNow</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Value for availableNow must be True. Got: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">availableNow</span><span class="p">)</span> |
| <span class="n">jTrigger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">Trigger</span><span class="o">.</span><span class="n">AvailableNow</span><span class="p">()</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">trigger</span><span class="p">(</span><span class="n">jTrigger</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <span class="nd">@overload</span> |
| <span class="k">def</span> <span class="nf">foreach</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">Row</span><span class="p">],</span> <span class="kc">None</span><span class="p">])</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="o">...</span> |
| |
| <span class="nd">@overload</span> |
| <span class="k">def</span> <span class="nf">foreach</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="s2">"SupportsProcess"</span><span class="p">)</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="o">...</span> |
| |
| <div class="viewcode-block" id="DataStreamWriter.foreach"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.foreach.html#pyspark.sql.streaming.DataStreamWriter.foreach">[docs]</a> <span class="k">def</span> <span class="nf">foreach</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="n">Row</span><span class="p">],</span> <span class="kc">None</span><span class="p">],</span> <span class="s2">"SupportsProcess"</span><span class="p">])</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Sets the output of the streaming query to be processed using the provided writer ``f``.</span> |
| <span class="sd"> This is often used to write the output of a streaming query to arbitrary storage systems.</span> |
| <span class="sd"> The processing logic can be specified in two ways.</span> |
| |
| <span class="sd"> #. A **function** that takes a row as input.</span> |
| <span class="sd"> This is a simple way to express your processing logic. Note that this does</span> |
| <span class="sd"> not allow you to deduplicate generated data when failures cause reprocessing of</span> |
| <span class="sd"> some input data. That would require you to specify the processing logic in the next</span> |
| <span class="sd"> way.</span> |
| |
| <span class="sd"> #. An **object** with a ``process`` method and optional ``open`` and ``close`` methods.</span> |
| <span class="sd"> The object can have the following methods.</span> |
| |
| <span class="sd"> * ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing</span> |
| <span class="sd"> (for example, open a connection, start a transaction, etc). Additionally, you can</span> |
| <span class="sd"> use the `partition_id` and `epoch_id` to deduplicate regenerated data</span> |
| <span class="sd"> (discussed later).</span> |
| |
| <span class="sd"> * ``process(row)``: *Non-optional* method that processes each :class:`Row`.</span> |
| |
| <span class="sd"> * ``close(error)``: *Optional* method that finalizes and cleans up (for example,</span> |
| <span class="sd"> close connection, commit transaction, etc.) after all rows have been processed.</span> |
| |
| <span class="sd"> The object will be used by Spark in the following way.</span> |
| |
| <span class="sd"> * A single copy of this object is responsible of all the data generated by a</span> |
| <span class="sd"> single task in a query. In other words, one instance is responsible for</span> |
| <span class="sd"> processing one partition of the data generated in a distributed manner.</span> |
| |
| <span class="sd"> * This object must be serializable because each task will get a fresh</span> |
| <span class="sd"> serialized-deserialized copy of the provided object. Hence, it is strongly</span> |
| <span class="sd"> recommended that any initialization for writing data (e.g. opening a</span> |
| <span class="sd"> connection or starting a transaction) is done after the `open(...)`</span> |
| <span class="sd"> method has been called, which signifies that the task is ready to generate data.</span> |
| |
| <span class="sd"> * The lifecycle of the methods are as follows.</span> |
| |
| <span class="sd"> For each partition with ``partition_id``:</span> |
| |
| <span class="sd"> ... For each batch/epoch of streaming data with ``epoch_id``:</span> |
| |
| <span class="sd"> ....... Method ``open(partitionId, epochId)`` is called.</span> |
| |
| <span class="sd"> ....... If ``open(...)`` returns true, for each row in the partition and</span> |
| <span class="sd"> batch/epoch, method ``process(row)`` is called.</span> |
| |
| <span class="sd"> ....... Method ``close(errorOrNull)`` is called with error (if any) seen while</span> |
| <span class="sd"> processing rows.</span> |
| |
| <span class="sd"> Important points to note:</span> |
| |
| <span class="sd"> * The `partitionId` and `epochId` can be used to deduplicate generated data when</span> |
| <span class="sd"> failures cause reprocessing of some input data. This depends on the execution</span> |
| <span class="sd"> mode of the query. If the streaming query is being executed in the micro-batch</span> |
| <span class="sd"> mode, then every partition represented by a unique tuple (partition_id, epoch_id)</span> |
| <span class="sd"> is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used</span> |
| <span class="sd"> to deduplicate and/or transactionally commit data and achieve exactly-once</span> |
| <span class="sd"> guarantees. However, if the streaming query is being executed in the continuous</span> |
| <span class="sd"> mode, then this guarantee does not hold and therefore should not be used for</span> |
| <span class="sd"> deduplication.</span> |
| |
| <span class="sd"> * The ``close()`` method (if exists) will be called if `open()` method exists and</span> |
| <span class="sd"> returns successfully (irrespective of the return value), except if the Python</span> |
| <span class="sd"> crashes in the middle.</span> |
| |
| <span class="sd"> .. versionadded:: 2.4.0</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> # Print every row using a function</span> |
| <span class="sd"> >>> def print_row(row):</span> |
| <span class="sd"> ... print(row)</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> writer = sdf.writeStream.foreach(print_row)</span> |
| <span class="sd"> >>> # Print every row using a object with process() method</span> |
| <span class="sd"> >>> class RowPrinter:</span> |
| <span class="sd"> ... def open(self, partition_id, epoch_id):</span> |
| <span class="sd"> ... print("Opened %d, %d" % (partition_id, epoch_id))</span> |
| <span class="sd"> ... return True</span> |
| <span class="sd"> ... def process(self, row):</span> |
| <span class="sd"> ... print(row)</span> |
| <span class="sd"> ... def close(self, error):</span> |
| <span class="sd"> ... print("Closed with error: %s" % str(error))</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> writer = sdf.writeStream.foreach(RowPrinter())</span> |
| <span class="sd"> """</span> |
| |
| <span class="kn">from</span> <span class="nn">pyspark.rdd</span> <span class="kn">import</span> <span class="n">_wrap_function</span> |
| <span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="n">CPickleSerializer</span><span class="p">,</span> <span class="n">AutoBatchedSerializer</span> |
| <span class="kn">from</span> <span class="nn">pyspark.taskcontext</span> <span class="kn">import</span> <span class="n">TaskContext</span> |
| |
| <span class="k">if</span> <span class="n">callable</span><span class="p">(</span><span class="n">f</span><span class="p">):</span> |
| <span class="c1"># The provided object is a callable function that is supposed to be called on each row.</span> |
| <span class="c1"># Construct a function that takes an iterator and calls the provided function on each</span> |
| <span class="c1"># row.</span> |
| <span class="k">def</span> <span class="nf">func_without_process</span><span class="p">(</span><span class="n">_</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">)</span> <span class="o">-></span> <span class="n">Iterator</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> |
| <span class="n">f</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> <span class="c1"># type: ignore[operator]</span> |
| <span class="k">return</span> <span class="nb">iter</span><span class="p">([])</span> |
| |
| <span class="n">func</span> <span class="o">=</span> <span class="n">func_without_process</span> |
| |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># The provided object is not a callable function. Then it is expected to have a</span> |
| <span class="c1"># 'process(row)' method, and optional 'open(partition_id, epoch_id)' and</span> |
| <span class="c1"># 'close(error)' methods.</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="s2">"process"</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span><span class="s2">"Provided object does not have a 'process' method"</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="s2">"process"</span><span class="p">)):</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">"Attribute 'process' in provided object is not callable"</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">doesMethodExist</span><span class="p">(</span><span class="n">method_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="nb">bool</span><span class="p">:</span> |
| <span class="n">exists</span> <span class="o">=</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">method_name</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">exists</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">method_name</span><span class="p">)):</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span> |
| <span class="s2">"Attribute '</span><span class="si">%s</span><span class="s2">' in provided object is not callable"</span> <span class="o">%</span> <span class="n">method_name</span> |
| <span class="p">)</span> |
| <span class="k">return</span> <span class="n">exists</span> |
| |
| <span class="n">open_exists</span> <span class="o">=</span> <span class="n">doesMethodExist</span><span class="p">(</span><span class="s2">"open"</span><span class="p">)</span> |
| <span class="n">close_exists</span> <span class="o">=</span> <span class="n">doesMethodExist</span><span class="p">(</span><span class="s2">"close"</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">func_with_open_process_close</span><span class="p">(</span><span class="n">partition_id</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">)</span> <span class="o">-></span> <span class="n">Iterator</span><span class="p">:</span> |
| <span class="n">epoch_id</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="n">TaskContext</span><span class="p">,</span> <span class="n">TaskContext</span><span class="o">.</span><span class="n">get</span><span class="p">())</span><span class="o">.</span><span class="n">getLocalProperty</span><span class="p">(</span> |
| <span class="s2">"streaming.sql.batchId"</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="n">epoch_id</span><span class="p">:</span> |
| <span class="n">int_epoch_id</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">epoch_id</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s2">"Could not get batch id from TaskContext"</span><span class="p">)</span> |
| |
| <span class="c1"># Check if the data should be processed</span> |
| <span class="n">should_process</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="k">if</span> <span class="n">open_exists</span><span class="p">:</span> |
| <span class="n">should_process</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">partition_id</span><span class="p">,</span> <span class="n">int_epoch_id</span><span class="p">)</span> <span class="c1"># type: ignore[union-attr]</span> |
| |
| <span class="n">error</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">should_process</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span> |
| <span class="n">cast</span><span class="p">(</span><span class="s2">"SupportsProcess"</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span><span class="o">.</span><span class="n">process</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">ex</span><span class="p">:</span> |
| <span class="n">error</span> <span class="o">=</span> <span class="n">ex</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">close_exists</span><span class="p">:</span> |
| <span class="n">f</span><span class="o">.</span><span class="n">close</span><span class="p">(</span><span class="n">error</span><span class="p">)</span> <span class="c1"># type: ignore[union-attr]</span> |
| <span class="k">if</span> <span class="n">error</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">error</span> |
| |
| <span class="k">return</span> <span class="nb">iter</span><span class="p">([])</span> |
| |
| <span class="n">func</span> <span class="o">=</span> <span class="n">func_with_open_process_close</span> <span class="c1"># type: ignore[assignment]</span> |
| |
| <span class="n">serializer</span> <span class="o">=</span> <span class="n">AutoBatchedSerializer</span><span class="p">(</span><span class="n">CPickleSerializer</span><span class="p">())</span> |
| <span class="n">wrapped_func</span> <span class="o">=</span> <span class="n">_wrap_function</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="n">func</span><span class="p">,</span> <span class="n">serializer</span><span class="p">,</span> <span class="n">serializer</span><span class="p">)</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">jForeachWriter</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">execution</span><span class="o">.</span><span class="n">python</span><span class="o">.</span><span class="n">PythonForeachWriter</span><span class="p">(</span> |
| <span class="n">wrapped_func</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="o">.</span><span class="n">_jdf</span><span class="o">.</span><span class="n">schema</span><span class="p">()</span> |
| <span class="p">)</span> |
| <span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">foreach</span><span class="p">(</span><span class="n">jForeachWriter</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <div class="viewcode-block" id="DataStreamWriter.foreachBatch"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.foreachBatch.html#pyspark.sql.streaming.DataStreamWriter.foreachBatch">[docs]</a> <span class="k">def</span> <span class="nf">foreachBatch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="s2">"DataFrame"</span><span class="p">,</span> <span class="nb">int</span><span class="p">],</span> <span class="kc">None</span><span class="p">])</span> <span class="o">-></span> <span class="s2">"DataStreamWriter"</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Sets the output of the streaming query to be processed using the provided</span> |
| <span class="sd"> function. This is supported only the in the micro-batch execution modes (that is, when the</span> |
| <span class="sd"> trigger is not continuous). In every micro-batch, the provided function will be called in</span> |
| <span class="sd"> every micro-batch with (i) the output rows as a DataFrame and (ii) the batch identifier.</span> |
| <span class="sd"> The batchId can be used deduplicate and transactionally write the output</span> |
| <span class="sd"> (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed</span> |
| <span class="sd"> to exactly same for the same batchId (assuming all operations are deterministic in the</span> |
| <span class="sd"> query).</span> |
| |
| <span class="sd"> .. versionadded:: 2.4.0</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> def func(batch_df, batch_id):</span> |
| <span class="sd"> ... batch_df.collect()</span> |
| <span class="sd"> ...</span> |
| <span class="sd"> >>> writer = sdf.writeStream.foreachBatch(func)</span> |
| <span class="sd"> """</span> |
| |
| <span class="kn">from</span> <span class="nn">pyspark.java_gateway</span> <span class="kn">import</span> <span class="n">ensure_callback_server_started</span> |
| |
| <span class="n">gw</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_gateway</span> |
| <span class="k">assert</span> <span class="n">gw</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> |
| <span class="n">java_import</span><span class="p">(</span><span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="p">,</span> <span class="s2">"org.apache.spark.sql.execution.streaming.sources.*"</span><span class="p">)</span> |
| |
| <span class="n">wrapped_func</span> <span class="o">=</span> <span class="n">ForeachBatchFunction</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="p">,</span> <span class="n">func</span><span class="p">)</span> |
| <span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="o">.</span><span class="n">PythonForeachBatchHelper</span><span class="o">.</span><span class="n">callForeachBatch</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="p">,</span> <span class="n">wrapped_func</span><span class="p">)</span> |
| <span class="n">ensure_callback_server_started</span><span class="p">(</span><span class="n">gw</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <div class="viewcode-block" id="DataStreamWriter.start"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.start.html#pyspark.sql.streaming.DataStreamWriter.start">[docs]</a> <span class="k">def</span> <span class="nf">start</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="nb">format</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">outputMode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">partitionBy</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">queryName</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="o">**</span><span class="n">options</span><span class="p">:</span> <span class="s2">"OptionalPrimitiveType"</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">StreamingQuery</span><span class="p">:</span> |
| <span class="sd">"""Streams the contents of the :class:`DataFrame` to a data source.</span> |
| |
| <span class="sd"> The data source is specified by the ``format`` and a set of ``options``.</span> |
| <span class="sd"> If ``format`` is not specified, the default data source configured by</span> |
| <span class="sd"> ``spark.sql.sources.default`` will be used.</span> |
| |
| <span class="sd"> .. versionadded:: 2.0.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> path : str, optional</span> |
| <span class="sd"> the path in a Hadoop supported file system</span> |
| <span class="sd"> format : str, optional</span> |
| <span class="sd"> the format used to save</span> |
| <span class="sd"> outputMode : str, optional</span> |
| <span class="sd"> specifies how data of a streaming DataFrame/Dataset is written to a</span> |
| <span class="sd"> streaming sink.</span> |
| |
| <span class="sd"> * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to the</span> |
| <span class="sd"> sink</span> |
| <span class="sd"> * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the</span> |
| <span class="sd"> sink every time these are some updates</span> |
| <span class="sd"> * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be</span> |
| <span class="sd"> written to the sink every time there are some updates. If the query doesn't contain</span> |
| <span class="sd"> aggregations, it will be equivalent to `append` mode.</span> |
| <span class="sd"> partitionBy : str or list, optional</span> |
| <span class="sd"> names of partitioning columns</span> |
| <span class="sd"> queryName : str, optional</span> |
| <span class="sd"> unique name for the query</span> |
| <span class="sd"> **options : dict</span> |
| <span class="sd"> All other string options. You may want to provide a `checkpointLocation`</span> |
| <span class="sd"> for most streams, however it is not required for a `memory` stream.</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()</span> |
| <span class="sd"> >>> sq.isActive</span> |
| <span class="sd"> True</span> |
| <span class="sd"> >>> sq.name</span> |
| <span class="sd"> 'this_query'</span> |
| <span class="sd"> >>> sq.stop()</span> |
| <span class="sd"> >>> sq.isActive</span> |
| <span class="sd"> False</span> |
| <span class="sd"> >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(</span> |
| <span class="sd"> ... queryName='that_query', outputMode="append", format='memory')</span> |
| <span class="sd"> >>> sq.name</span> |
| <span class="sd"> 'that_query'</span> |
| <span class="sd"> >>> sq.isActive</span> |
| <span class="sd"> True</span> |
| <span class="sd"> >>> sq.stop()</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">(</span><span class="o">**</span><span class="n">options</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">outputMode</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">outputMode</span><span class="p">(</span><span class="n">outputMode</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">partitionBy</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="n">partitionBy</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">format</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">format</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">queryName</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queryName</span><span class="p">(</span><span class="n">queryName</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">path</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sq</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">start</span><span class="p">())</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sq</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">start</span><span class="p">(</span><span class="n">path</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="DataStreamWriter.toTable"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.toTable.html#pyspark.sql.streaming.DataStreamWriter.toTable">[docs]</a> <span class="k">def</span> <span class="nf">toTable</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">tableName</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> |
| <span class="nb">format</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">outputMode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">partitionBy</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">queryName</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="o">**</span><span class="n">options</span><span class="p">:</span> <span class="s2">"OptionalPrimitiveType"</span><span class="p">,</span> |
| <span class="p">)</span> <span class="o">-></span> <span class="n">StreamingQuery</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Starts the execution of the streaming query, which will continually output results to the</span> |
| <span class="sd"> given table as new data arrives.</span> |
| |
| <span class="sd"> The returned :class:`StreamingQuery` object can be used to interact with the stream.</span> |
| |
| <span class="sd"> .. versionadded:: 3.1.0</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> tableName : str</span> |
| <span class="sd"> string, for the name of the table.</span> |
| <span class="sd"> format : str, optional</span> |
| <span class="sd"> the format used to save.</span> |
| <span class="sd"> outputMode : str, optional</span> |
| <span class="sd"> specifies how data of a streaming DataFrame/Dataset is written to a</span> |
| <span class="sd"> streaming sink.</span> |
| |
| <span class="sd"> * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to the</span> |
| <span class="sd"> sink</span> |
| <span class="sd"> * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the</span> |
| <span class="sd"> sink every time these are some updates</span> |
| <span class="sd"> * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be</span> |
| <span class="sd"> written to the sink every time there are some updates. If the query doesn't contain</span> |
| <span class="sd"> aggregations, it will be equivalent to `append` mode.</span> |
| <span class="sd"> partitionBy : str or list, optional</span> |
| <span class="sd"> names of partitioning columns</span> |
| <span class="sd"> queryName : str, optional</span> |
| <span class="sd"> unique name for the query</span> |
| <span class="sd"> **options : dict</span> |
| <span class="sd"> All other string options. You may want to provide a `checkpointLocation`.</span> |
| |
| <span class="sd"> Notes</span> |
| <span class="sd"> -----</span> |
| <span class="sd"> This API is evolving.</span> |
| |
| <span class="sd"> For v1 table, partitioning columns provided by `partitionBy` will be respected no matter</span> |
| <span class="sd"> the table exists or not. A new table will be created if the table not exists.</span> |
| |
| <span class="sd"> For v2 table, `partitionBy` will be ignored if the table already exists. `partitionBy` will</span> |
| <span class="sd"> be respected only if the v2 table does not exist. Besides, the v2 table created by this API</span> |
| <span class="sd"> lacks some functionalities (e.g., customized properties, options, and serde info). If you</span> |
| <span class="sd"> need them, please create the v2 table manually before the execution to avoid creating a</span> |
| <span class="sd"> table with incomplete information.</span> |
| |
| <span class="sd"> Examples</span> |
| <span class="sd"> --------</span> |
| <span class="sd"> >>> sdf.writeStream.format('parquet').queryName('query').toTable('output_table')</span> |
| <span class="sd"> ... # doctest: +SKIP</span> |
| |
| <span class="sd"> >>> sdf.writeStream.trigger(processingTime='5 seconds').toTable(</span> |
| <span class="sd"> ... 'output_table',</span> |
| <span class="sd"> ... queryName='that_query',</span> |
| <span class="sd"> ... outputMode="append",</span> |
| <span class="sd"> ... format='parquet',</span> |
| <span class="sd"> ... checkpointLocation='/tmp/checkpoint') # doctest: +SKIP</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">(</span><span class="o">**</span><span class="n">options</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">outputMode</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">outputMode</span><span class="p">(</span><span class="n">outputMode</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">partitionBy</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="n">partitionBy</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">format</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">format</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">queryName</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queryName</span><span class="p">(</span><span class="n">queryName</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sq</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">toTable</span><span class="p">(</span><span class="n">tableName</span><span class="p">))</span></div></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-></span> <span class="kc">None</span><span class="p">:</span> |
| <span class="kn">import</span> <span class="nn">doctest</span> |
| <span class="kn">import</span> <span class="nn">os</span> |
| <span class="kn">import</span> <span class="nn">tempfile</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span><span class="p">,</span> <span class="n">SQLContext</span> |
| <span class="kn">import</span> <span class="nn">pyspark.sql.streaming</span> |
| <span class="kn">from</span> <span class="nn">py4j.protocol</span> <span class="kn">import</span> <span class="n">Py4JError</span> |
| |
| <span class="n">os</span><span class="o">.</span><span class="n">chdir</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="p">[</span><span class="s2">"SPARK_HOME"</span><span class="p">])</span> |
| |
| <span class="n">globs</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">_getActiveSessionOrCreate</span><span class="p">()</span> |
| <span class="k">except</span> <span class="n">Py4JError</span><span class="p">:</span> <span class="c1"># noqa: F821</span> |
| <span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> <span class="c1"># type: ignore[name-defined] # noqa: F821</span> |
| |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"tempfile"</span><span class="p">]</span> <span class="o">=</span> <span class="n">tempfile</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"os"</span><span class="p">]</span> <span class="o">=</span> <span class="n">os</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"spark"</span><span class="p">]</span> <span class="o">=</span> <span class="n">spark</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"sqlContext"</span><span class="p">]</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">(</span><span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span><span class="p">)</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"sdf"</span><span class="p">]</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">readStream</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="s2">"text"</span><span class="p">)</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s2">"python/test_support/sql/streaming"</span><span class="p">)</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"sdf_schema"</span><span class="p">]</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">([</span><span class="n">StructField</span><span class="p">(</span><span class="s2">"data"</span><span class="p">,</span> <span class="n">StringType</span><span class="p">(),</span> <span class="kc">True</span><span class="p">)])</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"df"</span><span class="p">]</span> <span class="o">=</span> <span class="n">globs</span><span class="p">[</span><span class="s2">"spark"</span><span class="p">]</span><span class="o">.</span><span class="n">readStream</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="s2">"text"</span><span class="p">)</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s2">"python/test_support/sql/streaming"</span><span class="p">)</span> |
| |
| <span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span> |
| <span class="n">pyspark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="p">,</span> |
| <span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span> |
| <span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span> <span class="o">|</span> <span class="n">doctest</span><span class="o">.</span><span class="n">NORMALIZE_WHITESPACE</span> <span class="o">|</span> <span class="n">doctest</span><span class="o">.</span><span class="n">REPORT_NDIFF</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="n">globs</span><span class="p">[</span><span class="s2">"spark"</span><span class="p">]</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span> |
| |
| <span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span> |
| <span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span> |
| |
| |
| <span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">"__main__"</span><span class="p">:</span> |
| <span class="n">_test</span><span class="p">()</span> |
| </pre></div> |
| |
| </div> |
| |
| |
| |
| </main> |
| |
| |
| </div> |
| </div> |
| |
| |
| <script src="../../../_static/js/index.3da636dd464baa7582d2.js"></script> |
| |
| |
| <footer class="footer mt-5 mt-md-0"> |
| <div class="container"> |
| <p> |
| © Copyright.<br/> |
| Created using <a href="http://sphinx-doc.org/">Sphinx</a> 3.0.4.<br/> |
| </p> |
| </div> |
| </footer> |
| </body> |
| </html> |