Source code for pyspark.sql.streaming (PySpark 3.3.2 documentation)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import json
from collections.abc import Iterator
from typing import cast, overload, Any, Callable, Dict, List, Optional, TYPE_CHECKING, Union

from py4j.java_gateway import java_import, JavaObject

from pyspark import since
from pyspark.sql.column import _to_seq
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import Row, StructType, StructField, StringType
from pyspark.sql.utils import ForeachBatchFunction, StreamingQueryException

if TYPE_CHECKING:
    from pyspark.sql.session import SparkSession
    from pyspark.sql._typing import SupportsProcess, OptionalPrimitiveType
    from pyspark.sql.dataframe import DataFrame

__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
<div class="viewcode-block" id="StreamingQuery"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.html#pyspark.sql.streaming.StreamingQuery">[docs]</a><span class="k">class</span> <span class="nc">StreamingQuery</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A handle to a query that is executing continuously in the background as new data arrives.</span>
<span class="sd"> All these methods are thread-safe.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">jsq</span><span class="p">:</span> <span class="n">JavaObject</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span> <span class="o">=</span> <span class="n">jsq</span>
<span class="nd">@property</span> <span class="c1"># type: ignore[misc]</span>
<span class="nd">@since</span><span class="p">(</span><span class="mf">2.0</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">id</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the unique id of this query that persists across restarts from checkpoint data.</span>
<span class="sd"> That is, this id is generated when a query is started for the first time, and</span>
<span class="sd"> will be the same every time it is restarted from checkpoint data.</span>
<span class="sd"> There can only be one query with the same id active in a Spark cluster.</span>
<span class="sd"> Also see, `runId`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">id</span><span class="p">()</span><span class="o">.</span><span class="n">toString</span><span class="p">()</span>
<span class="nd">@property</span> <span class="c1"># type: ignore[misc]</span>
<span class="nd">@since</span><span class="p">(</span><span class="mf">2.1</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">runId</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the unique id of this query that does not persist across restarts. That is, every</span>
<span class="sd"> query that is started (or restarted from checkpoint) will have a different runId.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">runId</span><span class="p">()</span><span class="o">.</span><span class="n">toString</span><span class="p">()</span>
<span class="nd">@property</span> <span class="c1"># type: ignore[misc]</span>
<span class="nd">@since</span><span class="p">(</span><span class="mf">2.0</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">name</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns the user-specified name of the query, or null if not specified.</span>
<span class="sd"> This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`</span>
<span class="sd"> as `dataframe.writeStream.queryName(&quot;query&quot;).start()`.</span>
<span class="sd"> This name, if set, must be unique across all active queries.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">name</span><span class="p">()</span>
<span class="nd">@property</span> <span class="c1"># type: ignore[misc]</span>
<span class="nd">@since</span><span class="p">(</span><span class="mf">2.0</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">isActive</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">bool</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Whether this streaming query is currently active or not.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">isActive</span><span class="p">()</span>
<div class="viewcode-block" id="StreamingQuery.awaitTermination"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.awaitTermination.html#pyspark.sql.streaming.StreamingQuery.awaitTermination">[docs]</a> <span class="nd">@since</span><span class="p">(</span><span class="mf">2.0</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">awaitTermination</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timeout</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Waits for the termination of `this` query, either by :func:`query.stop()` or by an</span>
<span class="sd"> exception. If the query has terminated with an exception, then the exception will be thrown.</span>
<span class="sd"> If `timeout` is set, it returns whether the query has terminated or not within the</span>
<span class="sd"> `timeout` seconds.</span>
<span class="sd"> If the query has terminated, then all subsequent calls to this method will either return</span>
<span class="sd"> immediately (if the query was terminated by :func:`stop()`), or throw the exception</span>
<span class="sd"> immediately (if the query has terminated with exception).</span>
<span class="sd"> throws :class:`StreamingQueryException`, if `this` query has terminated with an exception</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">timeout</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">timeout</span><span class="p">,</span> <span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="nb">float</span><span class="p">))</span> <span class="ow">or</span> <span class="n">timeout</span> <span class="o">&lt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;timeout must be a positive integer or float. Got </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">timeout</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">awaitTermination</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">timeout</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">awaitTermination</span><span class="p">()</span></div>
<span class="nd">@property</span> <span class="c1"># type: ignore[misc]</span>
<span class="nd">@since</span><span class="p">(</span><span class="mf">2.1</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">status</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the current status of the query.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">status</span><span class="p">()</span><span class="o">.</span><span class="n">json</span><span class="p">())</span>
<span class="nd">@property</span> <span class="c1"># type: ignore[misc]</span>
<span class="nd">@since</span><span class="p">(</span><span class="mf">2.1</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">recentProgress</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.</span>
<span class="sd"> The number of progress updates retained for each stream is configured by Spark session</span>
<span class="sd"> configuration `spark.sql.streaming.numRecentProgressUpdates`.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">[</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">p</span><span class="o">.</span><span class="n">json</span><span class="p">())</span> <span class="k">for</span> <span class="n">p</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">recentProgress</span><span class="p">()]</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">lastProgress</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or</span>
<span class="sd"> None if there were no progress updates</span>
<span class="sd"> .. versionadded:: 2.1.0</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> dict</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">lastProgress</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">lastProgress</span><span class="p">()</span>
<span class="k">if</span> <span class="n">lastProgress</span><span class="p">:</span>
<span class="k">return</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">lastProgress</span><span class="o">.</span><span class="n">json</span><span class="p">())</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<div class="viewcode-block" id="StreamingQuery.processAllAvailable"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.processAllAvailable.html#pyspark.sql.streaming.StreamingQuery.processAllAvailable">[docs]</a> <span class="k">def</span> <span class="nf">processAllAvailable</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Blocks until all available data in the source has been processed and committed to the</span>
<span class="sd"> sink. This method is intended for testing.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> In the case of continually arriving data, this method may block forever.</span>
<span class="sd"> Additionally, this method is only guaranteed to block until data that has been</span>
<span class="sd"> synchronously appended data to a stream source prior to invocation.</span>
<span class="sd"> (i.e. `getOffset` must immediately reflect the addition).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">processAllAvailable</span><span class="p">()</span></div>
<div class="viewcode-block" id="StreamingQuery.stop"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.stop.html#pyspark.sql.streaming.StreamingQuery.stop">[docs]</a> <span class="nd">@since</span><span class="p">(</span><span class="mf">2.0</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">stop</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Stop this streaming query.&quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span></div>
<div class="viewcode-block" id="StreamingQuery.explain"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.explain.html#pyspark.sql.streaming.StreamingQuery.explain">[docs]</a> <span class="k">def</span> <span class="nf">explain</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">extended</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Prints the (logical and physical) plans to the console for debugging purpose.</span>
<span class="sd"> .. versionadded:: 2.1.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> extended : bool, optional</span>
<span class="sd"> default ``False``. If ``False``, prints only the physical plan.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; sq = sdf.writeStream.format(&#39;memory&#39;).queryName(&#39;query_explain&#39;).start()</span>
<span class="sd"> &gt;&gt;&gt; sq.processAllAvailable() # Wait a bit to generate the runtime plans.</span>
<span class="sd"> &gt;&gt;&gt; sq.explain()</span>
<span class="sd"> == Physical Plan ==</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; sq.explain(True)</span>
<span class="sd"> == Parsed Logical Plan ==</span>
<span class="sd"> ...</span>
<span class="sd"> == Analyzed Logical Plan ==</span>
<span class="sd"> ...</span>
<span class="sd"> == Optimized Logical Plan ==</span>
<span class="sd"> ...</span>
<span class="sd"> == Physical Plan ==</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; sq.stop()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Cannot call `_jsq.explain(...)` because it will print in the JVM process.</span>
<span class="c1"># We should print it in the Python process.</span>
<span class="nb">print</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">explainInternal</span><span class="p">(</span><span class="n">extended</span><span class="p">))</span></div>
<div class="viewcode-block" id="StreamingQuery.exception"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.exception.html#pyspark.sql.streaming.StreamingQuery.exception">[docs]</a> <span class="k">def</span> <span class="nf">exception</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="n">StreamingQueryException</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> .. versionadded:: 2.1.0</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> :class:`StreamingQueryException`</span>
<span class="sd"> the StreamingQueryException if the query was terminated by an exception, or None.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">exception</span><span class="p">()</span><span class="o">.</span><span class="n">isDefined</span><span class="p">():</span>
<span class="n">je</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsq</span><span class="o">.</span><span class="n">exception</span><span class="p">()</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="n">msg</span> <span class="o">=</span> <span class="n">je</span><span class="o">.</span><span class="n">toString</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">&quot;: &quot;</span><span class="p">,</span> <span class="mi">1</span><span class="p">)[</span><span class="mi">1</span><span class="p">]</span> <span class="c1"># Drop the Java StreamingQueryException type info</span>
<span class="n">stackTrace</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\n\t</span><span class="s2"> at &quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="o">.</span><span class="n">toString</span><span class="p">(),</span> <span class="n">je</span><span class="o">.</span><span class="n">getStackTrace</span><span class="p">()))</span>
<span class="k">return</span> <span class="n">StreamingQueryException</span><span class="p">(</span><span class="n">msg</span><span class="p">,</span> <span class="n">stackTrace</span><span class="p">,</span> <span class="n">je</span><span class="o">.</span><span class="n">getCause</span><span class="p">())</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span></div></div>
<div class="viewcode-block" id="StreamingQueryManager"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.html#pyspark.sql.streaming.StreamingQueryManager">[docs]</a><span class="k">class</span> <span class="nc">StreamingQueryManager</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A class to manage all the :class:`StreamingQuery` StreamingQueries active.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">jsqm</span><span class="p">:</span> <span class="n">JavaObject</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jsqm</span> <span class="o">=</span> <span class="n">jsqm</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">active</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="n">StreamingQuery</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a list of active queries associated with this SQLContext</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; sq = sdf.writeStream.format(&#39;memory&#39;).queryName(&#39;this_query&#39;).start()</span>
<span class="sd"> &gt;&gt;&gt; sqm = spark.streams</span>
<span class="sd"> &gt;&gt;&gt; # get the list of active streaming queries</span>
<span class="sd"> &gt;&gt;&gt; [q.name for q in sqm.active]</span>
<span class="sd"> [&#39;this_query&#39;]</span>
<span class="sd"> &gt;&gt;&gt; sq.stop()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">[</span><span class="n">StreamingQuery</span><span class="p">(</span><span class="n">jsq</span><span class="p">)</span> <span class="k">for</span> <span class="n">jsq</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsqm</span><span class="o">.</span><span class="n">active</span><span class="p">()]</span>
<div class="viewcode-block" id="StreamingQueryManager.get"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.get.html#pyspark.sql.streaming.StreamingQueryManager.get">[docs]</a> <span class="k">def</span> <span class="nf">get</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="nb">id</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">StreamingQuery</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns an active query from this SQLContext or throws exception if an active query</span>
<span class="sd"> with this name doesn&#39;t exist.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; sq = sdf.writeStream.format(&#39;memory&#39;).queryName(&#39;this_query&#39;).start()</span>
<span class="sd"> &gt;&gt;&gt; sq.name</span>
<span class="sd"> &#39;this_query&#39;</span>
<span class="sd"> &gt;&gt;&gt; sq = spark.streams.get(sq.id)</span>
<span class="sd"> &gt;&gt;&gt; sq.isActive</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; sq = sqlContext.streams.get(sq.id)</span>
<span class="sd"> &gt;&gt;&gt; sq.isActive</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; sq.stop()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">StreamingQuery</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jsqm</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="nb">id</span><span class="p">))</span></div>
<div class="viewcode-block" id="StreamingQueryManager.awaitAnyTermination"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.awaitAnyTermination.html#pyspark.sql.streaming.StreamingQueryManager.awaitAnyTermination">[docs]</a> <span class="nd">@since</span><span class="p">(</span><span class="mf">2.0</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">awaitAnyTermination</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timeout</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Wait until any of the queries on the associated SQLContext has terminated since the</span>
<span class="sd"> creation of the context, or since :func:`resetTerminated()` was called. If any query was</span>
<span class="sd"> terminated with an exception, then the exception will be thrown.</span>
<span class="sd"> If `timeout` is set, it returns whether the query has terminated or not within the</span>
<span class="sd"> `timeout` seconds.</span>
<span class="sd"> If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will</span>
<span class="sd"> either return immediately (if the query was terminated by :func:`query.stop()`),</span>
<span class="sd"> or throw the exception immediately (if the query was terminated with exception). Use</span>
<span class="sd"> :func:`resetTerminated()` to clear past terminations and wait for new terminations.</span>
<span class="sd"> In the case where multiple queries have terminated since :func:`resetTermination()`</span>
<span class="sd"> was called, if any query has terminated with exception, then :func:`awaitAnyTermination()`</span>
<span class="sd"> will throw any of the exception. For correctly documenting exceptions across multiple</span>
<span class="sd"> queries, users need to stop all of them after any of them terminates with exception, and</span>
<span class="sd"> then check the `query.exception()` for each query.</span>
<span class="sd"> throws :class:`StreamingQueryException`, if `this` query has terminated with an exception</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">timeout</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">timeout</span><span class="p">,</span> <span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="nb">float</span><span class="p">))</span> <span class="ow">or</span> <span class="n">timeout</span> <span class="o">&lt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;timeout must be a positive integer or float. Got </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">timeout</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsqm</span><span class="o">.</span><span class="n">awaitAnyTermination</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span><span class="n">timeout</span> <span class="o">*</span> <span class="mi">1000</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jsqm</span><span class="o">.</span><span class="n">awaitAnyTermination</span><span class="p">()</span></div>
<div class="viewcode-block" id="StreamingQueryManager.resetTerminated"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.resetTerminated.html#pyspark.sql.streaming.StreamingQueryManager.resetTerminated">[docs]</a> <span class="k">def</span> <span class="nf">resetTerminated</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used</span>
<span class="sd"> again to wait for new terminations.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; spark.streams.resetTerminated()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jsqm</span><span class="o">.</span><span class="n">resetTerminated</span><span class="p">()</span></div></div>
<div class="viewcode-block" id="DataStreamReader"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader">[docs]</a><span class="k">class</span> <span class="nc">DataStreamReader</span><span class="p">(</span><span class="n">OptionUtils</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Interface used to load a streaming :class:`DataFrame &lt;pyspark.sql.DataFrame&gt;` from external</span>
<span class="sd"> storage systems (e.g. file systems, key-value stores, etc).</span>
<span class="sd"> Use :attr:`SparkSession.readStream &lt;pyspark.sql.SparkSession.readStream&gt;` to access this.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spark</span><span class="p">:</span> <span class="s2">&quot;SparkSession&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">_jsparkSession</span><span class="o">.</span><span class="n">readStream</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spark</span> <span class="o">=</span> <span class="n">spark</span>
<span class="k">def</span> <span class="nf">_df</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">jdf</span><span class="p">:</span> <span class="n">JavaObject</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.dataframe</span> <span class="kn">import</span> <span class="n">DataFrame</span>
<span class="k">return</span> <span class="n">DataFrame</span><span class="p">(</span><span class="n">jdf</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="p">)</span>
<div class="viewcode-block" id="DataStreamReader.format"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.format.html#pyspark.sql.streaming.DataStreamReader.format">[docs]</a> <span class="k">def</span> <span class="nf">format</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamReader&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Specifies the input data source format.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> source : str</span>
<span class="sd"> name of the data source, e.g. &#39;json&#39;, &#39;parquet&#39;.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; s = spark.readStream.format(&quot;text&quot;)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">source</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamReader.schema"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.schema.html#pyspark.sql.streaming.DataStreamReader.schema">[docs]</a> <span class="k">def</span> <span class="nf">schema</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">schema</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">StructType</span><span class="p">,</span> <span class="nb">str</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamReader&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Specifies the input schema.</span>
<span class="sd"> Some data sources (e.g. JSON) can infer the input schema automatically from data.</span>
<span class="sd"> By specifying the schema here, the underlying data source can skip the schema</span>
<span class="sd"> inference step, and thus speed up data loading.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> schema : :class:`pyspark.sql.types.StructType` or str</span>
<span class="sd"> a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string</span>
<span class="sd"> (For example ``col0 INT, col1 DOUBLE``).</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; s = spark.readStream.schema(sdf_schema)</span>
<span class="sd"> &gt;&gt;&gt; s = spark.readStream.schema(&quot;col0 INT, col1 DOUBLE&quot;)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">_getActiveSessionOrCreate</span><span class="p">()</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">schema</span><span class="p">,</span> <span class="n">StructType</span><span class="p">):</span>
<span class="n">jschema</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">_jsparkSession</span><span class="o">.</span><span class="n">parseDataType</span><span class="p">(</span><span class="n">schema</span><span class="o">.</span><span class="n">json</span><span class="p">())</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">schema</span><span class="p">(</span><span class="n">jschema</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">schema</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">schema</span><span class="p">(</span><span class="n">schema</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;schema should be StructType or string&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamReader.option"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.option.html#pyspark.sql.streaming.DataStreamReader.option">[docs]</a> <span class="k">def</span> <span class="nf">option</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="s2">&quot;OptionalPrimitiveType&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamReader&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Adds an input option for the underlying data source.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; s = spark.readStream.option(&quot;x&quot;, 1)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">option</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">to_str</span><span class="p">(</span><span class="n">value</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamReader.options"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.options.html#pyspark.sql.streaming.DataStreamReader.options">[docs]</a> <span class="k">def</span> <span class="nf">options</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">options</span><span class="p">:</span> <span class="s2">&quot;OptionalPrimitiveType&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamReader&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Adds input options for the underlying data source.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; s = spark.readStream.options(x=&quot;1&quot;, y=2)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">options</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">option</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">to_str</span><span class="p">(</span><span class="n">options</span><span class="p">[</span><span class="n">k</span><span class="p">]))</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamReader.load"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.load.html#pyspark.sql.streaming.DataStreamReader.load">[docs]</a> <span class="k">def</span> <span class="nf">load</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">path</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="nb">format</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">schema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="n">StructType</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="o">**</span><span class="n">options</span><span class="p">:</span> <span class="s2">&quot;OptionalPrimitiveType&quot;</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Loads a data stream from a data source and returns it as a</span>
<span class="sd"> :class:`DataFrame &lt;pyspark.sql.DataFrame&gt;`.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> path : str, optional</span>
<span class="sd"> optional string for file-system backed data sources.</span>
<span class="sd"> format : str, optional</span>
<span class="sd"> optional string for format of the data source. Default to &#39;parquet&#39;.</span>
<span class="sd"> schema : :class:`pyspark.sql.types.StructType` or str, optional</span>
<span class="sd"> optional :class:`pyspark.sql.types.StructType` for the input schema</span>
<span class="sd"> or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).</span>
<span class="sd"> **options : dict</span>
<span class="sd"> all other string options</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; json_sdf = spark.readStream.format(&quot;json&quot;) \\</span>
<span class="sd"> ... .schema(sdf_schema) \\</span>
<span class="sd"> ... .load(tempfile.mkdtemp())</span>
<span class="sd"> &gt;&gt;&gt; json_sdf.isStreaming</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; json_sdf.schema == sdf_schema</span>
<span class="sd"> True</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">format</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">format</span><span class="p">)</span>
<span class="k">if</span> <span class="n">schema</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">(</span><span class="n">schema</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">(</span><span class="o">**</span><span class="n">options</span><span class="p">)</span>
<span class="k">if</span> <span class="n">path</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">path</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;If the path is provided for stream, it needs to be a &quot;</span>
<span class="o">+</span> <span class="s2">&quot;non-empty string. List of paths are not supported.&quot;</span>
<span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">path</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">load</span><span class="p">())</span></div>
<div class="viewcode-block" id="DataStreamReader.json"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.json.html#pyspark.sql.streaming.DataStreamReader.json">[docs]</a> <span class="k">def</span> <span class="nf">json</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">schema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="n">StructType</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">primitivesAsString</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">prefersDecimal</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">allowComments</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">allowUnquotedFieldNames</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">allowSingleQuotes</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">allowNumericLeadingZero</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">allowBackslashEscapingAnyCharacter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">mode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">columnNameOfCorruptRecord</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">dateFormat</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">timestampFormat</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">multiLine</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">allowUnquotedControlChars</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">lineSep</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">locale</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">dropFieldIfAllNull</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">encoding</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">allowNonNumericNumbers</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Loads a JSON file stream and returns the results as a :class:`DataFrame`.</span>
<span class="sd"> `JSON Lines &lt;http://jsonlines.org/&gt;`_ (newline-delimited JSON) is supported by default.</span>
<span class="sd"> For JSON (one record per file), set the ``multiLine`` parameter to ``true``.</span>
<span class="sd"> If the ``schema`` parameter is not specified, this function goes</span>
<span class="sd"> through the input once to determine the input schema.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> path : str</span>
<span class="sd"> string represents path to the JSON dataset,</span>
<span class="sd"> or RDD of Strings storing JSON objects.</span>
<span class="sd"> schema : :class:`pyspark.sql.types.StructType` or str, optional</span>
<span class="sd"> an optional :class:`pyspark.sql.types.StructType` for the input schema</span>
<span class="sd"> or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).</span>
<span class="sd"> Other Parameters</span>
<span class="sd"> ----------------</span>
<span class="sd"> Extra options</span>
<span class="sd"> For the extra options, refer to</span>
<span class="sd"> `Data Source Option &lt;https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option&gt;`_</span>
<span class="sd"> in the version you use.</span>
<span class="sd"> .. # noqa</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)</span>
<span class="sd"> &gt;&gt;&gt; json_sdf.isStreaming</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; json_sdf.schema == sdf_schema</span>
<span class="sd"> True</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span>
<span class="n">schema</span><span class="o">=</span><span class="n">schema</span><span class="p">,</span>
<span class="n">primitivesAsString</span><span class="o">=</span><span class="n">primitivesAsString</span><span class="p">,</span>
<span class="n">prefersDecimal</span><span class="o">=</span><span class="n">prefersDecimal</span><span class="p">,</span>
<span class="n">allowComments</span><span class="o">=</span><span class="n">allowComments</span><span class="p">,</span>
<span class="n">allowUnquotedFieldNames</span><span class="o">=</span><span class="n">allowUnquotedFieldNames</span><span class="p">,</span>
<span class="n">allowSingleQuotes</span><span class="o">=</span><span class="n">allowSingleQuotes</span><span class="p">,</span>
<span class="n">allowNumericLeadingZero</span><span class="o">=</span><span class="n">allowNumericLeadingZero</span><span class="p">,</span>
<span class="n">allowBackslashEscapingAnyCharacter</span><span class="o">=</span><span class="n">allowBackslashEscapingAnyCharacter</span><span class="p">,</span>
<span class="n">mode</span><span class="o">=</span><span class="n">mode</span><span class="p">,</span>
<span class="n">columnNameOfCorruptRecord</span><span class="o">=</span><span class="n">columnNameOfCorruptRecord</span><span class="p">,</span>
<span class="n">dateFormat</span><span class="o">=</span><span class="n">dateFormat</span><span class="p">,</span>
<span class="n">timestampFormat</span><span class="o">=</span><span class="n">timestampFormat</span><span class="p">,</span>
<span class="n">multiLine</span><span class="o">=</span><span class="n">multiLine</span><span class="p">,</span>
<span class="n">allowUnquotedControlChars</span><span class="o">=</span><span class="n">allowUnquotedControlChars</span><span class="p">,</span>
<span class="n">lineSep</span><span class="o">=</span><span class="n">lineSep</span><span class="p">,</span>
<span class="n">locale</span><span class="o">=</span><span class="n">locale</span><span class="p">,</span>
<span class="n">dropFieldIfAllNull</span><span class="o">=</span><span class="n">dropFieldIfAllNull</span><span class="p">,</span>
<span class="n">encoding</span><span class="o">=</span><span class="n">encoding</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span>
<span class="n">allowNonNumericNumbers</span><span class="o">=</span><span class="n">allowNonNumericNumbers</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">json</span><span class="p">(</span><span class="n">path</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;path can be only a single string&quot;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DataStreamReader.orc"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.orc.html#pyspark.sql.streaming.DataStreamReader.orc">[docs]</a> <span class="k">def</span> <span class="nf">orc</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">mergeSchema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Loads a ORC file stream, returning the result as a :class:`DataFrame`.</span>
<span class="sd"> .. versionadded:: 2.3.0</span>
<span class="sd"> Other Parameters</span>
<span class="sd"> ----------------</span>
<span class="sd"> Extra options</span>
<span class="sd"> For the extra options, refer to</span>
<span class="sd"> `Data Source Option &lt;https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option&gt;`_</span>
<span class="sd"> in the version you use.</span>
<span class="sd"> .. # noqa</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())</span>
<span class="sd"> &gt;&gt;&gt; orc_sdf.isStreaming</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; orc_sdf.schema == sdf_schema</span>
<span class="sd"> True</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span>
<span class="n">mergeSchema</span><span class="o">=</span><span class="n">mergeSchema</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">orc</span><span class="p">(</span><span class="n">path</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;path can be only a single string&quot;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DataStreamReader.parquet"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.parquet.html#pyspark.sql.streaming.DataStreamReader.parquet">[docs]</a> <span class="k">def</span> <span class="nf">parquet</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">mergeSchema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">datetimeRebaseMode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">int96RebaseMode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Loads a Parquet file stream, returning the result as a :class:`DataFrame`.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> path : str</span>
<span class="sd"> the path in any Hadoop supported file system</span>
<span class="sd"> Other Parameters</span>
<span class="sd"> ----------------</span>
<span class="sd"> Extra options</span>
<span class="sd"> For the extra options, refer to</span>
<span class="sd"> `Data Source Option &lt;https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option&gt;`_.</span>
<span class="sd"> in the version you use.</span>
<span class="sd"> .. # noqa</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())</span>
<span class="sd"> &gt;&gt;&gt; parquet_sdf.isStreaming</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; parquet_sdf.schema == sdf_schema</span>
<span class="sd"> True</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span>
<span class="n">mergeSchema</span><span class="o">=</span><span class="n">mergeSchema</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span>
<span class="n">datetimeRebaseMode</span><span class="o">=</span><span class="n">datetimeRebaseMode</span><span class="p">,</span>
<span class="n">int96RebaseMode</span><span class="o">=</span><span class="n">int96RebaseMode</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="n">path</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;path can be only a single string&quot;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DataStreamReader.text"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.text.html#pyspark.sql.streaming.DataStreamReader.text">[docs]</a> <span class="k">def</span> <span class="nf">text</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">wholetext</span><span class="p">:</span> <span class="nb">bool</span> <span class="o">=</span> <span class="kc">False</span><span class="p">,</span>
<span class="n">lineSep</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a</span>
<span class="sd"> string column named &quot;value&quot;, and followed by partitioned columns if there</span>
<span class="sd"> are any.</span>
<span class="sd"> The text files must be encoded as UTF-8.</span>
<span class="sd"> By default, each line in the text file is a new row in the resulting DataFrame.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> path : str or list</span>
<span class="sd"> string, or list of strings, for input path(s).</span>
<span class="sd"> Other Parameters</span>
<span class="sd"> ----------------</span>
<span class="sd"> Extra options</span>
<span class="sd"> For the extra options, refer to</span>
<span class="sd"> `Data Source Option &lt;https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option&gt;`_</span>
<span class="sd"> in the version you use.</span>
<span class="sd"> .. # noqa</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; text_sdf = spark.readStream.text(tempfile.mkdtemp())</span>
<span class="sd"> &gt;&gt;&gt; text_sdf.isStreaming</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; &quot;value&quot; in str(text_sdf.schema)</span>
<span class="sd"> True</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span>
<span class="n">wholetext</span><span class="o">=</span><span class="n">wholetext</span><span class="p">,</span>
<span class="n">lineSep</span><span class="o">=</span><span class="n">lineSep</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">text</span><span class="p">(</span><span class="n">path</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;path can be only a single string&quot;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DataStreamReader.csv"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.csv.html#pyspark.sql.streaming.DataStreamReader.csv">[docs]</a> <span class="k">def</span> <span class="nf">csv</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">path</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">schema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="n">StructType</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">sep</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">encoding</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">quote</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">escape</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">comment</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">header</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">inferSchema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">ignoreLeadingWhiteSpace</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">ignoreTrailingWhiteSpace</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">nullValue</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">nanValue</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">positiveInf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">negativeInf</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">dateFormat</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">timestampFormat</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">maxColumns</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">maxCharsPerColumn</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">maxMalformedLogPerPartition</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">int</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">mode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">columnNameOfCorruptRecord</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">multiLine</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">charToEscapeQuoteEscaping</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">enforceSchema</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">emptyValue</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">locale</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">lineSep</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">bool</span><span class="p">,</span> <span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">unescapedQuoteHandling</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sa">r</span><span class="sd">&quot;&quot;&quot;Loads a CSV file stream and returns the result as a :class:`DataFrame`.</span>
<span class="sd"> This function will go through the input once to determine the input schema if</span>
<span class="sd"> ``inferSchema`` is enabled. To avoid going through the entire data once, disable</span>
<span class="sd"> ``inferSchema`` option or specify the schema explicitly using ``schema``.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> path : str or list</span>
<span class="sd"> string, or list of strings, for input path(s).</span>
<span class="sd"> schema : :class:`pyspark.sql.types.StructType` or str, optional</span>
<span class="sd"> an optional :class:`pyspark.sql.types.StructType` for the input schema</span>
<span class="sd"> or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Other Parameters</span>
<span class="sd"> ----------------</span>
<span class="sd"> Extra options</span>
<span class="sd"> For the extra options, refer to</span>
<span class="sd"> `Data Source Option &lt;https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option&gt;`_</span>
<span class="sd"> in the version you use.</span>
<span class="sd"> .. # noqa</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)</span>
<span class="sd"> &gt;&gt;&gt; csv_sdf.isStreaming</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; csv_sdf.schema == sdf_schema</span>
<span class="sd"> True</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_set_opts</span><span class="p">(</span>
<span class="n">schema</span><span class="o">=</span><span class="n">schema</span><span class="p">,</span>
<span class="n">sep</span><span class="o">=</span><span class="n">sep</span><span class="p">,</span>
<span class="n">encoding</span><span class="o">=</span><span class="n">encoding</span><span class="p">,</span>
<span class="n">quote</span><span class="o">=</span><span class="n">quote</span><span class="p">,</span>
<span class="n">escape</span><span class="o">=</span><span class="n">escape</span><span class="p">,</span>
<span class="n">comment</span><span class="o">=</span><span class="n">comment</span><span class="p">,</span>
<span class="n">header</span><span class="o">=</span><span class="n">header</span><span class="p">,</span>
<span class="n">inferSchema</span><span class="o">=</span><span class="n">inferSchema</span><span class="p">,</span>
<span class="n">ignoreLeadingWhiteSpace</span><span class="o">=</span><span class="n">ignoreLeadingWhiteSpace</span><span class="p">,</span>
<span class="n">ignoreTrailingWhiteSpace</span><span class="o">=</span><span class="n">ignoreTrailingWhiteSpace</span><span class="p">,</span>
<span class="n">nullValue</span><span class="o">=</span><span class="n">nullValue</span><span class="p">,</span>
<span class="n">nanValue</span><span class="o">=</span><span class="n">nanValue</span><span class="p">,</span>
<span class="n">positiveInf</span><span class="o">=</span><span class="n">positiveInf</span><span class="p">,</span>
<span class="n">negativeInf</span><span class="o">=</span><span class="n">negativeInf</span><span class="p">,</span>
<span class="n">dateFormat</span><span class="o">=</span><span class="n">dateFormat</span><span class="p">,</span>
<span class="n">timestampFormat</span><span class="o">=</span><span class="n">timestampFormat</span><span class="p">,</span>
<span class="n">maxColumns</span><span class="o">=</span><span class="n">maxColumns</span><span class="p">,</span>
<span class="n">maxCharsPerColumn</span><span class="o">=</span><span class="n">maxCharsPerColumn</span><span class="p">,</span>
<span class="n">maxMalformedLogPerPartition</span><span class="o">=</span><span class="n">maxMalformedLogPerPartition</span><span class="p">,</span>
<span class="n">mode</span><span class="o">=</span><span class="n">mode</span><span class="p">,</span>
<span class="n">columnNameOfCorruptRecord</span><span class="o">=</span><span class="n">columnNameOfCorruptRecord</span><span class="p">,</span>
<span class="n">multiLine</span><span class="o">=</span><span class="n">multiLine</span><span class="p">,</span>
<span class="n">charToEscapeQuoteEscaping</span><span class="o">=</span><span class="n">charToEscapeQuoteEscaping</span><span class="p">,</span>
<span class="n">enforceSchema</span><span class="o">=</span><span class="n">enforceSchema</span><span class="p">,</span>
<span class="n">emptyValue</span><span class="o">=</span><span class="n">emptyValue</span><span class="p">,</span>
<span class="n">locale</span><span class="o">=</span><span class="n">locale</span><span class="p">,</span>
<span class="n">lineSep</span><span class="o">=</span><span class="n">lineSep</span><span class="p">,</span>
<span class="n">pathGlobFilter</span><span class="o">=</span><span class="n">pathGlobFilter</span><span class="p">,</span>
<span class="n">recursiveFileLookup</span><span class="o">=</span><span class="n">recursiveFileLookup</span><span class="p">,</span>
<span class="n">unescapedQuoteHandling</span><span class="o">=</span><span class="n">unescapedQuoteHandling</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">csv</span><span class="p">(</span><span class="n">path</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;path can be only a single string&quot;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DataStreamReader.table"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.table.html#pyspark.sql.streaming.DataStreamReader.table">[docs]</a> <span class="k">def</span> <span class="nf">table</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tableName</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should</span>
<span class="sd"> support streaming mode.</span>
<span class="sd"> .. versionadded:: 3.1.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> tableName : str</span>
<span class="sd"> string, for the name of the table.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> :class:`DataFrame`</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; spark.readStream.table(&#39;input_table&#39;) # doctest: +SKIP</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">tableName</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jreader</span><span class="o">.</span><span class="n">table</span><span class="p">(</span><span class="n">tableName</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;tableName can be only a single string&quot;</span><span class="p">)</span></div></div>
<div class="viewcode-block" id="DataStreamWriter"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter">[docs]</a><span class="k">class</span> <span class="nc">DataStreamWriter</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Interface used to write a streaming :class:`DataFrame &lt;pyspark.sql.DataFrame&gt;` to external</span>
<span class="sd"> storage systems (e.g. file systems, key-value stores, etc).</span>
<span class="sd"> Use :attr:`DataFrame.writeStream &lt;pyspark.sql.DataFrame.writeStream&gt;`</span>
<span class="sd"> to access this.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">df</span><span class="p">:</span> <span class="s2">&quot;DataFrame&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_df</span> <span class="o">=</span> <span class="n">df</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spark</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">sparkSession</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">_jdf</span><span class="o">.</span><span class="n">writeStream</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_sq</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">jsq</span><span class="p">:</span> <span class="n">JavaObject</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">StreamingQuery</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.streaming</span> <span class="kn">import</span> <span class="n">StreamingQuery</span>
<span class="k">return</span> <span class="n">StreamingQuery</span><span class="p">(</span><span class="n">jsq</span><span class="p">)</span>
<div class="viewcode-block" id="DataStreamWriter.outputMode"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.outputMode.html#pyspark.sql.streaming.DataStreamWriter.outputMode">[docs]</a> <span class="k">def</span> <span class="nf">outputMode</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">outputMode</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Options include:</span>
<span class="sd"> * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to</span>
<span class="sd"> the sink</span>
<span class="sd"> * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink</span>
<span class="sd"> every time these are some updates</span>
<span class="sd"> * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be</span>
<span class="sd"> written to the sink every time there are some updates. If the query doesn&#39;t contain</span>
<span class="sd"> aggregations, it will be equivalent to `append` mode.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.outputMode(&#39;append&#39;)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">outputMode</span> <span class="ow">or</span> <span class="nb">type</span><span class="p">(</span><span class="n">outputMode</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">outputMode</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;The output mode must be a non-empty string. Got: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">outputMode</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">outputMode</span><span class="p">(</span><span class="n">outputMode</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamWriter.format"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.format.html#pyspark.sql.streaming.DataStreamWriter.format">[docs]</a> <span class="k">def</span> <span class="nf">format</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Specifies the underlying output data source.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> source : str</span>
<span class="sd"> string, name of the data source, which for now can be &#39;parquet&#39;.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.format(&#39;json&#39;)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">source</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamWriter.option"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.option.html#pyspark.sql.streaming.DataStreamWriter.option">[docs]</a> <span class="k">def</span> <span class="nf">option</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">value</span><span class="p">:</span> <span class="s2">&quot;OptionalPrimitiveType&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Adds an output option for the underlying data source.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">option</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">to_str</span><span class="p">(</span><span class="n">value</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamWriter.options"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.options.html#pyspark.sql.streaming.DataStreamWriter.options">[docs]</a> <span class="k">def</span> <span class="nf">options</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">options</span><span class="p">:</span> <span class="s2">&quot;OptionalPrimitiveType&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Adds output options for the underlying data source.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">options</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">option</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">to_str</span><span class="p">(</span><span class="n">options</span><span class="p">[</span><span class="n">k</span><span class="p">]))</span>
<span class="k">return</span> <span class="bp">self</span></div>
<span class="nd">@overload</span>
<span class="k">def</span> <span class="nf">partitionBy</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">cols</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span> <span class="nf">partitionBy</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">__cols</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="o">...</span>
<div class="viewcode-block" id="DataStreamWriter.partitionBy"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.partitionBy.html#pyspark.sql.streaming.DataStreamWriter.partitionBy">[docs]</a> <span class="k">def</span> <span class="nf">partitionBy</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">cols</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span> <span class="c1"># type: ignore[misc]</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Partitions the output by the given columns on the file system.</span>
<span class="sd"> If specified, the output is laid out on the file system similar</span>
<span class="sd"> to Hive&#39;s partitioning scheme.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> cols : str or list</span>
<span class="sd"> name of columns</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">cols</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">cols</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="p">(</span><span class="nb">list</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">)):</span>
<span class="n">cols</span> <span class="o">=</span> <span class="n">cols</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="n">_to_seq</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="n">cols</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamWriter.queryName"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.queryName.html#pyspark.sql.streaming.DataStreamWriter.queryName">[docs]</a> <span class="k">def</span> <span class="nf">queryName</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">queryName</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Specifies the name of the :class:`StreamingQuery` that can be started with</span>
<span class="sd"> :func:`start`. This name must be unique among all the currently active queries</span>
<span class="sd"> in the associated SparkSession.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> queryName : str</span>
<span class="sd"> unique name for the query</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.queryName(&#39;streaming_query&#39;)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">queryName</span> <span class="ow">or</span> <span class="nb">type</span><span class="p">(</span><span class="n">queryName</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">queryName</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;The queryName must be a non-empty string. Got: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">queryName</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">queryName</span><span class="p">(</span><span class="n">queryName</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<span class="nd">@overload</span>
<span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">processingTime</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">once</span><span class="p">:</span> <span class="nb">bool</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">continuous</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="p">,</span> <span class="n">availableNow</span><span class="p">:</span> <span class="nb">bool</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="o">...</span>
<div class="viewcode-block" id="DataStreamWriter.trigger"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.trigger.html#pyspark.sql.streaming.DataStreamWriter.trigger">[docs]</a> <span class="k">def</span> <span class="nf">trigger</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="o">*</span><span class="p">,</span>
<span class="n">processingTime</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">once</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">continuous</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">availableNow</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Set the trigger for the stream query. If this is not set it will run the query as fast</span>
<span class="sd"> as possible, which is equivalent to setting the trigger to ``processingTime=&#39;0 seconds&#39;``.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> processingTime : str, optional</span>
<span class="sd"> a processing time interval as a string, e.g. &#39;5 seconds&#39;, &#39;1 minute&#39;.</span>
<span class="sd"> Set a trigger that runs a microbatch query periodically based on the</span>
<span class="sd"> processing time. Only one trigger can be set.</span>
<span class="sd"> once : bool, optional</span>
<span class="sd"> if set to True, set a trigger that processes only one batch of data in a</span>
<span class="sd"> streaming query then terminates the query. Only one trigger can be set.</span>
<span class="sd"> continuous : str, optional</span>
<span class="sd"> a time interval as a string, e.g. &#39;5 seconds&#39;, &#39;1 minute&#39;.</span>
<span class="sd"> Set a trigger that runs a continuous query with a given checkpoint</span>
<span class="sd"> interval. Only one trigger can be set.</span>
<span class="sd"> availableNow : bool, optional</span>
<span class="sd"> if set to True, set a trigger that processes all available data in multiple</span>
<span class="sd"> batches then terminates the query. Only one trigger can be set.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; # trigger the query for execution every 5 seconds</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.trigger(processingTime=&#39;5 seconds&#39;)</span>
<span class="sd"> &gt;&gt;&gt; # trigger the query for just once batch of data</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.trigger(once=True)</span>
<span class="sd"> &gt;&gt;&gt; # trigger the query for execution every 5 seconds</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.trigger(continuous=&#39;5 seconds&#39;)</span>
<span class="sd"> &gt;&gt;&gt; # trigger the query for reading all available data with multiple batches</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.trigger(availableNow=True)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">params</span> <span class="o">=</span> <span class="p">[</span><span class="n">processingTime</span><span class="p">,</span> <span class="n">once</span><span class="p">,</span> <span class="n">continuous</span><span class="p">,</span> <span class="n">availableNow</span><span class="p">]</span>
<span class="k">if</span> <span class="n">params</span><span class="o">.</span><span class="n">count</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span> <span class="o">==</span> <span class="mi">4</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;No trigger provided&quot;</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">params</span><span class="o">.</span><span class="n">count</span><span class="p">(</span><span class="kc">None</span><span class="p">)</span> <span class="o">&lt;</span> <span class="mi">3</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Multiple triggers not allowed.&quot;</span><span class="p">)</span>
<span class="n">jTrigger</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">processingTime</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="n">processingTime</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">processingTime</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Value for processingTime must be a non empty string. Got: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">processingTime</span>
<span class="p">)</span>
<span class="n">interval</span> <span class="o">=</span> <span class="n">processingTime</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span>
<span class="n">jTrigger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">Trigger</span><span class="o">.</span><span class="n">ProcessingTime</span><span class="p">(</span>
<span class="n">interval</span>
<span class="p">)</span>
<span class="k">elif</span> <span class="n">once</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="n">once</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">True</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Value for once must be True. Got: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">once</span><span class="p">)</span>
<span class="n">jTrigger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">Trigger</span><span class="o">.</span><span class="n">Once</span><span class="p">()</span>
<span class="k">elif</span> <span class="n">continuous</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="n">continuous</span><span class="p">)</span> <span class="o">!=</span> <span class="nb">str</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="n">continuous</span><span class="o">.</span><span class="n">strip</span><span class="p">())</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s2">&quot;Value for continuous must be a non empty string. Got: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">continuous</span>
<span class="p">)</span>
<span class="n">interval</span> <span class="o">=</span> <span class="n">continuous</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span>
<span class="n">jTrigger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">Trigger</span><span class="o">.</span><span class="n">Continuous</span><span class="p">(</span>
<span class="n">interval</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">availableNow</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">True</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Value for availableNow must be True. Got: </span><span class="si">%s</span><span class="s2">&quot;</span> <span class="o">%</span> <span class="n">availableNow</span><span class="p">)</span>
<span class="n">jTrigger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="n">Trigger</span><span class="o">.</span><span class="n">AvailableNow</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">trigger</span><span class="p">(</span><span class="n">jTrigger</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<span class="nd">@overload</span>
<span class="k">def</span> <span class="nf">foreach</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">Row</span><span class="p">],</span> <span class="kc">None</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="o">...</span>
<span class="nd">@overload</span>
<span class="k">def</span> <span class="nf">foreach</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="s2">&quot;SupportsProcess&quot;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="o">...</span>
<div class="viewcode-block" id="DataStreamWriter.foreach"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.foreach.html#pyspark.sql.streaming.DataStreamWriter.foreach">[docs]</a> <span class="k">def</span> <span class="nf">foreach</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">Callable</span><span class="p">[[</span><span class="n">Row</span><span class="p">],</span> <span class="kc">None</span><span class="p">],</span> <span class="s2">&quot;SupportsProcess&quot;</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Sets the output of the streaming query to be processed using the provided writer ``f``.</span>
<span class="sd"> This is often used to write the output of a streaming query to arbitrary storage systems.</span>
<span class="sd"> The processing logic can be specified in two ways.</span>
<span class="sd"> #. A **function** that takes a row as input.</span>
<span class="sd"> This is a simple way to express your processing logic. Note that this does</span>
<span class="sd"> not allow you to deduplicate generated data when failures cause reprocessing of</span>
<span class="sd"> some input data. That would require you to specify the processing logic in the next</span>
<span class="sd"> way.</span>
<span class="sd"> #. An **object** with a ``process`` method and optional ``open`` and ``close`` methods.</span>
<span class="sd"> The object can have the following methods.</span>
<span class="sd"> * ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing</span>
<span class="sd"> (for example, open a connection, start a transaction, etc). Additionally, you can</span>
<span class="sd"> use the `partition_id` and `epoch_id` to deduplicate regenerated data</span>
<span class="sd"> (discussed later).</span>
<span class="sd"> * ``process(row)``: *Non-optional* method that processes each :class:`Row`.</span>
<span class="sd"> * ``close(error)``: *Optional* method that finalizes and cleans up (for example,</span>
<span class="sd"> close connection, commit transaction, etc.) after all rows have been processed.</span>
<span class="sd"> The object will be used by Spark in the following way.</span>
<span class="sd"> * A single copy of this object is responsible of all the data generated by a</span>
<span class="sd"> single task in a query. In other words, one instance is responsible for</span>
<span class="sd"> processing one partition of the data generated in a distributed manner.</span>
<span class="sd"> * This object must be serializable because each task will get a fresh</span>
<span class="sd"> serialized-deserialized copy of the provided object. Hence, it is strongly</span>
<span class="sd"> recommended that any initialization for writing data (e.g. opening a</span>
<span class="sd"> connection or starting a transaction) is done after the `open(...)`</span>
<span class="sd"> method has been called, which signifies that the task is ready to generate data.</span>
<span class="sd"> * The lifecycle of the methods are as follows.</span>
<span class="sd"> For each partition with ``partition_id``:</span>
<span class="sd"> ... For each batch/epoch of streaming data with ``epoch_id``:</span>
<span class="sd"> ....... Method ``open(partitionId, epochId)`` is called.</span>
<span class="sd"> ....... If ``open(...)`` returns true, for each row in the partition and</span>
<span class="sd"> batch/epoch, method ``process(row)`` is called.</span>
<span class="sd"> ....... Method ``close(errorOrNull)`` is called with error (if any) seen while</span>
<span class="sd"> processing rows.</span>
<span class="sd"> Important points to note:</span>
<span class="sd"> * The `partitionId` and `epochId` can be used to deduplicate generated data when</span>
<span class="sd"> failures cause reprocessing of some input data. This depends on the execution</span>
<span class="sd"> mode of the query. If the streaming query is being executed in the micro-batch</span>
<span class="sd"> mode, then every partition represented by a unique tuple (partition_id, epoch_id)</span>
<span class="sd"> is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used</span>
<span class="sd"> to deduplicate and/or transactionally commit data and achieve exactly-once</span>
<span class="sd"> guarantees. However, if the streaming query is being executed in the continuous</span>
<span class="sd"> mode, then this guarantee does not hold and therefore should not be used for</span>
<span class="sd"> deduplication.</span>
<span class="sd"> * The ``close()`` method (if exists) will be called if `open()` method exists and</span>
<span class="sd"> returns successfully (irrespective of the return value), except if the Python</span>
<span class="sd"> crashes in the middle.</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; # Print every row using a function</span>
<span class="sd"> &gt;&gt;&gt; def print_row(row):</span>
<span class="sd"> ... print(row)</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.foreach(print_row)</span>
<span class="sd"> &gt;&gt;&gt; # Print every row using a object with process() method</span>
<span class="sd"> &gt;&gt;&gt; class RowPrinter:</span>
<span class="sd"> ... def open(self, partition_id, epoch_id):</span>
<span class="sd"> ... print(&quot;Opened %d, %d&quot; % (partition_id, epoch_id))</span>
<span class="sd"> ... return True</span>
<span class="sd"> ... def process(self, row):</span>
<span class="sd"> ... print(row)</span>
<span class="sd"> ... def close(self, error):</span>
<span class="sd"> ... print(&quot;Closed with error: %s&quot; % str(error))</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.foreach(RowPrinter())</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">pyspark.rdd</span> <span class="kn">import</span> <span class="n">_wrap_function</span>
<span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="n">CPickleSerializer</span><span class="p">,</span> <span class="n">AutoBatchedSerializer</span>
<span class="kn">from</span> <span class="nn">pyspark.taskcontext</span> <span class="kn">import</span> <span class="n">TaskContext</span>
<span class="k">if</span> <span class="n">callable</span><span class="p">(</span><span class="n">f</span><span class="p">):</span>
<span class="c1"># The provided object is a callable function that is supposed to be called on each row.</span>
<span class="c1"># Construct a function that takes an iterator and calls the provided function on each</span>
<span class="c1"># row.</span>
<span class="k">def</span> <span class="nf">func_without_process</span><span class="p">(</span><span class="n">_</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">:</span>
<span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="n">f</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> <span class="c1"># type: ignore[operator]</span>
<span class="k">return</span> <span class="nb">iter</span><span class="p">([])</span>
<span class="n">func</span> <span class="o">=</span> <span class="n">func_without_process</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># The provided object is not a callable function. Then it is expected to have a</span>
<span class="c1"># &#39;process(row)&#39; method, and optional &#39;open(partition_id, epoch_id)&#39; and</span>
<span class="c1"># &#39;close(error)&#39; methods.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="s2">&quot;process&quot;</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span><span class="s2">&quot;Provided object does not have a &#39;process&#39; method&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="s2">&quot;process&quot;</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s2">&quot;Attribute &#39;process&#39; in provided object is not callable&quot;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">doesMethodExist</span><span class="p">(</span><span class="n">method_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">bool</span><span class="p">:</span>
<span class="n">exists</span> <span class="o">=</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">method_name</span><span class="p">)</span>
<span class="k">if</span> <span class="n">exists</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="n">f</span><span class="p">,</span> <span class="n">method_name</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s2">&quot;Attribute &#39;</span><span class="si">%s</span><span class="s2">&#39; in provided object is not callable&quot;</span> <span class="o">%</span> <span class="n">method_name</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">exists</span>
<span class="n">open_exists</span> <span class="o">=</span> <span class="n">doesMethodExist</span><span class="p">(</span><span class="s2">&quot;open&quot;</span><span class="p">)</span>
<span class="n">close_exists</span> <span class="o">=</span> <span class="n">doesMethodExist</span><span class="p">(</span><span class="s2">&quot;close&quot;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">func_with_open_process_close</span><span class="p">(</span><span class="n">partition_id</span><span class="p">:</span> <span class="n">Any</span><span class="p">,</span> <span class="n">iterator</span><span class="p">:</span> <span class="n">Iterator</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterator</span><span class="p">:</span>
<span class="n">epoch_id</span> <span class="o">=</span> <span class="n">cast</span><span class="p">(</span><span class="n">TaskContext</span><span class="p">,</span> <span class="n">TaskContext</span><span class="o">.</span><span class="n">get</span><span class="p">())</span><span class="o">.</span><span class="n">getLocalProperty</span><span class="p">(</span>
<span class="s2">&quot;streaming.sql.batchId&quot;</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">epoch_id</span><span class="p">:</span>
<span class="n">int_epoch_id</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">epoch_id</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s2">&quot;Could not get batch id from TaskContext&quot;</span><span class="p">)</span>
<span class="c1"># Check if the data should be processed</span>
<span class="n">should_process</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">open_exists</span><span class="p">:</span>
<span class="n">should_process</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">partition_id</span><span class="p">,</span> <span class="n">int_epoch_id</span><span class="p">)</span> <span class="c1"># type: ignore[union-attr]</span>
<span class="n">error</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">if</span> <span class="n">should_process</span><span class="p">:</span>
<span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
<span class="n">cast</span><span class="p">(</span><span class="s2">&quot;SupportsProcess&quot;</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span><span class="o">.</span><span class="n">process</span><span class="p">(</span><span class="n">x</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">ex</span><span class="p">:</span>
<span class="n">error</span> <span class="o">=</span> <span class="n">ex</span>
<span class="k">finally</span><span class="p">:</span>
<span class="k">if</span> <span class="n">close_exists</span><span class="p">:</span>
<span class="n">f</span><span class="o">.</span><span class="n">close</span><span class="p">(</span><span class="n">error</span><span class="p">)</span> <span class="c1"># type: ignore[union-attr]</span>
<span class="k">if</span> <span class="n">error</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">error</span>
<span class="k">return</span> <span class="nb">iter</span><span class="p">([])</span>
<span class="n">func</span> <span class="o">=</span> <span class="n">func_with_open_process_close</span> <span class="c1"># type: ignore[assignment]</span>
<span class="n">serializer</span> <span class="o">=</span> <span class="n">AutoBatchedSerializer</span><span class="p">(</span><span class="n">CPickleSerializer</span><span class="p">())</span>
<span class="n">wrapped_func</span> <span class="o">=</span> <span class="n">_wrap_function</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="p">,</span> <span class="n">func</span><span class="p">,</span> <span class="n">serializer</span><span class="p">,</span> <span class="n">serializer</span><span class="p">)</span>
<span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">jForeachWriter</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">execution</span><span class="o">.</span><span class="n">python</span><span class="o">.</span><span class="n">PythonForeachWriter</span><span class="p">(</span>
<span class="n">wrapped_func</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_df</span><span class="o">.</span><span class="n">_jdf</span><span class="o">.</span><span class="n">schema</span><span class="p">()</span>
<span class="p">)</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">foreach</span><span class="p">(</span><span class="n">jForeachWriter</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamWriter.foreachBatch"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.foreachBatch.html#pyspark.sql.streaming.DataStreamWriter.foreachBatch">[docs]</a> <span class="k">def</span> <span class="nf">foreachBatch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">func</span><span class="p">:</span> <span class="n">Callable</span><span class="p">[[</span><span class="s2">&quot;DataFrame&quot;</span><span class="p">,</span> <span class="nb">int</span><span class="p">],</span> <span class="kc">None</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="s2">&quot;DataStreamWriter&quot;</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Sets the output of the streaming query to be processed using the provided</span>
<span class="sd"> function. This is supported only the in the micro-batch execution modes (that is, when the</span>
<span class="sd"> trigger is not continuous). In every micro-batch, the provided function will be called in</span>
<span class="sd"> every micro-batch with (i) the output rows as a DataFrame and (ii) the batch identifier.</span>
<span class="sd"> The batchId can be used deduplicate and transactionally write the output</span>
<span class="sd"> (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed</span>
<span class="sd"> to exactly same for the same batchId (assuming all operations are deterministic in the</span>
<span class="sd"> query).</span>
<span class="sd"> .. versionadded:: 2.4.0</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; def func(batch_df, batch_id):</span>
<span class="sd"> ... batch_df.collect()</span>
<span class="sd"> ...</span>
<span class="sd"> &gt;&gt;&gt; writer = sdf.writeStream.foreachBatch(func)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">pyspark.java_gateway</span> <span class="kn">import</span> <span class="n">ensure_callback_server_started</span>
<span class="n">gw</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_gateway</span>
<span class="k">assert</span> <span class="n">gw</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
<span class="n">java_import</span><span class="p">(</span><span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="p">,</span> <span class="s2">&quot;org.apache.spark.sql.execution.streaming.sources.*&quot;</span><span class="p">)</span>
<span class="n">wrapped_func</span> <span class="o">=</span> <span class="n">ForeachBatchFunction</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_spark</span><span class="p">,</span> <span class="n">func</span><span class="p">)</span>
<span class="n">gw</span><span class="o">.</span><span class="n">jvm</span><span class="o">.</span><span class="n">PythonForeachBatchHelper</span><span class="o">.</span><span class="n">callForeachBatch</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="p">,</span> <span class="n">wrapped_func</span><span class="p">)</span>
<span class="n">ensure_callback_server_started</span><span class="p">(</span><span class="n">gw</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DataStreamWriter.start"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.start.html#pyspark.sql.streaming.DataStreamWriter.start">[docs]</a> <span class="k">def</span> <span class="nf">start</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">path</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="nb">format</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">outputMode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">partitionBy</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">queryName</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="o">**</span><span class="n">options</span><span class="p">:</span> <span class="s2">&quot;OptionalPrimitiveType&quot;</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">StreamingQuery</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Streams the contents of the :class:`DataFrame` to a data source.</span>
<span class="sd"> The data source is specified by the ``format`` and a set of ``options``.</span>
<span class="sd"> If ``format`` is not specified, the default data source configured by</span>
<span class="sd"> ``spark.sql.sources.default`` will be used.</span>
<span class="sd"> .. versionadded:: 2.0.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> path : str, optional</span>
<span class="sd"> the path in a Hadoop supported file system</span>
<span class="sd"> format : str, optional</span>
<span class="sd"> the format used to save</span>
<span class="sd"> outputMode : str, optional</span>
<span class="sd"> specifies how data of a streaming DataFrame/Dataset is written to a</span>
<span class="sd"> streaming sink.</span>
<span class="sd"> * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to the</span>
<span class="sd"> sink</span>
<span class="sd"> * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the</span>
<span class="sd"> sink every time these are some updates</span>
<span class="sd"> * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be</span>
<span class="sd"> written to the sink every time there are some updates. If the query doesn&#39;t contain</span>
<span class="sd"> aggregations, it will be equivalent to `append` mode.</span>
<span class="sd"> partitionBy : str or list, optional</span>
<span class="sd"> names of partitioning columns</span>
<span class="sd"> queryName : str, optional</span>
<span class="sd"> unique name for the query</span>
<span class="sd"> **options : dict</span>
<span class="sd"> All other string options. You may want to provide a `checkpointLocation`</span>
<span class="sd"> for most streams, however it is not required for a `memory` stream.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; sq = sdf.writeStream.format(&#39;memory&#39;).queryName(&#39;this_query&#39;).start()</span>
<span class="sd"> &gt;&gt;&gt; sq.isActive</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; sq.name</span>
<span class="sd"> &#39;this_query&#39;</span>
<span class="sd"> &gt;&gt;&gt; sq.stop()</span>
<span class="sd"> &gt;&gt;&gt; sq.isActive</span>
<span class="sd"> False</span>
<span class="sd"> &gt;&gt;&gt; sq = sdf.writeStream.trigger(processingTime=&#39;5 seconds&#39;).start(</span>
<span class="sd"> ... queryName=&#39;that_query&#39;, outputMode=&quot;append&quot;, format=&#39;memory&#39;)</span>
<span class="sd"> &gt;&gt;&gt; sq.name</span>
<span class="sd"> &#39;that_query&#39;</span>
<span class="sd"> &gt;&gt;&gt; sq.isActive</span>
<span class="sd"> True</span>
<span class="sd"> &gt;&gt;&gt; sq.stop()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">(</span><span class="o">**</span><span class="n">options</span><span class="p">)</span>
<span class="k">if</span> <span class="n">outputMode</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">outputMode</span><span class="p">(</span><span class="n">outputMode</span><span class="p">)</span>
<span class="k">if</span> <span class="n">partitionBy</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="n">partitionBy</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">format</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">format</span><span class="p">)</span>
<span class="k">if</span> <span class="n">queryName</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">queryName</span><span class="p">(</span><span class="n">queryName</span><span class="p">)</span>
<span class="k">if</span> <span class="n">path</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sq</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">start</span><span class="p">())</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sq</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">start</span><span class="p">(</span><span class="n">path</span><span class="p">))</span></div>
<div class="viewcode-block" id="DataStreamWriter.toTable"><a class="viewcode-back" href="../../../reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.toTable.html#pyspark.sql.streaming.DataStreamWriter.toTable">[docs]</a> <span class="k">def</span> <span class="nf">toTable</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">tableName</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="nb">format</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">outputMode</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">partitionBy</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]]]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">queryName</span><span class="p">:</span> <span class="n">Optional</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="o">**</span><span class="n">options</span><span class="p">:</span> <span class="s2">&quot;OptionalPrimitiveType&quot;</span><span class="p">,</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">StreamingQuery</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Starts the execution of the streaming query, which will continually output results to the</span>
<span class="sd"> given table as new data arrives.</span>
<span class="sd"> The returned :class:`StreamingQuery` object can be used to interact with the stream.</span>
<span class="sd"> .. versionadded:: 3.1.0</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> tableName : str</span>
<span class="sd"> string, for the name of the table.</span>
<span class="sd"> format : str, optional</span>
<span class="sd"> the format used to save.</span>
<span class="sd"> outputMode : str, optional</span>
<span class="sd"> specifies how data of a streaming DataFrame/Dataset is written to a</span>
<span class="sd"> streaming sink.</span>
<span class="sd"> * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to the</span>
<span class="sd"> sink</span>
<span class="sd"> * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the</span>
<span class="sd"> sink every time these are some updates</span>
<span class="sd"> * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be</span>
<span class="sd"> written to the sink every time there are some updates. If the query doesn&#39;t contain</span>
<span class="sd"> aggregations, it will be equivalent to `append` mode.</span>
<span class="sd"> partitionBy : str or list, optional</span>
<span class="sd"> names of partitioning columns</span>
<span class="sd"> queryName : str, optional</span>
<span class="sd"> unique name for the query</span>
<span class="sd"> **options : dict</span>
<span class="sd"> All other string options. You may want to provide a `checkpointLocation`.</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> This API is evolving.</span>
<span class="sd"> For v1 table, partitioning columns provided by `partitionBy` will be respected no matter</span>
<span class="sd"> the table exists or not. A new table will be created if the table not exists.</span>
<span class="sd"> For v2 table, `partitionBy` will be ignored if the table already exists. `partitionBy` will</span>
<span class="sd"> be respected only if the v2 table does not exist. Besides, the v2 table created by this API</span>
<span class="sd"> lacks some functionalities (e.g., customized properties, options, and serde info). If you</span>
<span class="sd"> need them, please create the v2 table manually before the execution to avoid creating a</span>
<span class="sd"> table with incomplete information.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> &gt;&gt;&gt; sdf.writeStream.format(&#39;parquet&#39;).queryName(&#39;query&#39;).toTable(&#39;output_table&#39;)</span>
<span class="sd"> ... # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; sdf.writeStream.trigger(processingTime=&#39;5 seconds&#39;).toTable(</span>
<span class="sd"> ... &#39;output_table&#39;,</span>
<span class="sd"> ... queryName=&#39;that_query&#39;,</span>
<span class="sd"> ... outputMode=&quot;append&quot;,</span>
<span class="sd"> ... format=&#39;parquet&#39;,</span>
<span class="sd"> ... checkpointLocation=&#39;/tmp/checkpoint&#39;) # doctest: +SKIP</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">(</span><span class="o">**</span><span class="n">options</span><span class="p">)</span>
<span class="k">if</span> <span class="n">outputMode</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">outputMode</span><span class="p">(</span><span class="n">outputMode</span><span class="p">)</span>
<span class="k">if</span> <span class="n">partitionBy</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">partitionBy</span><span class="p">(</span><span class="n">partitionBy</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">format</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">format</span><span class="p">)</span>
<span class="k">if</span> <span class="n">queryName</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">queryName</span><span class="p">(</span><span class="n">queryName</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_sq</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_jwrite</span><span class="o">.</span><span class="n">toTable</span><span class="p">(</span><span class="n">tableName</span><span class="p">))</span></div></div>
<span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">doctest</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">tempfile</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span><span class="p">,</span> <span class="n">SQLContext</span>
<span class="kn">import</span> <span class="nn">pyspark.sql.streaming</span>
<span class="kn">from</span> <span class="nn">py4j.protocol</span> <span class="kn">import</span> <span class="n">Py4JError</span>
<span class="n">os</span><span class="o">.</span><span class="n">chdir</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="p">[</span><span class="s2">&quot;SPARK_HOME&quot;</span><span class="p">])</span>
<span class="n">globs</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">_getActiveSessionOrCreate</span><span class="p">()</span>
<span class="k">except</span> <span class="n">Py4JError</span><span class="p">:</span> <span class="c1"># noqa: F821</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> <span class="c1"># type: ignore[name-defined] # noqa: F821</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;tempfile&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">tempfile</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;os&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">os</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">spark</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;sqlContext&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">(</span><span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span><span class="p">)</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;sdf&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">readStream</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="s2">&quot;text&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s2">&quot;python/test_support/sql/streaming&quot;</span><span class="p">)</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;sdf_schema&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">([</span><span class="n">StructField</span><span class="p">(</span><span class="s2">&quot;data&quot;</span><span class="p">,</span> <span class="n">StringType</span><span class="p">(),</span> <span class="kc">True</span><span class="p">)])</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;df&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span><span class="o">.</span><span class="n">readStream</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="s2">&quot;text&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s2">&quot;python/test_support/sql/streaming&quot;</span><span class="p">)</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span>
<span class="n">pyspark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">streaming</span><span class="p">,</span>
<span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span>
<span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span> <span class="o">|</span> <span class="n">doctest</span><span class="o">.</span><span class="n">NORMALIZE_WHITESPACE</span> <span class="o">|</span> <span class="n">doctest</span><span class="o">.</span><span class="n">REPORT_NDIFF</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;spark&quot;</span><span class="p">]</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="n">_test</span><span class="p">()</span>
</pre></div>
</div>
<div class='prev-next-bottom'>
</div>
</main>
</div>
</div>
<script src="../../../_static/js/index.3da636dd464baa7582d2.js"></script>
<footer class="footer mt-5 mt-md-0">
<div class="container">
<p>
&copy; Copyright .<br/>
Created using <a href="http://sphinx-doc.org/">Sphinx</a> 3.0.4.<br/>
</p>
</div>
</footer>
</body>
</html>