blob: 9e8259f9e7721ddaf858e7fc1144aae7493251c8 [file] [log] [blame]
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>pyspark.sql.streaming.DataStreamWriter.foreachBatch &#8212; PySpark 3.4.3 documentation</title>
<link rel="stylesheet" href="../../../_static/css/index.73d71520a4ca3b99cfee5594769eaaae.css">
<link rel="stylesheet"
href="../../../_static/vendor/fontawesome/5.13.0/css/all.min.css">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2">
<link rel="preload" as="font" type="font/woff2" crossorigin
href="../../../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2">
<link rel="stylesheet"
href="../../../_static/vendor/open-sans_all/1.44.1/index.css">
<link rel="stylesheet"
href="../../../_static/vendor/lato_latin-ext/1.44.1/index.css">
<link rel="stylesheet" href="../../../_static/basic.css" type="text/css" />
<link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" />
<link rel="stylesheet" type="text/css" href="../../../_static/copybutton.css" />
<link rel="stylesheet" type="text/css" href="../../../_static/css/pyspark.css" />
<link rel="preload" as="script" href="../../../_static/js/index.3da636dd464baa7582d2.js">
<script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/language_data.js"></script>
<script src="../../../_static/clipboard.min.js"></script>
<script src="../../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/x-mathjax-config">MathJax.Hub.Config({"tex2jax": {"inlineMath": [["$", "$"], ["\\(", "\\)"]], "processEscapes": true, "ignoreClass": "document", "processClass": "math|output_area"}})</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.foreachBatch.html" />
<link rel="search" title="Search" href="../../../search.html" />
<link rel="next" title="pyspark.sql.streaming.DataStreamWriter.format" href="pyspark.sql.streaming.DataStreamWriter.format.html" />
<link rel="prev" title="pyspark.sql.streaming.DataStreamWriter.foreach" href="pyspark.sql.streaming.DataStreamWriter.foreach.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en" />
</head>
<body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80">
<nav class="navbar navbar-light navbar-expand-lg bg-light fixed-top bd-navbar" id="navbar-main">
<div class="container-xl">
<a class="navbar-brand" href="../../../index.html">
<img src="../../../_static/spark-logo-reverse.png" class="logo" alt="logo" />
</a>
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbar-menu" aria-controls="navbar-menu" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div id="navbar-menu" class="col-lg-9 collapse navbar-collapse">
<ul id="navbar-main-elements" class="navbar-nav mr-auto">
<li class="nav-item ">
<a class="nav-link" href="../../../index.html">Overview</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="../../../getting_started/index.html">Getting Started</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="../../../user_guide/index.html">User Guides</a>
</li>
<li class="nav-item active">
<a class="nav-link" href="../../index.html">API Reference</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="../../../development/index.html">Development</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="../../../migration_guide/index.html">Migration Guides</a>
</li>
</ul>
<ul class="navbar-nav">
</ul>
</div>
</div>
</nav>
<div class="container-xl">
<div class="row">
<div class="col-12 col-md-3 bd-sidebar"><form class="bd-search d-flex align-items-center" action="../../../search.html" method="get">
<i class="icon fas fa-search"></i>
<input type="search" class="form-control" name="q" id="search-input" placeholder="Search the docs ..." aria-label="Search the docs ..." autocomplete="off" >
</form>
<nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation">
<div class="bd-toc-item active">
<ul class="nav bd-sidenav">
<li class="">
<a href="../../pyspark.sql/index.html">Spark SQL</a>
</li>
<li class="">
<a href="../../pyspark.pandas/index.html">Pandas API on Spark</a>
</li>
<li class="active">
<a href="../index.html">Structured Streaming</a>
<ul>
<li class="">
<a href="../core_classes.html">Core Classes</a>
</li>
<li class="active">
<a href="../io.html">Input/Output</a>
</li>
<li class="">
<a href="../query_management.html">Query Management</a>
</li>
</ul>
</li>
<li class="">
<a href="../../pyspark.ml.html">MLlib (DataFrame-based)</a>
</li>
<li class="">
<a href="../../pyspark.streaming.html">Spark Streaming (Legacy)</a>
</li>
<li class="">
<a href="../../pyspark.mllib.html">MLlib (RDD-based)</a>
</li>
<li class="">
<a href="../../pyspark.html">Spark Core</a>
</li>
<li class="">
<a href="../../pyspark.resource.html">Resource Management</a>
</li>
<li class="">
<a href="../../pyspark.errors.html">Errors</a>
</li>
</ul>
</nav>
</div>
<div class="d-none d-xl-block col-xl-2 bd-toc">
<nav id="bd-toc-nav">
<ul class="nav section-nav flex-column">
</ul>
</nav>
</div>
<main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4 bd-content" role="main">
<div>
<div class="section" id="pyspark-sql-streaming-datastreamwriter-foreachbatch">
<h1>pyspark.sql.streaming.DataStreamWriter.foreachBatch<a class="headerlink" href="#pyspark-sql-streaming-datastreamwriter-foreachbatch" title="Permalink to this headline"></a></h1>
<dl class="py method">
<dt id="pyspark.sql.streaming.DataStreamWriter.foreachBatch">
<code class="sig-prename descclassname">DataStreamWriter.</code><code class="sig-name descname">foreachBatch</code><span class="sig-paren">(</span><em class="sig-param"><span class="n">func</span><span class="p">:</span> <span class="n">Callable[[DataFrame, int], None]</span></em><span class="sig-paren">)</span> &#x2192; DataStreamWriter<a class="reference internal" href="../../../_modules/pyspark/sql/streaming/readwriter.html#DataStreamWriter.foreachBatch"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#pyspark.sql.streaming.DataStreamWriter.foreachBatch" title="Permalink to this definition"></a></dt>
<dd><p>Sets the output of the streaming query to be processed using the provided
function. This is supported only the in the micro-batch execution modes (that is, when the
trigger is not continuous). In every micro-batch, the provided function will be called in
every micro-batch with (i) the output rows as a DataFrame and (ii) the batch identifier.
The batchId can be used deduplicate and transactionally write the output
(that is, the provided Dataset) to external systems. The output DataFrame is guaranteed
to exactly same for the same batchId (assuming all operations are deterministic in the
query).</p>
<div class="versionadded">
<p><span class="versionmodified added">New in version 2.4.0.</span></p>
</div>
<p class="rubric">Notes</p>
<p>This API is evolving.</p>
<p class="rubric">Examples</p>
<div class="doctest highlight-default notranslate"><div class="highlight"><pre><span></span><span class="gp">&gt;&gt;&gt; </span><span class="kn">import</span> <span class="nn">time</span>
<span class="gp">&gt;&gt;&gt; </span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">readStream</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="s2">&quot;rate&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">load</span><span class="p">()</span>
<span class="gp">&gt;&gt;&gt; </span><span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="n">batch_df</span><span class="p">,</span> <span class="n">batch_id</span><span class="p">):</span>
<span class="gp">... </span> <span class="n">batch_df</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span>
<span class="gp">...</span>
<span class="gp">&gt;&gt;&gt; </span><span class="n">q</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">writeStream</span><span class="o">.</span><span class="n">foreachBatch</span><span class="p">(</span><span class="n">func</span><span class="p">)</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="gp">&gt;&gt;&gt; </span><span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">3</span><span class="p">)</span>
<span class="gp">&gt;&gt;&gt; </span><span class="n">q</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
</pre></div>
</div>
</dd></dl>
</div>
</div>
<div class='prev-next-bottom'>
<a class='left-prev' id="prev-link" href="pyspark.sql.streaming.DataStreamWriter.foreach.html" title="previous page">pyspark.sql.streaming.DataStreamWriter.foreach</a>
<a class='right-next' id="next-link" href="pyspark.sql.streaming.DataStreamWriter.format.html" title="next page">pyspark.sql.streaming.DataStreamWriter.format</a>
</div>
</main>
</div>
</div>
<script src="../../../_static/js/index.3da636dd464baa7582d2.js"></script>
<footer class="footer mt-5 mt-md-0">
<div class="container">
<p>
&copy; Copyright .<br/>
Created using <a href="http://sphinx-doc.org/">Sphinx</a> 3.0.4.<br/>
</p>
</div>
</footer>
</body>
</html>