blob: 3ea016d1e776222a93a821c1aa1115e0b2864aba [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.dataframe.io &mdash; Apache Beam 2.47.0 documentation</title>
<script type="text/javascript" src="../../../_static/js/modernizr.min.js"></script>
<script type="text/javascript" id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
<script type="text/javascript" src="../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../_static/language_data.js"></script>
<script async="async" type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/javascript" src="../../../_static/js/theme.js"></script>
<link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" />
<link rel="index" title="Index" href="../../../genindex.html" />
<link rel="search" title="Search" href="../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.47.0
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.dataframe.io</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.dataframe.io</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="sd">&quot;&quot;&quot;Sources and sinks for the Beam DataFrame API.</span>
<span class="sd">Sources</span>
<span class="sd">#######</span>
<span class="sd">This module provides analogs for pandas ``read`` methods, like</span>
<span class="sd">:func:`pandas.read_csv`. However, Beam sources like :func:`read_csv`</span>
<span class="sd">create a Beam :class:`~apache_beam.PTransform`, and return a</span>
<span class="sd">:class:`~apache_beam.dataframe.frames.DeferredDataFrame` or</span>
<span class="sd">:class:`~apache_beam.dataframe.frames.DeferredSeries` representing the contents</span>
<span class="sd">of the referenced file(s) or data source.</span>
<span class="sd">The result of these methods must be applied to a :class:`~apache_beam.Pipeline`</span>
<span class="sd">object, for example::</span>
<span class="sd"> df = p | beam.dataframe.io.read_csv(...)</span>
<span class="sd">Sinks</span>
<span class="sd">#####</span>
<span class="sd">This module also defines analogs for pandas sink, or ``to``, methods that</span>
<span class="sd">generate a Beam :class:`~apache_beam.PTransform`. Users should prefer calling</span>
<span class="sd">these operations from :class:`~apache_beam.dataframe.frames.DeferredDataFrame`</span>
<span class="sd">instances (for example with</span>
<span class="sd">:meth:`DeferredDataFrame.to_csv</span>
<span class="sd">&lt;apache_beam.dataframe.frames.DeferredDataFrame.to_csv&gt;`).</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">import</span> <span class="nn">itertools</span>
<span class="kn">import</span> <span class="nn">math</span>
<span class="kn">import</span> <span class="nn">re</span>
<span class="kn">from</span> <span class="nn">io</span> <span class="kn">import</span> <span class="n">BytesIO</span>
<span class="kn">from</span> <span class="nn">io</span> <span class="kn">import</span> <span class="n">StringIO</span>
<span class="kn">from</span> <span class="nn">io</span> <span class="kn">import</span> <span class="n">TextIOWrapper</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">io</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">frame_base</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="kn">import</span> <span class="n">fileio</span>
<span class="n">_DEFAULT_LINES_CHUNKSIZE</span> <span class="o">=</span> <span class="mi">10_000</span>
<span class="n">_DEFAULT_BYTES_CHUNKSIZE</span> <span class="o">=</span> <span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">20</span>
<div class="viewcode-block" id="read_gbq"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.read_gbq">[docs]</a><span class="k">def</span> <span class="nf">read_gbq</span><span class="p">(</span>
<span class="n">table</span><span class="p">,</span> <span class="n">dataset</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">use_bqstorage_api</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;This function reads data from a BigQuery table and produces a</span>
<span class="sd"> :class:`~apache_beam.dataframe.frames.DeferredDataFrame`.</span>
<span class="sd"> Args:</span>
<span class="sd"> table (str): The BigQuery table to read from. It may be given in the</span>
<span class="sd"> form &#39;PROJECT:dataset.table&#39;, in which case the dataset and</span>
<span class="sd"> project_id parameters below can be omitted.</span>
<span class="sd"> dataset (str): Please specify the dataset</span>
<span class="sd"> (can omit if table was specified as &#39;PROJECT:dataset.table&#39;).</span>
<span class="sd"> project_id (str): Please specify the project ID</span>
<span class="sd"> (can omit if table was specified as &#39;PROJECT:dataset.table&#39;).</span>
<span class="sd"> use_bqstorage_api (bool): Set to True to read through the</span>
<span class="sd"> BigQuery Storage API in ReadFromBigQuery. Defaults to False;</span>
<span class="sd"> leave it unspecified or set it to False to read without the</span>
<span class="sd"> Storage API.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">table</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;Please specify a BigQuery table to read from.&quot;</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">kwargs</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="sa">f</span><span class="s2">&quot;Encountered unsupported parameter(s) in read_gbq: </span><span class="si">{</span><span class="n">kwargs</span><span class="o">.</span><span class="n">keys</span><span class="p">()</span><span class="si">!r}</span><span class="s2">&quot;</span>
<span class="s2">&quot;&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_ReadGbq</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">dataset</span><span class="p">,</span> <span class="n">project_id</span><span class="p">,</span> <span class="n">use_bqstorage_api</span><span class="p">)</span></div>
<div class="viewcode-block" id="read_csv"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.read_csv">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">read_csv</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="n">splittable</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;If your files are large and records do not contain quoted newlines, you may</span>
<span class="sd"> pass the extra argument ``splittable=True`` to enable dynamic splitting for</span>
<span class="sd"> this read on newlines. Using this option for records that do contain quoted</span>
<span class="sd"> newlines may result in partial records and data corruption.&quot;&quot;&quot;</span>
<span class="k">if</span> <span class="s1">&#39;nrows&#39;</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;nrows not yet supported&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_ReadFromPandas</span><span class="p">(</span>
<span class="n">pd</span><span class="o">.</span><span class="n">read_csv</span><span class="p">,</span>
<span class="n">path</span><span class="p">,</span>
<span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">,</span>
<span class="n">incremental</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">splitter</span><span class="o">=</span><span class="n">_TextFileSplitter</span><span class="p">(</span><span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">)</span> <span class="k">if</span> <span class="n">splittable</span> <span class="k">else</span> <span class="kc">None</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> <span class="c1"># avoid circular import</span>
<span class="c1"># TODO(roberwb): Amortize the computation for multiple writes?</span>
<span class="k">return</span> <span class="n">convert</span><span class="o">.</span><span class="n">to_pcollection</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">yield_elements</span><span class="o">=</span><span class="s1">&#39;pandas&#39;</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="n">label</span><span class="p">)</span>
<div class="viewcode-block" id="to_csv"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.to_csv">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">to_csv</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">transform_label</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">label_pc</span> <span class="o">=</span> <span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">transform_label</span><span class="si">}</span><span class="s2"> - ToPCollection&quot;</span> <span class="k">if</span> <span class="n">transform_label</span> \
<span class="k">else</span> <span class="sa">f</span><span class="s2">&quot;ToPCollection(df) - </span><span class="si">{</span><span class="n">path</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="n">label_pd</span> <span class="o">=</span> <span class="sa">f</span><span class="s2">&quot;</span><span class="si">{</span><span class="n">transform_label</span><span class="si">}</span><span class="s2"> - ToPandasDataFrame&quot;</span> <span class="k">if</span> <span class="n">transform_label</span> \
<span class="k">else</span> <span class="sa">f</span><span class="s2">&quot;WriteToPandas(df) - </span><span class="si">{</span><span class="n">path</span><span class="si">}</span><span class="s2">&quot;</span>
<span class="k">return</span> <span class="n">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">label_pc</span><span class="p">)</span> <span class="o">|</span> <span class="n">label_pd</span> <span class="o">&gt;&gt;</span> <span class="n">_WriteToPandas</span><span class="p">(</span>
<span class="s1">&#39;to_csv&#39;</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">incremental</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div>
<div class="viewcode-block" id="read_fwf"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.read_fwf">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">read_fwf</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">return</span> <span class="n">_ReadFromPandas</span><span class="p">(</span>
<span class="n">pd</span><span class="o">.</span><span class="n">read_fwf</span><span class="p">,</span>
<span class="n">path</span><span class="p">,</span>
<span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">,</span>
<span class="n">incremental</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">splitter</span><span class="o">=</span><span class="n">_TextFileSplitter</span><span class="p">(</span><span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">))</span></div>
<div class="viewcode-block" id="read_json"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.read_json">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">read_json</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="s1">&#39;nrows&#39;</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">&#39;nrows not yet supported&#39;</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;lines&#39;</span><span class="p">,</span> <span class="kc">False</span><span class="p">):</span>
<span class="c1"># Work around https://github.com/pandas-dev/pandas/issues/34548.</span>
<span class="n">kwargs</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">kwargs</span><span class="p">,</span> <span class="n">nrows</span><span class="o">=</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">63</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_ReadFromPandas</span><span class="p">(</span>
<span class="n">pd</span><span class="o">.</span><span class="n">read_json</span><span class="p">,</span>
<span class="n">path</span><span class="p">,</span>
<span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">,</span>
<span class="n">incremental</span><span class="o">=</span><span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;lines&#39;</span><span class="p">,</span> <span class="kc">False</span><span class="p">),</span>
<span class="n">splitter</span><span class="o">=</span><span class="n">_DelimSplitter</span><span class="p">(</span><span class="sa">b</span><span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">)</span> <span class="k">if</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="s1">&#39;lines&#39;</span><span class="p">,</span> <span class="kc">False</span><span class="p">)</span> <span class="k">else</span> <span class="kc">None</span><span class="p">,</span>
<span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div>
<div class="viewcode-block" id="to_json"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.to_json">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">to_json</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">orient</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">if</span> <span class="n">orient</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="o">.</span><span class="n">proxy</span><span class="p">(),</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">):</span>
<span class="n">orient</span> <span class="o">=</span> <span class="s1">&#39;columns&#39;</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="o">.</span><span class="n">proxy</span><span class="p">(),</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">):</span>
<span class="n">orient</span> <span class="o">=</span> <span class="s1">&#39;index&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">WontImplementError</span><span class="p">(</span><span class="s1">&#39;not dataframes or series&#39;</span><span class="p">)</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;orient&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">orient</span>
<span class="k">return</span> <span class="n">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">)</span> <span class="o">|</span> <span class="n">_WriteToPandas</span><span class="p">(</span>
<span class="s1">&#39;to_json&#39;</span><span class="p">,</span>
<span class="n">path</span><span class="p">,</span>
<span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">,</span>
<span class="n">incremental</span><span class="o">=</span><span class="n">orient</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;index&#39;</span><span class="p">,</span> <span class="s1">&#39;records&#39;</span><span class="p">,</span> <span class="s1">&#39;values&#39;</span><span class="p">),</span>
<span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div>
<div class="viewcode-block" id="read_html"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.read_html">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">read_html</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">return</span> <span class="n">_ReadFromPandas</span><span class="p">(</span>
<span class="k">lambda</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">read_html</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)[</span><span class="mi">0</span><span class="p">],</span>
<span class="n">path</span><span class="p">,</span>
<span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">)</span></div>
<div class="viewcode-block" id="to_html"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.to_html">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">to_html</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">return</span> <span class="n">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">)</span> <span class="o">|</span> <span class="n">_WriteToPandas</span><span class="p">(</span>
<span class="s1">&#39;to_html&#39;</span><span class="p">,</span>
<span class="n">path</span><span class="p">,</span>
<span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">,</span>
<span class="n">incremental</span><span class="o">=</span><span class="p">(</span>
<span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="o">.</span><span class="n">proxy</span><span class="p">()</span><span class="o">.</span><span class="n">index</span><span class="o">.</span><span class="n">nlevels</span> <span class="o">==</span> <span class="mi">1</span> <span class="ow">or</span>
<span class="ow">not</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;sparsify&#39;</span><span class="p">,</span> <span class="kc">True</span><span class="p">)),</span>
<span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_binary_reader</span><span class="p">(</span><span class="nb">format</span><span class="p">):</span>
<span class="n">func</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">pd</span><span class="p">,</span> <span class="s1">&#39;read_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">format</span><span class="p">)</span>
<span class="n">result</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">_ReadFromPandas</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">)</span>
<span class="n">result</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">&#39;read_</span><span class="si">{</span><span class="nb">format</span><span class="si">}</span><span class="s1">&#39;</span>
<span class="k">return</span> <span class="n">result</span>
<span class="k">def</span> <span class="nf">_binary_writer</span><span class="p">(</span><span class="nb">format</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="p">(</span>
<span class="k">lambda</span> <span class="n">df</span><span class="p">,</span>
<span class="n">path</span><span class="p">,</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">)</span> <span class="o">|</span> <span class="n">_WriteToPandas</span><span class="p">(</span><span class="sa">f</span><span class="s1">&#39;to_</span><span class="si">{</span><span class="nb">format</span><span class="si">}</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">))</span>
<span class="n">result</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">&#39;to_</span><span class="si">{</span><span class="nb">format</span><span class="si">}</span><span class="s1">&#39;</span>
<span class="k">return</span> <span class="n">result</span>
<span class="k">for</span> <span class="nb">format</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;excel&#39;</span><span class="p">,</span> <span class="s1">&#39;feather&#39;</span><span class="p">,</span> <span class="s1">&#39;parquet&#39;</span><span class="p">,</span> <span class="s1">&#39;stata&#39;</span><span class="p">):</span>
<span class="nb">globals</span><span class="p">()[</span><span class="s1">&#39;read_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">format</span><span class="p">]</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)(</span>
<span class="n">_binary_reader</span><span class="p">(</span><span class="nb">format</span><span class="p">))</span>
<span class="nb">globals</span><span class="p">()[</span><span class="s1">&#39;to_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">format</span><span class="p">]</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)(</span>
<span class="n">_binary_writer</span><span class="p">(</span><span class="nb">format</span><span class="p">))</span>
<span class="k">for</span> <span class="nb">format</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;sas&#39;</span><span class="p">,</span> <span class="s1">&#39;spss&#39;</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">pd</span><span class="p">,</span> <span class="s1">&#39;read_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">format</span><span class="p">):</span> <span class="c1"># Depends on pandas version.</span>
<span class="nb">globals</span><span class="p">()[</span><span class="s1">&#39;read_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">format</span><span class="p">]</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)(</span>
<span class="n">_binary_reader</span><span class="p">(</span><span class="nb">format</span><span class="p">))</span>
<span class="n">read_clipboard</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">not_implemented_method</span><span class="p">(</span>
<span class="s1">&#39;read_clipboard&#39;</span><span class="p">,</span> <span class="n">base_type</span><span class="o">=</span><span class="n">pd</span><span class="p">)</span>
<span class="n">to_clipboard</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">not_implemented_method</span><span class="p">(</span>
<span class="s1">&#39;to_clipboard&#39;</span><span class="p">,</span> <span class="n">base_type</span><span class="o">=</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span>
<span class="n">read_msgpack</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">wont_implement_method</span><span class="p">(</span>
<span class="n">pd</span><span class="p">,</span> <span class="s1">&#39;read_msgpack&#39;</span><span class="p">,</span> <span class="n">reason</span><span class="o">=</span><span class="s2">&quot;deprecated&quot;</span><span class="p">)</span>
<span class="n">to_msgpack</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">wont_implement_method</span><span class="p">(</span>
<span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">,</span> <span class="s1">&#39;to_msgpack&#39;</span><span class="p">,</span> <span class="n">reason</span><span class="o">=</span><span class="s2">&quot;deprecated&quot;</span><span class="p">)</span>
<span class="n">read_hdf</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">wont_implement_method</span><span class="p">(</span>
<span class="n">pd</span><span class="p">,</span> <span class="s1">&#39;read_hdf&#39;</span><span class="p">,</span> <span class="n">explanation</span><span class="o">=</span><span class="s2">&quot;because HDF5 is a random access file format&quot;</span><span class="p">)</span>
<span class="n">to_hdf</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">wont_implement_method</span><span class="p">(</span>
<span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">,</span>
<span class="s1">&#39;to_hdf&#39;</span><span class="p">,</span>
<span class="n">explanation</span><span class="o">=</span><span class="s2">&quot;because HDF5 is a random access file format&quot;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="nb">dir</span><span class="p">(</span><span class="n">pd</span><span class="p">):</span>
<span class="k">if</span> <span class="n">name</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;read_&#39;</span><span class="p">)</span> <span class="ow">and</span> <span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="nb">globals</span><span class="p">():</span>
<span class="nb">globals</span><span class="p">()[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">not_implemented_method</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">base_type</span><span class="o">=</span><span class="n">pd</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_shift_range_index</span><span class="p">(</span><span class="n">offset</span><span class="p">,</span> <span class="n">df</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">index</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">RangeIndex</span><span class="p">):</span>
<span class="k">return</span> <span class="n">df</span><span class="o">.</span><span class="n">set_index</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">index</span> <span class="o">+</span> <span class="n">offset</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">df</span>
<span class="k">class</span> <span class="nc">_ReadFromPandas</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">reader</span><span class="p">,</span>
<span class="n">path</span><span class="p">,</span>
<span class="n">args</span><span class="p">,</span>
<span class="n">kwargs</span><span class="p">,</span>
<span class="n">binary</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">incremental</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">splitter</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="k">if</span> <span class="s1">&#39;compression&#39;</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">&#39;compression&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">WontImplementError</span><span class="p">(</span><span class="s1">&#39;non-deferred&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">reader</span> <span class="o">=</span> <span class="n">reader</span>
<span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="n">path</span>
<span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">binary</span> <span class="o">=</span> <span class="n">binary</span>
<span class="bp">self</span><span class="o">.</span><span class="n">incremental</span> <span class="o">=</span> <span class="n">incremental</span>
<span class="bp">self</span><span class="o">.</span><span class="n">splitter</span> <span class="o">=</span> <span class="n">splitter</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">root</span><span class="p">):</span>
<span class="n">paths_pcoll</span> <span class="o">=</span> <span class="n">root</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">])</span>
<span class="n">match</span> <span class="o">=</span> <span class="n">io</span><span class="o">.</span><span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">match</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">],</span> <span class="n">limits</span><span class="o">=</span><span class="p">[</span><span class="mi">1</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">match</span><span class="o">.</span><span class="n">metadata_list</span><span class="p">:</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/20858): This should be</span>
<span class="c1"># allowed for streaming pipelines if user provides an explicit schema.</span>
<span class="k">raise</span> <span class="ne">FileNotFoundError</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Found no files that match </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="si">!r}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="n">first_path</span> <span class="o">=</span> <span class="n">match</span><span class="o">.</span><span class="n">metadata_list</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">path</span>
<span class="k">with</span> <span class="n">io</span><span class="o">.</span><span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">first_path</span><span class="p">)</span> <span class="k">as</span> <span class="n">handle</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">:</span>
<span class="n">handle</span> <span class="o">=</span> <span class="n">TextIOWrapper</span><span class="p">(</span><span class="n">handle</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">:</span>
<span class="n">sample</span> <span class="o">=</span> <span class="nb">next</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">reader</span><span class="p">(</span><span class="n">handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="nb">dict</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span> <span class="n">chunksize</span><span class="o">=</span><span class="mi">100</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">sample</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">reader</span><span class="p">(</span><span class="n">handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="n">matches_pcoll</span> <span class="o">=</span> <span class="n">paths_pcoll</span> <span class="o">|</span> <span class="n">fileio</span><span class="o">.</span><span class="n">MatchAll</span><span class="p">()</span>
<span class="n">indices_pcoll</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">matches_pcoll</span><span class="o">.</span><span class="n">pipeline</span>
<span class="o">|</span> <span class="s1">&#39;DoOnce&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
<span class="k">lambda</span> <span class="n">_</span><span class="p">,</span>
<span class="n">paths</span><span class="p">:</span> <span class="p">{</span><span class="n">path</span><span class="p">:</span> <span class="n">ix</span>
<span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="n">path</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="nb">sorted</span><span class="p">(</span><span class="n">paths</span><span class="p">))},</span>
<span class="n">paths</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsList</span><span class="p">(</span>
<span class="n">matches_pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">match</span><span class="p">:</span> <span class="n">match</span><span class="o">.</span><span class="n">path</span><span class="p">))))</span>
<span class="n">pcoll</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">matches_pcoll</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Reshuffle</span><span class="p">()</span>
<span class="o">|</span> <span class="n">fileio</span><span class="o">.</span><span class="n">ReadMatches</span><span class="p">()</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_ReadFromPandasDoFn</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">reader</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">splitter</span><span class="p">),</span>
<span class="n">path_indices</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span><span class="n">indices_pcoll</span><span class="p">)))</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span>
<span class="k">return</span> <span class="n">convert</span><span class="o">.</span><span class="n">to_dataframe</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">proxy</span><span class="o">=</span><span class="n">sample</span><span class="p">[:</span><span class="mi">0</span><span class="p">])</span>
<span class="k">class</span> <span class="nc">_Splitter</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">empty_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns an empty buffer of the right type (string or bytes).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">read_header</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Reads the header from handle, which points to the start of the file.</span>
<span class="sd"> Returns the pair (header, buffer) where buffer contains any part of the</span>
<span class="sd"> file that was &quot;overread&quot; from handle while seeking the end of header.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">read_to_record_boundary</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffered</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Reads the given handle up to the end of the current record.</span>
<span class="sd"> The buffer argument represents bytes that were read previously; logically</span>
<span class="sd"> it&#39;s as if these were pushed back into handle for reading. If the</span>
<span class="sd"> record end is within buffered, it&#39;s possible that no more bytes will be read</span>
<span class="sd"> from handle at all.</span>
<span class="sd"> Returns the pair (remaining_record_bytes, buffer) where buffer contains</span>
<span class="sd"> any part of the file that was &quot;overread&quot; from handle while seeking the end</span>
<span class="sd"> of the record.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_DelimSplitter</span><span class="p">(</span><span class="n">_Splitter</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A _Splitter that splits on delimiters between records.</span>
<span class="sd"> This delimiter is assumed ot never occur within a record.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">delim</span><span class="p">,</span> <span class="n">read_chunk_size</span><span class="o">=</span><span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">):</span>
<span class="c1"># Multi-char delimiters would require more care across chunk boundaries.</span>
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="n">delim</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_delim</span> <span class="o">=</span> <span class="n">delim</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> <span class="o">=</span> <span class="n">delim</span><span class="p">[:</span><span class="mi">0</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_read_chunk_size</span> <span class="o">=</span> <span class="n">read_chunk_size</span>
<span class="k">def</span> <span class="nf">empty_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="k">def</span> <span class="nf">read_header</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="k">def</span> <span class="nf">read_to_record_boundary</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffered</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_delim</span> <span class="ow">in</span> <span class="n">buffered</span><span class="p">:</span>
<span class="n">ix</span> <span class="o">=</span> <span class="n">buffered</span><span class="o">.</span><span class="n">index</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_delim</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_delim</span><span class="p">)</span>
<span class="k">return</span> <span class="n">buffered</span><span class="p">[:</span><span class="n">ix</span><span class="p">],</span> <span class="n">buffered</span><span class="p">[</span><span class="n">ix</span><span class="p">:]</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">chunk</span> <span class="o">=</span> <span class="n">handle</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_chunk_size</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_delim</span> <span class="ow">in</span> <span class="n">chunk</span><span class="p">:</span>
<span class="n">ix</span> <span class="o">=</span> <span class="n">chunk</span><span class="o">.</span><span class="n">index</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_delim</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_delim</span><span class="p">)</span>
<span class="k">return</span> <span class="n">buffered</span> <span class="o">+</span> <span class="n">chunk</span><span class="p">[:</span><span class="n">ix</span><span class="p">],</span> <span class="n">chunk</span><span class="p">[</span><span class="n">ix</span><span class="p">:]</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="n">chunk</span><span class="p">:</span>
<span class="k">return</span> <span class="n">buffered</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">buffered</span> <span class="o">+=</span> <span class="n">chunk</span>
<span class="k">def</span> <span class="nf">_maybe_encode</span><span class="p">(</span><span class="n">str_or_bytes</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">str_or_bytes</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="k">return</span> <span class="n">str_or_bytes</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">str_or_bytes</span>
<span class="k">class</span> <span class="nc">_TextFileSplitter</span><span class="p">(</span><span class="n">_DelimSplitter</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Splitter for dynamically sharding CSV files and newline record boundaries.</span>
<span class="sd"> Currently does not handle quoted newlines, so is off by default, but such</span>
<span class="sd"> support could be added in the future.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">read_chunk_size</span><span class="o">=</span><span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">):</span>
<span class="k">if</span> <span class="n">args</span><span class="p">:</span>
<span class="c1"># TODO(robertwb): Automatically populate kwargs as we do for df methods.</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;Non-path arguments must be passed by keyword &#39;</span>
<span class="s1">&#39;for splittable csv reads.&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;skipfooter&#39;</span><span class="p">,</span> <span class="mi">0</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Splittablility incompatible with skipping footers.&#39;</span><span class="p">)</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="n">_maybe_encode</span><span class="p">(</span><span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;lineterminator&#39;</span><span class="p">,</span> <span class="sa">b</span><span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="p">)),</span>
<span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="k">def</span> <span class="nf">read_header</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;header&#39;</span><span class="p">,</span> <span class="s1">&#39;infer&#39;</span><span class="p">)</span> <span class="o">==</span> <span class="s1">&#39;infer&#39;</span><span class="p">:</span>
<span class="k">if</span> <span class="s1">&#39;names&#39;</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">:</span>
<span class="n">header</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">header</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">header</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">[</span><span class="s1">&#39;header&#39;</span><span class="p">]</span>
<span class="k">if</span> <span class="n">header</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">header</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span>
<span class="n">max_header</span> <span class="o">=</span> <span class="n">header</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">max_header</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="n">header</span><span class="p">)</span>
<span class="n">skiprows</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;skiprows&#39;</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">skiprows</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span>
<span class="n">is_skiprow</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">ix</span><span class="p">:</span> <span class="n">ix</span> <span class="o">&lt;</span> <span class="n">skiprows</span>
<span class="k">elif</span> <span class="nb">callable</span><span class="p">(</span><span class="n">skiprows</span><span class="p">):</span>
<span class="n">is_skiprow</span> <span class="o">=</span> <span class="n">skiprows</span>
<span class="k">elif</span> <span class="n">skiprows</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">is_skiprow</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">ix</span><span class="p">:</span> <span class="kc">False</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">is_skiprow</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">ix</span><span class="p">:</span> <span class="n">ix</span> <span class="ow">in</span> <span class="n">skiprows</span>
<span class="n">comment</span> <span class="o">=</span> <span class="n">_maybe_encode</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;comment&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">))</span>
<span class="k">if</span> <span class="n">comment</span><span class="p">:</span>
<span class="n">is_comment</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="n">line</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">comment</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">is_comment</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="kc">False</span>
<span class="n">skip_blank_lines</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;skip_blank_lines&#39;</span><span class="p">,</span> <span class="kc">True</span><span class="p">)</span>
<span class="k">if</span> <span class="n">skip_blank_lines</span><span class="p">:</span>
<span class="n">is_blank</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="n">re</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="sa">rb</span><span class="s1">&#39;^\s*$&#39;</span><span class="p">,</span> <span class="n">line</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">is_blank</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="kc">False</span>
<span class="n">text_header</span> <span class="o">=</span> <span class="sa">b</span><span class="s1">&#39;&#39;</span>
<span class="n">rest</span> <span class="o">=</span> <span class="sa">b</span><span class="s1">&#39;&#39;</span>
<span class="n">skipped</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">for</span> <span class="n">ix</span> <span class="ow">in</span> <span class="n">itertools</span><span class="o">.</span><span class="n">count</span><span class="p">():</span>
<span class="n">line</span><span class="p">,</span> <span class="n">rest</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">read_to_record_boundary</span><span class="p">(</span><span class="n">rest</span><span class="p">,</span> <span class="n">handle</span><span class="p">)</span>
<span class="n">text_header</span> <span class="o">+=</span> <span class="n">line</span>
<span class="k">if</span> <span class="n">is_skiprow</span><span class="p">(</span><span class="n">ix</span><span class="p">)</span> <span class="ow">or</span> <span class="n">is_blank</span><span class="p">(</span><span class="n">line</span><span class="p">)</span> <span class="ow">or</span> <span class="n">is_comment</span><span class="p">(</span><span class="n">line</span><span class="p">):</span>
<span class="n">skipped</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">continue</span>
<span class="k">if</span> <span class="n">ix</span> <span class="o">-</span> <span class="n">skipped</span> <span class="o">==</span> <span class="n">max_header</span><span class="p">:</span>
<span class="k">return</span> <span class="n">text_header</span><span class="p">,</span> <span class="n">rest</span>
<span class="k">class</span> <span class="nc">_TruncatingFileHandle</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A wrapper of a file-like object representing the restriction of the</span>
<span class="sd"> underlying handle according to the given SDF restriction tracker, breaking</span>
<span class="sd"> the file only after the given delimiter.</span>
<span class="sd"> For example, if the underlying restriction is [103, 607) and each line were</span>
<span class="sd"> exactly 10 characters long (i.e. every 10th character was a newline), then this</span>
<span class="sd"> would give a view of a 500-byte file consisting of bytes 110 to 609</span>
<span class="sd"> (inclusive) of the underlying file.</span>
<span class="sd"> As with all SDF trackers, the endpoint may change dynamically during reading.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">underlying</span><span class="p">,</span> <span class="n">tracker</span><span class="p">,</span> <span class="n">splitter</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span> <span class="o">=</span> <span class="n">underlying</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_tracker</span> <span class="o">=</span> <span class="n">tracker</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span> <span class="o">=</span> <span class="n">splitter</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">empty_buffer</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_done</span> <span class="o">=</span> <span class="kc">False</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">read_header</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">start</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span><span class="o">.</span><span class="n">start</span>
<span class="c1"># Seek to first delimiter after the start position.</span>
<span class="k">if</span> <span class="n">start</span> <span class="o">&gt;</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">):</span>
<span class="k">if</span> <span class="n">start</span> <span class="o">&gt;</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">=</span> <span class="n">start</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="o">.</span><span class="n">seek</span><span class="p">(</span><span class="n">start</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">=</span> <span class="n">start</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">[</span><span class="n">start</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">):]</span>
<span class="n">skip</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">read_to_record_boundary</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">+=</span> <span class="nb">len</span><span class="p">(</span><span class="n">skip</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">readable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">def</span> <span class="nf">writable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">def</span> <span class="nf">seekable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">closed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="k">def</span> <span class="fm">__iter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># For pandas is_file_like.</span>
<span class="k">return</span> <span class="bp">self</span>
<span class="k">def</span> <span class="fm">__next__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_line_iterator</span><span class="p">()</span>
<span class="k">return</span> <span class="nb">next</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">readline</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># This attribute is checked, but unused, by pandas.</span>
<span class="k">return</span> <span class="nb">next</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_line_iterator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">line_start</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">chunk</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read</span><span class="p">()</span>
<span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">line_end</span> <span class="o">=</span> <span class="n">chunk</span><span class="o">.</span><span class="n">find</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">_delim</span><span class="p">,</span> <span class="n">line_start</span><span class="p">)</span>
<span class="k">while</span> <span class="n">line_end</span> <span class="o">==</span> <span class="o">-</span><span class="mi">1</span><span class="p">:</span>
<span class="n">more</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">more</span><span class="p">:</span>
<span class="k">if</span> <span class="n">line_start</span> <span class="o">&lt;</span> <span class="nb">len</span><span class="p">(</span><span class="n">chunk</span><span class="p">):</span>
<span class="k">yield</span> <span class="n">chunk</span><span class="p">[</span><span class="n">line_start</span><span class="p">:]</span>
<span class="k">return</span>
<span class="n">chunk</span> <span class="o">=</span> <span class="n">chunk</span><span class="p">[</span><span class="n">line_start</span><span class="p">:]</span> <span class="o">+</span> <span class="n">more</span>
<span class="n">line_start</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">line_end</span> <span class="o">=</span> <span class="n">chunk</span><span class="o">.</span><span class="n">find</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">_delim</span><span class="p">,</span> <span class="n">line_start</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">chunk</span><span class="p">[</span><span class="n">line_start</span><span class="p">:</span><span class="n">line_end</span> <span class="o">+</span> <span class="mi">1</span><span class="p">]</span>
<span class="n">line_start</span> <span class="o">=</span> <span class="n">line_end</span> <span class="o">+</span> <span class="mi">1</span>
<span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">size</span><span class="o">=-</span><span class="mi">1</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">&#39;Cannot call read after iterating.&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read</span><span class="p">(</span><span class="n">size</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">size</span><span class="o">=-</span><span class="mi">1</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">:</span>
<span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_header</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_header</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">res</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_done</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="k">elif</span> <span class="n">size</span> <span class="o">==</span> <span class="o">-</span><span class="mi">1</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">+=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">size</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_done</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tracker</span><span class="o">.</span><span class="n">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">)):</span>
<span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">+=</span> <span class="nb">len</span><span class="p">(</span><span class="n">res</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">offset</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span><span class="o">.</span><span class="n">stop</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span>
<span class="k">if</span> <span class="n">offset</span> <span class="o">&lt;=</span> <span class="mi">0</span><span class="p">:</span>
<span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">rest</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">read_to_record_boundary</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">[</span><span class="n">offset</span><span class="p">:],</span> <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="p">)</span>
<span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">[:</span><span class="n">offset</span><span class="p">]</span> <span class="o">+</span> <span class="n">rest</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_done</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">return</span> <span class="n">res</span>
<span class="k">class</span> <span class="nc">_ReadFromPandasDoFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">RestrictionProvider</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">reader</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">binary</span><span class="p">,</span> <span class="n">incremental</span><span class="p">,</span> <span class="n">splitter</span><span class="p">):</span>
<span class="c1"># avoid pickling issues</span>
<span class="k">if</span> <span class="n">reader</span><span class="o">.</span><span class="vm">__module__</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;pandas.&#39;</span><span class="p">):</span>
<span class="n">reader</span> <span class="o">=</span> <span class="n">reader</span><span class="o">.</span><span class="vm">__name__</span>
<span class="bp">self</span><span class="o">.</span><span class="n">reader</span> <span class="o">=</span> <span class="n">reader</span>
<span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">binary</span> <span class="o">=</span> <span class="n">binary</span>
<span class="bp">self</span><span class="o">.</span><span class="n">incremental</span> <span class="o">=</span> <span class="n">incremental</span>
<span class="bp">self</span><span class="o">.</span><span class="n">splitter</span> <span class="o">=</span> <span class="n">splitter</span>
<span class="k">def</span> <span class="nf">initial_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">readable_file</span><span class="p">):</span>
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">restriction_trackers</span><span class="o">.</span><span class="n">OffsetRange</span><span class="p">(</span>
<span class="mi">0</span><span class="p">,</span> <span class="n">readable_file</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">size_in_bytes</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">restriction_size</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">readable_file</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="k">return</span> <span class="n">restriction</span><span class="o">.</span><span class="n">size</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">create_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span>
<span class="n">tracker</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">restriction_trackers</span><span class="o">.</span><span class="n">OffsetRestrictionTracker</span><span class="p">(</span><span class="n">restriction</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">splitter</span><span class="p">:</span>
<span class="k">return</span> <span class="n">tracker</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">restriction_trackers</span><span class="o">.</span><span class="n">UnsplittableRestrictionTracker</span><span class="p">(</span>
<span class="n">tracker</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">readable_file</span><span class="p">,</span> <span class="n">path_indices</span><span class="p">,</span> <span class="n">tracker</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">RestrictionParam</span><span class="p">()):</span>
<span class="n">reader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">reader</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">reader</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">reader</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">pd</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">reader</span><span class="p">)</span>
<span class="n">indices_per_file</span> <span class="o">=</span> <span class="mi">10</span><span class="o">**</span><span class="nb">int</span><span class="p">(</span><span class="n">math</span><span class="o">.</span><span class="n">log</span><span class="p">(</span><span class="mi">2</span><span class="o">**</span><span class="mi">63</span> <span class="o">//</span> <span class="nb">len</span><span class="p">(</span><span class="n">path_indices</span><span class="p">),</span> <span class="mi">10</span><span class="p">))</span>
<span class="k">if</span> <span class="n">readable_file</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">size_in_bytes</span> <span class="o">&gt;</span> <span class="n">indices_per_file</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span>
<span class="sa">f</span><span class="s1">&#39;Cannot safely index records from </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">path_indices</span><span class="p">)</span><span class="si">}</span><span class="s1"> files &#39;</span>
<span class="sa">f</span><span class="s1">&#39;of size </span><span class="si">{</span><span class="n">readable_file</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">size_in_bytes</span><span class="si">}</span><span class="s1"> &#39;</span>
<span class="sa">f</span><span class="s1">&#39;as their product is greater than 2^63.&#39;</span><span class="p">)</span>
<span class="n">start_index</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span><span class="o">.</span><span class="n">start</span> <span class="o">+</span>
<span class="n">path_indices</span><span class="p">[</span><span class="n">readable_file</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">path</span><span class="p">]</span> <span class="o">*</span> <span class="n">indices_per_file</span><span class="p">)</span>
<span class="k">with</span> <span class="n">readable_file</span><span class="o">.</span><span class="n">open</span><span class="p">()</span> <span class="k">as</span> <span class="n">handle</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">:</span>
<span class="c1"># TODO(robertwb): We could consider trying to get progress for</span>
<span class="c1"># non-incremental sources that are read linearly, as long as they</span>
<span class="c1"># don&#39;t try to seek. This could be deceptive as progress would</span>
<span class="c1"># advance to 100% the instant the (large) read was done, discounting</span>
<span class="c1"># any downstream processing.</span>
<span class="n">handle</span> <span class="o">=</span> <span class="n">_TruncatingFileHandle</span><span class="p">(</span>
<span class="n">handle</span><span class="p">,</span>
<span class="n">tracker</span><span class="p">,</span>
<span class="n">splitter</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">splitter</span> <span class="ow">or</span>
<span class="n">_DelimSplitter</span><span class="p">(</span><span class="sa">b</span><span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">))</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">:</span>
<span class="n">handle</span> <span class="o">=</span> <span class="n">TextIOWrapper</span><span class="p">(</span><span class="n">handle</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">:</span>
<span class="k">if</span> <span class="s1">&#39;chunksize&#39;</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;chunksize&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">_DEFAULT_LINES_CHUNKSIZE</span>
<span class="n">frames</span> <span class="o">=</span> <span class="n">reader</span><span class="p">(</span><span class="n">handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">frames</span> <span class="o">=</span> <span class="p">[</span><span class="n">reader</span><span class="p">(</span><span class="n">handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)]</span>
<span class="k">for</span> <span class="n">df</span> <span class="ow">in</span> <span class="n">frames</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">_shift_range_index</span><span class="p">(</span><span class="n">start_index</span><span class="p">,</span> <span class="n">df</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">:</span>
<span class="c1"># Satisfy the SDF contract by claiming the whole range.</span>
<span class="c1"># Do this after emitting the frames to avoid advancing progress to 100%</span>
<span class="c1"># prior to that.</span>
<span class="n">tracker</span><span class="o">.</span><span class="n">try_claim</span><span class="p">(</span><span class="n">tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span><span class="o">.</span><span class="n">stop</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_WriteToPandas</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">writer</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">incremental</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">binary</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="n">writer</span>
<span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="n">path</span>
<span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">incremental</span> <span class="o">=</span> <span class="n">incremental</span>
<span class="bp">self</span><span class="o">.</span><span class="n">binary</span> <span class="o">=</span> <span class="n">binary</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="k">if</span> <span class="s1">&#39;file_naming&#39;</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">:</span>
<span class="nb">dir</span><span class="p">,</span> <span class="n">name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="s1">&#39;&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="nb">dir</span><span class="p">,</span> <span class="n">name</span> <span class="o">=</span> <span class="n">io</span><span class="o">.</span><span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">)</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">fileio</span><span class="o">.</span><span class="n">WriteToFiles</span><span class="p">(</span>
<span class="n">path</span><span class="o">=</span><span class="nb">dir</span><span class="p">,</span>
<span class="n">shards</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">&#39;num_shards&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">file_naming</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span>
<span class="s1">&#39;file_naming&#39;</span><span class="p">,</span> <span class="n">fileio</span><span class="o">.</span><span class="n">default_file_naming</span><span class="p">(</span><span class="n">name</span><span class="p">)),</span>
<span class="n">sink</span><span class="o">=</span><span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_WriteToPandasFileSink</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">))</span>
<span class="k">class</span> <span class="nc">_WriteToPandasFileSink</span><span class="p">(</span><span class="n">fileio</span><span class="o">.</span><span class="n">FileSink</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">writer</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">incremental</span><span class="p">,</span> <span class="n">binary</span><span class="p">):</span>
<span class="k">if</span> <span class="s1">&#39;compression&#39;</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">&#39;compression&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="n">writer</span>
<span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">incremental</span> <span class="o">=</span> <span class="n">incremental</span>
<span class="bp">self</span><span class="o">.</span><span class="n">binary</span> <span class="o">=</span> <span class="n">binary</span>
<span class="bp">self</span><span class="o">.</span><span class="n">StringOrBytesIO</span> <span class="o">=</span> <span class="n">BytesIO</span> <span class="k">if</span> <span class="n">binary</span> <span class="k">else</span> <span class="n">StringIO</span>
<span class="k">if</span> <span class="n">incremental</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_record_incremental</span>
<span class="bp">self</span><span class="o">.</span><span class="n">flush</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">close_incremental</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer_record</span>
<span class="bp">self</span><span class="o">.</span><span class="n">flush</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">flush_buffer</span>
<span class="k">def</span> <span class="nf">open</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_handle</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">empty</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">header</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">footer</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">:</span>
<span class="n">file_handle</span> <span class="o">=</span> <span class="n">TextIOWrapper</span><span class="p">(</span><span class="n">file_handle</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span> <span class="o">=</span> <span class="n">file_handle</span>
<span class="k">def</span> <span class="nf">write_to</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">df</span><span class="p">,</span> <span class="n">file_handle</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">non_none_handle</span> <span class="o">=</span> <span class="n">file_handle</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">StringOrBytesIO</span><span class="p">()</span>
<span class="nb">getattr</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="p">)(</span><span class="n">non_none_handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">if</span> <span class="n">file_handle</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">non_none_handle</span><span class="o">.</span><span class="n">getvalue</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">write_record_incremental</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">empty</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">empty</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">value</span><span class="p">[:</span><span class="mi">0</span><span class="p">])</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">header</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">value</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">new_value</span><span class="p">(</span><span class="n">ix</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">ix</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span><span class="n">new_value</span><span class="p">(</span><span class="n">ix</span><span class="p">[</span><span class="mi">0</span><span class="p">]),</span> <span class="p">)</span> <span class="o">+</span> <span class="n">ix</span><span class="p">[</span><span class="mi">1</span><span class="p">:]</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="s1">&#39;x&#39;</span><span class="p">)</span> <span class="o">+</span> <span class="s1">&#39;_again&#39;</span>
<span class="k">def</span> <span class="nf">change_index</span><span class="p">(</span><span class="n">df</span><span class="p">):</span>
<span class="n">df</span><span class="o">.</span><span class="n">index</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">index</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">new_value</span><span class="p">)</span>
<span class="k">return</span> <span class="n">df</span>
<span class="n">one_row</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">value</span><span class="p">[:</span><span class="mi">1</span><span class="p">])</span>
<span class="n">another_row</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">change_index</span><span class="p">(</span><span class="n">value</span><span class="p">[:</span><span class="mi">1</span><span class="p">]))</span>
<span class="n">two_rows</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">concat</span><span class="p">([</span><span class="n">value</span><span class="p">[:</span><span class="mi">1</span><span class="p">],</span> <span class="n">change_index</span><span class="p">(</span><span class="n">value</span><span class="p">[:</span><span class="mi">1</span><span class="p">])]))</span>
<span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="n">c</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">):</span>
<span class="k">if</span> <span class="n">one_row</span><span class="p">[</span><span class="n">ix</span><span class="p">]</span> <span class="o">!=</span> <span class="n">c</span><span class="p">:</span>
<span class="k">break</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">ix</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">header</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">[:</span><span class="n">ix</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">footer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">[</span><span class="n">ix</span><span class="p">:]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">delimiter</span> <span class="o">=</span> <span class="n">two_rows</span><span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">one_row</span><span class="p">)</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">footer</span><span class="p">):</span><span class="o">-</span><span class="p">(</span>
<span class="nb">len</span><span class="p">(</span><span class="n">another_row</span><span class="p">)</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">header</span><span class="p">))</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">header</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">first</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">value</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">first</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">first</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">delimiter</span><span class="p">)</span>
<span class="c1"># IDEA(robertwb): Construct a &quot;truncating&quot; stream wrapper to avoid the</span>
<span class="c1"># in-memory copy.</span>
<span class="n">rows</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">value</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">rows</span><span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">header</span><span class="p">):</span><span class="o">-</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">footer</span><span class="p">)</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">])</span>
<span class="k">def</span> <span class="nf">close_incremental</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">footer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">footer</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">empty</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">buffer_record</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">value</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">flush_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">concat</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<div class="viewcode-block" id="ReadViaPandas"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.ReadViaPandas">[docs]</a><span class="k">class</span> <span class="nc">ReadViaPandas</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="nb">format</span><span class="p">,</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span>
<span class="n">include_indexes</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">objects_as_strings</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_reader</span> <span class="o">=</span> <span class="nb">globals</span><span class="p">()[</span><span class="s1">&#39;read_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">format</span><span class="p">](</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_include_indexes</span> <span class="o">=</span> <span class="n">include_indexes</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_objects_as_strings</span> <span class="o">=</span> <span class="n">objects_as_strings</span>
<div class="viewcode-block" id="ReadViaPandas.expand"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.ReadViaPandas.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">p</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> <span class="c1"># avoid circular import</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">_reader</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_objects_as_strings</span><span class="p">:</span>
<span class="k">for</span> <span class="n">col</span><span class="p">,</span> <span class="n">t</span> <span class="ow">in</span> <span class="nb">zip</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">columns</span><span class="p">,</span> <span class="n">df</span><span class="o">.</span><span class="n">dtypes</span><span class="p">):</span>
<span class="k">if</span> <span class="n">t</span> <span class="o">==</span> <span class="nb">object</span><span class="p">:</span>
<span class="n">df</span><span class="p">[</span><span class="n">col</span><span class="p">]</span> <span class="o">=</span> <span class="n">df</span><span class="p">[</span><span class="n">col</span><span class="p">]</span><span class="o">.</span><span class="n">astype</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">StringDtype</span><span class="p">())</span>
<span class="k">return</span> <span class="n">convert</span><span class="o">.</span><span class="n">to_pcollection</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">include_indexes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_include_indexes</span><span class="p">)</span></div></div>
<div class="viewcode-block" id="WriteViaPandas"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.WriteViaPandas">[docs]</a><span class="k">class</span> <span class="nc">WriteViaPandas</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="nb">format</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_writer_func</span> <span class="o">=</span> <span class="nb">globals</span><span class="p">()[</span><span class="s1">&#39;to_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">format</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_args</span> <span class="o">=</span> <span class="n">args</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<div class="viewcode-block" id="WriteViaPandas.expand"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.WriteViaPandas.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> <span class="c1"># avoid circular import</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;files_written&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_writer_func</span><span class="p">(</span>
<span class="n">convert</span><span class="o">.</span><span class="n">to_dataframe</span><span class="p">(</span><span class="n">pcoll</span><span class="p">),</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">_args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">)</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">file_result</span><span class="p">:</span> <span class="n">file_result</span><span class="o">.</span><span class="n">file_name</span><span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span>
<span class="nb">str</span><span class="p">)</span>
<span class="p">}</span></div></div>
<span class="k">class</span> <span class="nc">_ReadGbq</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Read data from BigQuery with output type &#39;BEAM_ROW&#39;,</span>
<span class="sd"> then convert it into a deferred dataframe.</span>
<span class="sd"> This PTransform wraps the Python ReadFromBigQuery PTransform,</span>
<span class="sd"> and sets the output_type as &#39;BEAM_ROW&#39; to convert</span>
<span class="sd"> into a Beam Schema. Once applied to a pipeline object,</span>
<span class="sd"> it is passed into the to_dataframe() function to convert the</span>
<span class="sd"> PCollection into a deferred dataframe.</span>
<span class="sd"> This PTransform currently does not support queries.</span>
<span class="sd"> Args:</span>
<span class="sd"> table (str): The ID of the table. The ID must contain only</span>
<span class="sd"> letters ``a-z``, ``A-Z``,</span>
<span class="sd"> numbers ``0-9``, underscores ``_`` or white spaces.</span>
<span class="sd"> Note that the table argument must contain the entire table</span>
<span class="sd"> reference specified as: ``&#39;PROJECT:DATASET.TABLE&#39;``.</span>
<span class="sd"> use_bq_storage_api (bool): The method to use to read from BigQuery.</span>
<span class="sd"> It may be &#39;EXPORT&#39; or</span>
<span class="sd"> &#39;DIRECT_READ&#39;. EXPORT invokes a BigQuery export request</span>
<span class="sd"> (https://cloud.google.com/bigquery/docs/exporting-data).</span>
<span class="sd"> &#39;DIRECT_READ&#39; reads</span>
<span class="sd"> directly from BigQuery storage using the BigQuery Read API</span>
<span class="sd"> (https://cloud.google.com/bigquery/docs/reference/storage). If</span>
<span class="sd"> unspecified or set to false, the default is currently utilized (EXPORT).</span>
<span class="sd"> If the flag is set to true,</span>
<span class="sd"> &#39;DIRECT_READ&#39; will be utilized.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">table</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">dataset_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">use_bqstorage_api</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="n">table</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dataset_id</span> <span class="o">=</span> <span class="n">dataset_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">project_id</span> <span class="o">=</span> <span class="n">project_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">use_bqstorage_api</span> <span class="o">=</span> <span class="n">use_bqstorage_api</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">root</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> <span class="c1"># avoid circular import</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_bqstorage_api</span><span class="p">:</span>
<span class="n">method</span> <span class="o">=</span> <span class="s1">&#39;DIRECT_READ&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">method</span> <span class="o">=</span> <span class="s1">&#39;EXPORT&#39;</span>
<span class="k">return</span> <span class="n">convert</span><span class="o">.</span><span class="n">to_dataframe</span><span class="p">(</span>
<span class="n">root</span>
<span class="o">|</span> <span class="s1">&#39;_DataFrame_Read_From_BigQuery&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">ReadFromBigQuery</span><span class="p">(</span>
<span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="p">,</span>
<span class="n">dataset</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dataset_id</span><span class="p">,</span>
<span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">project_id</span><span class="p">,</span>
<span class="n">method</span><span class="o">=</span><span class="n">method</span><span class="p">,</span>
<span class="n">output_type</span><span class="o">=</span><span class="s1">&#39;BEAM_ROW&#39;</span><span class="p">))</span>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>