| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
|   <!-- Page metadata: character encoding, viewport, and the page title. --> |
|   <meta charset="utf-8"> |
|   <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
|   <title>apache_beam.dataframe.io — Apache Beam 2.47.0 documentation</title> |
| |
|   <!-- modernizr.min.js is loaded first, ahead of every other script. --> |
|   <script src="../../../_static/js/modernizr.min.js"></script> |
| |
|   <!-- Local static scripts; the load order is kept exactly as generated. --> |
|   <!-- NOTE(review): later scripts presumably depend on jquery.js; do not reorder without checking. --> |
|   <script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script> |
|   <script src="../../../_static/jquery.js"></script> |
|   <script src="../../../_static/underscore.js"></script> |
|   <script src="../../../_static/doctools.js"></script> |
|   <script src="../../../_static/language_data.js"></script> |
|   <!-- MathJax from the cdnjs CDN is the only script marked async. --> |
|   <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
|   <script src="../../../_static/js/theme.js"></script> |
| |
|   <!-- Stylesheets, followed by the index and search relation links. --> |
|   <link rel="stylesheet" href="../../../_static/css/theme.css"> |
|   <link rel="stylesheet" href="../../../_static/pygments.css"> |
|   <link rel="index" title="Index" href="../../../genindex.html"> |
|   <link rel="search" title="Search" href="../../../search.html"> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search"> |
|   <!-- Home link back to the documentation index page. --> |
|   <a href="../../../index.html" class="icon icon-home"> Apache Beam |
|   </a> |
| |
|   <!-- Documentation version label. --> |
|   <div class="version"> |
|     2.47.0 |
|   </div> |
| |
|   <!-- Search form: submits the query as "q" to search.html via GET. --> |
|   <div role="search"> |
|     <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
|       <!-- A placeholder is not an accessible name, so the field is labelled explicitly. --> |
|       <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
|       <input type="hidden" name="check_keywords" value="yes" /> |
|       <input type="hidden" name="area" value="default" /> |
|     </form> |
|   </div> |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
|   <!-- First list: links to the top-level package pages. --> |
|   <ul> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
|   </ul> |
| |
|   <!-- Second list: links to the top-level module pages. --> |
|   <ul> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
|     <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
|   </ul> |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
|   <!-- Icon element carrying the data-toggle hook for this bar, followed by the home link. --> |
|   <!-- NOTE(review): the icon has no accessible name; presumably the theme script binds a click handler to it. Confirm before changing. --> |
|   <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
|   <a href="../../../index.html">Apache Beam</a> |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
|   <!-- Breadcrumb trail: Docs » Module code » current module. --> |
|   <ul class="wy-breadcrumbs"> |
|     <li><a href="../../../index.html">Docs</a> »</li> |
|     <li><a href="../../index.html">Module code</a> »</li> |
|     <li>apache_beam.dataframe.io</li> |
|     <!-- Aside slot; it holds no content on this page. --> |
|     <li class="wy-breadcrumbs-aside"> |
|     </li> |
|   </ul> |
|   <hr> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.dataframe.io</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| |
| <span class="sd">"""Sources and sinks for the Beam DataFrame API.</span> |
| |
| <span class="sd">Sources</span> |
| <span class="sd">#######</span> |
| <span class="sd">This module provides analogs for pandas ``read`` methods, like</span> |
| <span class="sd">:func:`pandas.read_csv`. However Beam sources like :func:`read_csv`</span> |
| <span class="sd">create a Beam :class:`~apache_beam.PTransform`, and return a</span> |
| <span class="sd">:class:`~apache_beam.dataframe.frames.DeferredDataFrame` or</span> |
| <span class="sd">:class:`~apache_beam.dataframe.frames.DeferredSeries` representing the contents</span> |
| <span class="sd">of the referenced file(s) or data source.</span> |
| |
| <span class="sd">The result of these methods must be applied to a :class:`~apache_beam.Pipeline`</span> |
| <span class="sd">object, for example::</span> |
| |
| <span class="sd"> df = p | beam.dataframe.io.read_csv(...)</span> |
| |
| <span class="sd">Sinks</span> |
| <span class="sd">#####</span> |
| <span class="sd">This module also defines analogs for pandas sink, or ``to``, methods that</span> |
| <span class="sd">generate a Beam :class:`~apache_beam.PTransform`. Users should prefer calling</span> |
| <span class="sd">these operations from :class:`~apache_beam.dataframe.frames.DeferredDataFrame`</span> |
| <span class="sd">instances (for example with</span> |
| <span class="sd">:meth:`DeferredDataFrame.to_csv</span> |
| <span class="sd">&lt;apache_beam.dataframe.frames.DeferredDataFrame.to_csv&gt;`).</span> |
| <span class="sd">"""</span> |
| |
| <span class="kn">import</span> <span class="nn">itertools</span> |
| <span class="kn">import</span> <span class="nn">math</span> |
| <span class="kn">import</span> <span class="nn">re</span> |
| <span class="kn">from</span> <span class="nn">io</span> <span class="kn">import</span> <span class="n">BytesIO</span> |
| <span class="kn">from</span> <span class="nn">io</span> <span class="kn">import</span> <span class="n">StringIO</span> |
| <span class="kn">from</span> <span class="nn">io</span> <span class="kn">import</span> <span class="n">TextIOWrapper</span> |
| |
| <span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span> |
| |
| <span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> |
| <span class="kn">from</span> <span class="nn">apache_beam</span> <span class="kn">import</span> <span class="n">io</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">frame_base</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="kn">import</span> <span class="n">fileio</span> |
| |
| <span class="n">_DEFAULT_LINES_CHUNKSIZE</span> <span class="o">=</span> <span class="mi">10_000</span> |
| <span class="n">_DEFAULT_BYTES_CHUNKSIZE</span> <span class="o">=</span> <span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">20</span> |
| |
| |
| <!-- [docs] back-link anchor uses this page's module path, apache_beam.dataframe.io --><div class="viewcode-block" id="read_gbq"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_gbq">[docs]</a><span class="k">def</span> <span class="nf">read_gbq</span><span class="p">(</span> |
| <span class="n">table</span><span class="p">,</span> <span class="n">dataset</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">use_bqstorage_api</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""This function reads data from a BigQuery table and produces a</span> |
| <span class="sd"> :class:`~apache_beam.dataframe.frames.DeferredDataFrame.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> table (str): Please specify a table. This can be done in the format</span> |
| <span class="sd"> 'PROJECT:dataset.table' if one would not wish to utilize</span> |
| <span class="sd"> the parameters below.</span> |
| <span class="sd"> dataset (str): Please specify the dataset</span> |
| <span class="sd"> (can omit if table was specified as 'PROJECT:dataset.table').</span> |
| <span class="sd"> project_id (str): Please specify the project ID</span> |
| <span class="sd"> (can omit if table was specified as 'PROJECT:dataset.table').</span> |
| <span class="sd"> use_bqstorage_api (bool): If you would like to utilize</span> |
| <span class="sd"> the BigQuery Storage API in ReadFromBigQuery, please set</span> |
| <span class="sd"> this flag to true. Otherwise, please set flag</span> |
| <span class="sd"> to false or leave it unspecified.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">table</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Please specify a BigQuery table to read from."</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">kwargs</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s2">"Encountered unsupported parameter(s) in read_gbq: </span><span class="si">{</span><span class="n">kwargs</span><span class="o">.</span><span class="n">keys</span><span class="p">()</span><span class="si">!r}</span><span class="s2">"</span> |
| <span class="s2">""</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">_ReadGbq</span><span class="p">(</span><span class="n">table</span><span class="p">,</span> <span class="n">dataset</span><span class="p">,</span> <span class="n">project_id</span><span class="p">,</span> <span class="n">use_bqstorage_api</span><span class="p">)</span></div> |
| |
| |
| <!-- [docs] back-link anchor uses this page's module path, apache_beam.dataframe.io --><div class="viewcode-block" id="read_csv"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_csv">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">read_csv</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="n">splittable</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""If your files are large and records do not contain quoted newlines, you may</span> |
| <span class="sd"> pass the extra argument ``splittable=True`` to enable dynamic splitting for</span> |
| <span class="sd"> this read on newlines. Using this option for records that do contain quoted</span> |
| <span class="sd"> newlines may result in partial records and data corruption."""</span> |
| <span class="k">if</span> <span class="s1">'nrows'</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'nrows not yet supported'</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">_ReadFromPandas</span><span class="p">(</span> |
| <span class="n">pd</span><span class="o">.</span><span class="n">read_csv</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">args</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">,</span> |
| <span class="n">incremental</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">splitter</span><span class="o">=</span><span class="n">_TextFileSplitter</span><span class="p">(</span><span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">)</span> <span class="k">if</span> <span class="n">splittable</span> <span class="k">else</span> <span class="kc">None</span><span class="p">)</span></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> <span class="c1"># avoid circular import</span> |
| <span class="c1"># TODO(roberwb): Amortize the computation for multiple writes?</span> |
| <span class="k">return</span> <span class="n">convert</span><span class="o">.</span><span class="n">to_pcollection</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">yield_elements</span><span class="o">=</span><span class="s1">'pandas'</span><span class="p">,</span> <span class="n">label</span><span class="o">=</span><span class="n">label</span><span class="p">)</span> |
| |
| |
| <!-- [docs] back-link anchor uses this page's module path, apache_beam.dataframe.io --><div class="viewcode-block" id="to_csv"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.io.to_csv">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">to_csv</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">transform_label</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="n">label_pc</span> <span class="o">=</span> <span class="sa">f</span><span class="s2">"</span><span class="si">{</span><span class="n">transform_label</span><span class="si">}</span><span class="s2"> - ToPCollection"</span> <span class="k">if</span> <span class="n">transform_label</span> \ |
| <span class="k">else</span> <span class="sa">f</span><span class="s2">"ToPCollection(df) - </span><span class="si">{</span><span class="n">path</span><span class="si">}</span><span class="s2">"</span> |
| <span class="n">label_pd</span> <span class="o">=</span> <span class="sa">f</span><span class="s2">"</span><span class="si">{</span><span class="n">transform_label</span><span class="si">}</span><span class="s2"> - ToPandasDataFrame"</span> <span class="k">if</span> <span class="n">transform_label</span> \ |
| <span class="k">else</span> <span class="sa">f</span><span class="s2">"WriteToPandas(df) - </span><span class="si">{</span><span class="n">path</span><span class="si">}</span><span class="s2">"</span> |
| <span class="k">return</span> <span class="n">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">label_pc</span><span class="p">)</span> <span class="o">|</span> <span class="n">label_pd</span> <span class="o">&gt;&gt;</span> <span class="n">_WriteToPandas</span><span class="p">(</span> |
| <span class="s1">'to_csv'</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">incremental</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div> |
| |
| |
| <!-- [docs] back-link anchor uses this page's module path, apache_beam.dataframe.io --><div class="viewcode-block" id="read_fwf"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_fwf">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">read_fwf</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">_ReadFromPandas</span><span class="p">(</span> |
| <span class="n">pd</span><span class="o">.</span><span class="n">read_fwf</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">args</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">,</span> |
| <span class="n">incremental</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">splitter</span><span class="o">=</span><span class="n">_TextFileSplitter</span><span class="p">(</span><span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">))</span></div> |
| |
| |
| <!-- [docs] back-link anchor uses this page's module path, apache_beam.dataframe.io --><div class="viewcode-block" id="read_json"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_json">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">read_json</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">if</span> <span class="s1">'nrows'</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">'nrows not yet supported'</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'lines'</span><span class="p">,</span> <span class="kc">False</span><span class="p">):</span> |
| <span class="c1"># Work around https://github.com/pandas-dev/pandas/issues/34548.</span> |
| <span class="n">kwargs</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">kwargs</span><span class="p">,</span> <span class="n">nrows</span><span class="o">=</span><span class="mi">1</span> <span class="o">&lt;&lt;</span> <span class="mi">63</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">_ReadFromPandas</span><span class="p">(</span> |
| <span class="n">pd</span><span class="o">.</span><span class="n">read_json</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">args</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">,</span> |
| <span class="n">incremental</span><span class="o">=</span><span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'lines'</span><span class="p">,</span> <span class="kc">False</span><span class="p">),</span> |
| <span class="n">splitter</span><span class="o">=</span><span class="n">_DelimSplitter</span><span class="p">(</span><span class="sa">b</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="p">,</span> <span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">)</span> <span class="k">if</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="s1">'lines'</span><span class="p">,</span> <span class="kc">False</span><span class="p">)</span> <span class="k">else</span> <span class="kc">None</span><span class="p">,</span> |
| <span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div> |
| |
| |
| <!-- [docs] back-link anchor uses this page's module path, apache_beam.dataframe.io --><div class="viewcode-block" id="to_json"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.io.to_json">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">to_json</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">orient</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">orient</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="o">.</span><span class="n">proxy</span><span class="p">(),</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">):</span> |
| <span class="n">orient</span> <span class="o">=</span> <span class="s1">'columns'</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="o">.</span><span class="n">proxy</span><span class="p">(),</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">):</span> |
| <span class="n">orient</span> <span class="o">=</span> <span class="s1">'index'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">WontImplementError</span><span class="p">(</span><span class="s1">'not dataframes or series'</span><span class="p">)</span> |
| <span class="n">kwargs</span><span class="p">[</span><span class="s1">'orient'</span><span class="p">]</span> <span class="o">=</span> <span class="n">orient</span> |
| <span class="k">return</span> <span class="n">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">)</span> <span class="o">|</span> <span class="n">_WriteToPandas</span><span class="p">(</span> |
| <span class="s1">'to_json'</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">args</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">,</span> |
| <span class="n">incremental</span><span class="o">=</span><span class="n">orient</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">'index'</span><span class="p">,</span> <span class="s1">'records'</span><span class="p">,</span> <span class="s1">'values'</span><span class="p">),</span> |
| <span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div> |
| |
| |
| <!-- [docs] back-link anchor uses this page's module path, apache_beam.dataframe.io --><div class="viewcode-block" id="read_html"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_html">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">read_html</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">_ReadFromPandas</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> |
| <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">pd</span><span class="o">.</span><span class="n">read_html</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)[</span><span class="mi">0</span><span class="p">],</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">args</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">)</span></div> |
| |
| |
| <!-- [docs] back-link anchor uses this page's module path, apache_beam.dataframe.io --><div class="viewcode-block" id="to_html"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.io.to_html">[docs]</a><span class="nd">@frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">to_html</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">)</span> <span class="o">|</span> <span class="n">_WriteToPandas</span><span class="p">(</span> |
| <span class="s1">'to_html'</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">args</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">,</span> |
| <span class="n">incremental</span><span class="o">=</span><span class="p">(</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">_expr</span><span class="o">.</span><span class="n">proxy</span><span class="p">()</span><span class="o">.</span><span class="n">index</span><span class="o">.</span><span class="n">nlevels</span> <span class="o">==</span> <span class="mi">1</span> <span class="ow">or</span> |
| <span class="ow">not</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'sparsify'</span><span class="p">,</span> <span class="kc">True</span><span class="p">)),</span> |
| <span class="n">binary</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_binary_reader</span><span class="p">(</span><span class="nb">format</span><span class="p">):</span> |
| <span class="n">func</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">pd</span><span class="p">,</span> <span class="s1">'read_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">format</span><span class="p">)</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">path</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">_ReadFromPandas</span><span class="p">(</span><span class="n">func</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">)</span> |
| <span class="n">result</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">'read_</span><span class="si">{</span><span class="nb">format</span><span class="si">}</span><span class="s1">'</span> |
| |
| <span class="k">return</span> <span class="n">result</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_binary_writer</span><span class="p">(</span><span class="nb">format</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">df</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="o">*</span><span class="n">args</span><span class="p">,</span> |
| <span class="o">**</span><span class="n">kwargs</span><span class="p">:</span> <span class="n">_as_pc</span><span class="p">(</span><span class="n">df</span><span class="p">)</span> <span class="o">|</span> <span class="n">_WriteToPandas</span><span class="p">(</span><span class="sa">f</span><span class="s1">'to_</span><span class="si">{</span><span class="nb">format</span><span class="si">}</span><span class="s1">'</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">))</span> |
| <span class="n">result</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">'to_</span><span class="si">{</span><span class="nb">format</span><span class="si">}</span><span class="s1">'</span> |
| <span class="k">return</span> <span class="n">result</span> |
| |
| |
| <span class="k">for</span> <span class="nb">format</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">'excel'</span><span class="p">,</span> <span class="s1">'feather'</span><span class="p">,</span> <span class="s1">'parquet'</span><span class="p">,</span> <span class="s1">'stata'</span><span class="p">):</span> |
| <span class="nb">globals</span><span class="p">()[</span><span class="s1">'read_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">format</span><span class="p">]</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)(</span> |
| <span class="n">_binary_reader</span><span class="p">(</span><span class="nb">format</span><span class="p">))</span> |
| <span class="nb">globals</span><span class="p">()[</span><span class="s1">'to_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">format</span><span class="p">]</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)(</span> |
| <span class="n">_binary_writer</span><span class="p">(</span><span class="nb">format</span><span class="p">))</span> |
| |
| <span class="k">for</span> <span class="nb">format</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">'sas'</span><span class="p">,</span> <span class="s1">'spss'</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">pd</span><span class="p">,</span> <span class="s1">'read_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">format</span><span class="p">):</span> <span class="c1"># Depends on pandas version.</span> |
| <span class="nb">globals</span><span class="p">()[</span><span class="s1">'read_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">format</span><span class="p">]</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">with_docs_from</span><span class="p">(</span><span class="n">pd</span><span class="p">)(</span> |
| <span class="n">_binary_reader</span><span class="p">(</span><span class="nb">format</span><span class="p">))</span> |
| |
| <span class="n">read_clipboard</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">not_implemented_method</span><span class="p">(</span> |
| <span class="s1">'read_clipboard'</span><span class="p">,</span> <span class="n">base_type</span><span class="o">=</span><span class="n">pd</span><span class="p">)</span> |
| <span class="n">to_clipboard</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">not_implemented_method</span><span class="p">(</span> |
| <span class="s1">'to_clipboard'</span><span class="p">,</span> <span class="n">base_type</span><span class="o">=</span><span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">)</span> |
| <span class="n">read_msgpack</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">wont_implement_method</span><span class="p">(</span> |
| <span class="n">pd</span><span class="p">,</span> <span class="s1">'read_msgpack'</span><span class="p">,</span> <span class="n">reason</span><span class="o">=</span><span class="s2">"deprecated"</span><span class="p">)</span> |
| <span class="n">to_msgpack</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">wont_implement_method</span><span class="p">(</span> |
| <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">,</span> <span class="s1">'to_msgpack'</span><span class="p">,</span> <span class="n">reason</span><span class="o">=</span><span class="s2">"deprecated"</span><span class="p">)</span> |
| <span class="n">read_hdf</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">wont_implement_method</span><span class="p">(</span> |
| <span class="n">pd</span><span class="p">,</span> <span class="s1">'read_hdf'</span><span class="p">,</span> <span class="n">explanation</span><span class="o">=</span><span class="s2">"because HDF5 is a random access file format"</span><span class="p">)</span> |
| <span class="n">to_hdf</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">wont_implement_method</span><span class="p">(</span> |
| <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">,</span> |
| <span class="s1">'to_hdf'</span><span class="p">,</span> |
| <span class="n">explanation</span><span class="o">=</span><span class="s2">"because HDF5 is a random access file format"</span><span class="p">)</span> |
| |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="nb">dir</span><span class="p">(</span><span class="n">pd</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">name</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'read_'</span><span class="p">)</span> <span class="ow">and</span> <span class="n">name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="nb">globals</span><span class="p">():</span> |
| <span class="nb">globals</span><span class="p">()[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">not_implemented_method</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">base_type</span><span class="o">=</span><span class="n">pd</span><span class="p">)</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_shift_range_index</span><span class="p">(</span><span class="n">offset</span><span class="p">,</span> <span class="n">df</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">index</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">RangeIndex</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">df</span><span class="o">.</span><span class="n">set_index</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">index</span> <span class="o">+</span> <span class="n">offset</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">df</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_ReadFromPandas</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">reader</span><span class="p">,</span> |
| <span class="n">path</span><span class="p">,</span> |
| <span class="n">args</span><span class="p">,</span> |
| <span class="n">kwargs</span><span class="p">,</span> |
| <span class="n">binary</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">incremental</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">splitter</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="k">if</span> <span class="s1">'compression'</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">'compression'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="n">frame_base</span><span class="o">.</span><span class="n">WontImplementError</span><span class="p">(</span><span class="s1">'non-deferred'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">reader</span> <span class="o">=</span> <span class="n">reader</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="n">path</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">binary</span> <span class="o">=</span> <span class="n">binary</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span> <span class="o">=</span> <span class="n">incremental</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">splitter</span> <span class="o">=</span> <span class="n">splitter</span> |
| |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">root</span><span class="p">):</span> |
| <span class="n">paths_pcoll</span> <span class="o">=</span> <span class="n">root</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">])</span> |
| <span class="n">match</span> <span class="o">=</span> <span class="n">io</span><span class="o">.</span><span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">match</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">],</span> <span class="n">limits</span><span class="o">=</span><span class="p">[</span><span class="mi">1</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">match</span><span class="o">.</span><span class="n">metadata_list</span><span class="p">:</span> |
| <span class="c1"># TODO(https://github.com/apache/beam/issues/20858): This should be</span> |
| <span class="c1"># allowed for streaming pipelines if user provides an explicit schema.</span> |
| <span class="k">raise</span> <span class="ne">FileNotFoundError</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Found no files that match </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="si">!r}</span><span class="s2">"</span><span class="p">)</span> |
| <span class="n">first_path</span> <span class="o">=</span> <span class="n">match</span><span class="o">.</span><span class="n">metadata_list</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">path</span> |
| <span class="k">with</span> <span class="n">io</span><span class="o">.</span><span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">first_path</span><span class="p">)</span> <span class="k">as</span> <span class="n">handle</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">:</span> |
| <span class="n">handle</span> <span class="o">=</span> <span class="n">TextIOWrapper</span><span class="p">(</span><span class="n">handle</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">:</span> |
| <span class="n">sample</span> <span class="o">=</span> <span class="nb">next</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">reader</span><span class="p">(</span><span class="n">handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="nb">dict</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span> <span class="n">chunksize</span><span class="o">=</span><span class="mi">100</span><span class="p">)))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">sample</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">reader</span><span class="p">(</span><span class="n">handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> |
| |
| <span class="n">matches_pcoll</span> <span class="o">=</span> <span class="n">paths_pcoll</span> <span class="o">|</span> <span class="n">fileio</span><span class="o">.</span><span class="n">MatchAll</span><span class="p">()</span> |
| <span class="n">indices_pcoll</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">matches_pcoll</span><span class="o">.</span><span class="n">pipeline</span> |
| <span class="o">|</span> <span class="s1">'DoOnce'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="kc">None</span><span class="p">])</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span> |
| <span class="k">lambda</span> <span class="n">_</span><span class="p">,</span> |
| <span class="n">paths</span><span class="p">:</span> <span class="p">{</span><span class="n">path</span><span class="p">:</span> <span class="n">ix</span> |
| <span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="n">path</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="nb">sorted</span><span class="p">(</span><span class="n">paths</span><span class="p">))},</span> |
| <span class="n">paths</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsList</span><span class="p">(</span> |
| <span class="n">matches_pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">match</span><span class="p">:</span> <span class="n">match</span><span class="o">.</span><span class="n">path</span><span class="p">))))</span> |
| |
| <span class="n">pcoll</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">matches_pcoll</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Reshuffle</span><span class="p">()</span> |
| <span class="o">|</span> <span class="n">fileio</span><span class="o">.</span><span class="n">ReadMatches</span><span class="p">()</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_ReadFromPandasDoFn</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">reader</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">splitter</span><span class="p">),</span> |
| <span class="n">path_indices</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span><span class="n">indices_pcoll</span><span class="p">)))</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> |
| <span class="k">return</span> <span class="n">convert</span><span class="o">.</span><span class="n">to_dataframe</span><span class="p">(</span><span class="n">pcoll</span><span class="p">,</span> <span class="n">proxy</span><span class="o">=</span><span class="n">sample</span><span class="p">[:</span><span class="mi">0</span><span class="p">])</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_Splitter</span><span class="p">:</span> |
| <span class="k">def</span> <span class="nf">empty_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns an empty buffer of the right type (string or bytes).</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">read_header</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Reads the header from handle, which points to the start of the file.</span> |
| |
| <span class="sd"> Returns the pair (header, buffer) where buffer contains any part of the</span> |
| <span class="sd"> file that was "overread" from handle while seeking the end of header.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">read_to_record_boundary</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffered</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Reads the given handle up to the end of the current record.</span> |
| |
| <span class="sd"> The buffered argument represents bytes that were read previously; logically</span> |
| <span class="sd"> it's as if these were pushed back into handle for reading. If the</span> |
| <span class="sd"> record end is within buffered, it's possible that no more bytes will be read</span> |
| <span class="sd"> from handle at all.</span> |
| |
| <span class="sd"> Returns the pair (remaining_record_bytes, buffer) where buffer contains</span> |
| <span class="sd"> any part of the file that was "overread" from handle while seeking the end</span> |
| <span class="sd"> of the record.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_DelimSplitter</span><span class="p">(</span><span class="n">_Splitter</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A _Splitter that splits on delimiters between records.</span> |
| |
| <span class="sd"> This delimiter is assumed to never occur within a record.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">delim</span><span class="p">,</span> <span class="n">read_chunk_size</span><span class="o">=</span><span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">):</span> |
| <span class="c1"># Multi-char delimiters would require more care across chunk boundaries.</span> |
| <span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="n">delim</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_delim</span> <span class="o">=</span> <span class="n">delim</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> <span class="o">=</span> <span class="n">delim</span><span class="p">[:</span><span class="mi">0</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_chunk_size</span> <span class="o">=</span> <span class="n">read_chunk_size</span> |
| |
| <span class="k">def</span> <span class="nf">empty_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| |
| <span class="k">def</span> <span class="nf">read_header</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| |
| <span class="k">def</span> <span class="nf">read_to_record_boundary</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffered</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_delim</span> <span class="ow">in</span> <span class="n">buffered</span><span class="p">:</span> |
| <span class="n">ix</span> <span class="o">=</span> <span class="n">buffered</span><span class="o">.</span><span class="n">index</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_delim</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_delim</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">buffered</span><span class="p">[:</span><span class="n">ix</span><span class="p">],</span> <span class="n">buffered</span><span class="p">[</span><span class="n">ix</span><span class="p">:]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">chunk</span> <span class="o">=</span> <span class="n">handle</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_chunk_size</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_delim</span> <span class="ow">in</span> <span class="n">chunk</span><span class="p">:</span> |
| <span class="n">ix</span> <span class="o">=</span> <span class="n">chunk</span><span class="o">.</span><span class="n">index</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_delim</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_delim</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">buffered</span> <span class="o">+</span> <span class="n">chunk</span><span class="p">[:</span><span class="n">ix</span><span class="p">],</span> <span class="n">chunk</span><span class="p">[</span><span class="n">ix</span><span class="p">:]</span> |
| <span class="k">elif</span> <span class="ow">not</span> <span class="n">chunk</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">buffered</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">buffered</span> <span class="o">+=</span> <span class="n">chunk</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_maybe_encode</span><span class="p">(</span><span class="n">str_or_bytes</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">str_or_bytes</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">str_or_bytes</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">str_or_bytes</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_TextFileSplitter</span><span class="p">(</span><span class="n">_DelimSplitter</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Splitter for dynamically sharding CSV files and newline record boundaries.</span> |
| |
| <span class="sd"> Currently does not handle quoted newlines, so is off by default, but such</span> |
| <span class="sd"> support could be added in the future.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">read_chunk_size</span><span class="o">=</span><span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">args</span><span class="p">:</span> |
| <span class="c1"># TODO(robertwb): Automatically populate kwargs as we do for df methods.</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'Non-path arguments must be passed by keyword '</span> |
| <span class="s1">'for splittable csv reads.'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'skipfooter'</span><span class="p">,</span> <span class="mi">0</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'Splittablility incompatible with skipping footers.'</span><span class="p">)</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span> |
| <span class="n">_maybe_encode</span><span class="p">(</span><span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'lineterminator'</span><span class="p">,</span> <span class="sa">b</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="p">)),</span> |
| <span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> |
| |
| <span class="k">def</span> <span class="nf">read_header</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">handle</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'header'</span><span class="p">,</span> <span class="s1">'infer'</span><span class="p">)</span> <span class="o">==</span> <span class="s1">'infer'</span><span class="p">:</span> |
| <span class="k">if</span> <span class="s1">'names'</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">:</span> |
| <span class="n">header</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">header</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">header</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">[</span><span class="s1">'header'</span><span class="p">]</span> |
| |
| <span class="k">if</span> <span class="n">header</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">header</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span> |
| <span class="n">max_header</span> <span class="o">=</span> <span class="n">header</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">max_header</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="n">header</span><span class="p">)</span> |
| |
| <span class="n">skiprows</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'skiprows'</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">skiprows</span><span class="p">,</span> <span class="nb">int</span><span class="p">):</span> |
| <span class="n">is_skiprow</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">ix</span><span class="p">:</span> <span class="n">ix</span> <span class="o"><</span> <span class="n">skiprows</span> |
| <span class="k">elif</span> <span class="nb">callable</span><span class="p">(</span><span class="n">skiprows</span><span class="p">):</span> |
| <span class="n">is_skiprow</span> <span class="o">=</span> <span class="n">skiprows</span> |
| <span class="k">elif</span> <span class="n">skiprows</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">is_skiprow</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">ix</span><span class="p">:</span> <span class="kc">False</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">is_skiprow</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">ix</span><span class="p">:</span> <span class="n">ix</span> <span class="ow">in</span> <span class="n">skiprows</span> |
| |
| <span class="n">comment</span> <span class="o">=</span> <span class="n">_maybe_encode</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'comment'</span><span class="p">,</span> <span class="kc">None</span><span class="p">))</span> |
| <span class="k">if</span> <span class="n">comment</span><span class="p">:</span> |
| <span class="n">is_comment</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="n">line</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">comment</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">is_comment</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="kc">False</span> |
| |
| <span class="n">skip_blank_lines</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'skip_blank_lines'</span><span class="p">,</span> <span class="kc">True</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">skip_blank_lines</span><span class="p">:</span> |
| <span class="n">is_blank</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="n">re</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="sa">rb</span><span class="s1">'^\s*$'</span><span class="p">,</span> <span class="n">line</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">is_blank</span> <span class="o">=</span> <span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="kc">False</span> |
| |
| <span class="n">text_header</span> <span class="o">=</span> <span class="sa">b</span><span class="s1">''</span> |
| <span class="n">rest</span> <span class="o">=</span> <span class="sa">b</span><span class="s1">''</span> |
| <span class="n">skipped</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="k">for</span> <span class="n">ix</span> <span class="ow">in</span> <span class="n">itertools</span><span class="o">.</span><span class="n">count</span><span class="p">():</span> |
| <span class="n">line</span><span class="p">,</span> <span class="n">rest</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">read_to_record_boundary</span><span class="p">(</span><span class="n">rest</span><span class="p">,</span> <span class="n">handle</span><span class="p">)</span> |
| <span class="n">text_header</span> <span class="o">+=</span> <span class="n">line</span> |
| <span class="k">if</span> <span class="n">is_skiprow</span><span class="p">(</span><span class="n">ix</span><span class="p">)</span> <span class="ow">or</span> <span class="n">is_blank</span><span class="p">(</span><span class="n">line</span><span class="p">)</span> <span class="ow">or</span> <span class="n">is_comment</span><span class="p">(</span><span class="n">line</span><span class="p">):</span> |
| <span class="n">skipped</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">continue</span> |
| <span class="k">if</span> <span class="n">ix</span> <span class="o">-</span> <span class="n">skipped</span> <span class="o">==</span> <span class="n">max_header</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">text_header</span><span class="p">,</span> <span class="n">rest</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_TruncatingFileHandle</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A wrapper of a file-like object representing the restriction of the</span> |
| <span class="sd"> underlying handle according to the given SDF restriction tracker, breaking</span> |
| <span class="sd"> the file only after the given delimiter.</span> |
| |
| <span class="sd"> For example, if the underlying restriction is [103, 607) and each line were</span> |
| <span class="sd"> exactly 10 characters long (i.e. every 10th character was a newline), then this</span> |
| <span class="sd"> would give a view of a 500-byte file consisting of bytes 110 to 609</span> |
| <span class="sd"> (inclusive) of the underlying file.</span> |
| |
| <span class="sd"> As with all SDF trackers, the endpoint may change dynamically during reading.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">underlying</span><span class="p">,</span> <span class="n">tracker</span><span class="p">,</span> <span class="n">splitter</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span> <span class="o">=</span> <span class="n">underlying</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_tracker</span> <span class="o">=</span> <span class="n">tracker</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span> <span class="o">=</span> <span class="n">splitter</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">empty_buffer</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_done</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">read_header</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="n">start</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span><span class="o">.</span><span class="n">start</span> |
| <span class="c1"># Seek to first delimiter after the start position.</span> |
| <span class="k">if</span> <span class="n">start</span> <span class="o">></span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">start</span> <span class="o">></span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">=</span> <span class="n">start</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="o">.</span><span class="n">seek</span><span class="p">(</span><span class="n">start</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">=</span> <span class="n">start</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">[</span><span class="n">start</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">):]</span> |
| <span class="n">skip</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">read_to_record_boundary</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">+=</span> <span class="nb">len</span><span class="p">(</span><span class="n">skip</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">readable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="kc">True</span> |
| |
| <span class="k">def</span> <span class="nf">writable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| |
| <span class="k">def</span> <span class="nf">seekable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">closed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| |
| <span class="k">def</span> <span class="fm">__iter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># For pandas is_file_like.</span> |
| <span class="k">return</span> <span class="bp">self</span> |
| |
| <span class="k">def</span> <span class="fm">__next__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_line_iterator</span><span class="p">()</span> |
| <span class="k">return</span> <span class="nb">next</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">readline</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># This attribute is checked, but unused, by pandas.</span> |
| <span class="k">return</span> <span class="nb">next</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_line_iterator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">line_start</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="n">chunk</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read</span><span class="p">()</span> |
| <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="n">line_end</span> <span class="o">=</span> <span class="n">chunk</span><span class="o">.</span><span class="n">find</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">_delim</span><span class="p">,</span> <span class="n">line_start</span><span class="p">)</span> |
| <span class="k">while</span> <span class="n">line_end</span> <span class="o">==</span> <span class="o">-</span><span class="mi">1</span><span class="p">:</span> |
| <span class="n">more</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">more</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">line_start</span> <span class="o"><</span> <span class="nb">len</span><span class="p">(</span><span class="n">chunk</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="n">chunk</span><span class="p">[</span><span class="n">line_start</span><span class="p">:]</span> |
| <span class="k">return</span> |
| <span class="n">chunk</span> <span class="o">=</span> <span class="n">chunk</span><span class="p">[</span><span class="n">line_start</span><span class="p">:]</span> <span class="o">+</span> <span class="n">more</span> |
| <span class="n">line_start</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="n">line_end</span> <span class="o">=</span> <span class="n">chunk</span><span class="o">.</span><span class="n">find</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">_delim</span><span class="p">,</span> <span class="n">line_start</span><span class="p">)</span> |
| <span class="k">yield</span> <span class="n">chunk</span><span class="p">[</span><span class="n">line_start</span><span class="p">:</span><span class="n">line_end</span> <span class="o">+</span> <span class="mi">1</span><span class="p">]</span> |
| <span class="n">line_start</span> <span class="o">=</span> <span class="n">line_end</span> <span class="o">+</span> <span class="mi">1</span> |
| |
| <span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">size</span><span class="o">=-</span><span class="mi">1</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_iterator</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">'Cannot call read after iterating.'</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read</span><span class="p">(</span><span class="n">size</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">size</span><span class="o">=-</span><span class="mi">1</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_header</span><span class="p">:</span> |
| <span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_header</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_header</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="n">res</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_done</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| <span class="k">elif</span> <span class="n">size</span> <span class="o">==</span> <span class="o">-</span><span class="mi">1</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">+=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
| <span class="k">elif</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">size</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_done</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tracker</span><span class="o">.</span><span class="n">try_claim</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">)):</span> |
| <span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> <span class="o">+=</span> <span class="nb">len</span><span class="p">(</span><span class="n">res</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">offset</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span><span class="o">.</span><span class="n">stop</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer_start_pos</span> |
| <span class="k">if</span> <span class="n">offset</span> <span class="o"><=</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">rest</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splitter</span><span class="o">.</span><span class="n">read_to_record_boundary</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">[</span><span class="n">offset</span><span class="p">:],</span> <span class="bp">self</span><span class="o">.</span><span class="n">_underlying</span><span class="p">)</span> |
| <span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_buffer</span><span class="p">[:</span><span class="n">offset</span><span class="p">]</span> <span class="o">+</span> <span class="n">rest</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_done</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="k">return</span> <span class="n">res</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_ReadFromPandasDoFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">,</span> <span class="n">beam</span><span class="o">.</span><span class="n">RestrictionProvider</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">reader</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">binary</span><span class="p">,</span> <span class="n">incremental</span><span class="p">,</span> <span class="n">splitter</span><span class="p">):</span> |
| <span class="c1"># avoid pickling issues</span> |
| <span class="k">if</span> <span class="n">reader</span><span class="o">.</span><span class="vm">__module__</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'pandas.'</span><span class="p">):</span> |
| <span class="n">reader</span> <span class="o">=</span> <span class="n">reader</span><span class="o">.</span><span class="vm">__name__</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">reader</span> <span class="o">=</span> <span class="n">reader</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">binary</span> <span class="o">=</span> <span class="n">binary</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span> <span class="o">=</span> <span class="n">incremental</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">splitter</span> <span class="o">=</span> <span class="n">splitter</span> |
| |
| <span class="k">def</span> <span class="nf">initial_restriction</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">readable_file</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">restriction_trackers</span><span class="o">.</span><span class="n">OffsetRange</span><span class="p">(</span> |
| <span class="mi">0</span><span class="p">,</span> <span class="n">readable_file</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">size_in_bytes</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">restriction_size</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">readable_file</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">restriction</span><span class="o">.</span><span class="n">size</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">create_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">restriction</span><span class="p">):</span> |
| <span class="n">tracker</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">restriction_trackers</span><span class="o">.</span><span class="n">OffsetRestrictionTracker</span><span class="p">(</span><span class="n">restriction</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">splitter</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">tracker</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">restriction_trackers</span><span class="o">.</span><span class="n">UnsplittableRestrictionTracker</span><span class="p">(</span> |
| <span class="n">tracker</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">readable_file</span><span class="p">,</span> <span class="n">path_indices</span><span class="p">,</span> <span class="n">tracker</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">RestrictionParam</span><span class="p">()):</span> |
| <span class="n">reader</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">reader</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">reader</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span> |
| <span class="n">reader</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">pd</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">reader</span><span class="p">)</span> |
| <span class="n">indices_per_file</span> <span class="o">=</span> <span class="mi">10</span><span class="o">**</span><span class="nb">int</span><span class="p">(</span><span class="n">math</span><span class="o">.</span><span class="n">log</span><span class="p">(</span><span class="mi">2</span><span class="o">**</span><span class="mi">63</span> <span class="o">//</span> <span class="nb">len</span><span class="p">(</span><span class="n">path_indices</span><span class="p">),</span> <span class="mi">10</span><span class="p">))</span> |
| <span class="k">if</span> <span class="n">readable_file</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">size_in_bytes</span> <span class="o">></span> <span class="n">indices_per_file</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span> |
| <span class="sa">f</span><span class="s1">'Cannot safely index records from </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">path_indices</span><span class="p">)</span><span class="si">}</span><span class="s1"> files '</span> |
| <span class="sa">f</span><span class="s1">'of size </span><span class="si">{</span><span class="n">readable_file</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">size_in_bytes</span><span class="si">}</span><span class="s1"> '</span> |
| <span class="sa">f</span><span class="s1">'as their product is greater than 2^63.'</span><span class="p">)</span> |
| <span class="n">start_index</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span><span class="o">.</span><span class="n">start</span> <span class="o">+</span> |
| <span class="n">path_indices</span><span class="p">[</span><span class="n">readable_file</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">path</span><span class="p">]</span> <span class="o">*</span> <span class="n">indices_per_file</span><span class="p">)</span> |
| <span class="k">with</span> <span class="n">readable_file</span><span class="o">.</span><span class="n">open</span><span class="p">()</span> <span class="k">as</span> <span class="n">handle</span><span class="p">:</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">:</span> |
| <span class="c1"># TODO(robertwb): We could consider trying to get progress for</span> |
| <span class="c1"># non-incremental sources that are read linearly, as long as they</span> |
| <span class="c1"># don't try to seek. This could be deceptive as progress would</span> |
| <span class="c1"># advance to 100% the instant the (large) read was done, discounting</span> |
| <span class="c1"># any downstream processing.</span> |
| <span class="n">handle</span> <span class="o">=</span> <span class="n">_TruncatingFileHandle</span><span class="p">(</span> |
| <span class="n">handle</span><span class="p">,</span> |
| <span class="n">tracker</span><span class="p">,</span> |
| <span class="n">splitter</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">splitter</span> <span class="ow">or</span> |
| <span class="n">_DelimSplitter</span><span class="p">(</span><span class="sa">b</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="p">,</span> <span class="n">_DEFAULT_BYTES_CHUNKSIZE</span><span class="p">))</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">:</span> |
| <span class="n">handle</span> <span class="o">=</span> <span class="n">TextIOWrapper</span><span class="p">(</span><span class="n">handle</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">:</span> |
| <span class="k">if</span> <span class="s1">'chunksize'</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="s1">'chunksize'</span><span class="p">]</span> <span class="o">=</span> <span class="n">_DEFAULT_LINES_CHUNKSIZE</span> |
| <span class="n">frames</span> <span class="o">=</span> <span class="n">reader</span><span class="p">(</span><span class="n">handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">frames</span> <span class="o">=</span> <span class="p">[</span><span class="n">reader</span><span class="p">(</span><span class="n">handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)]</span> |
| <span class="k">for</span> <span class="n">df</span> <span class="ow">in</span> <span class="n">frames</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">_shift_range_index</span><span class="p">(</span><span class="n">start_index</span><span class="p">,</span> <span class="n">df</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">:</span> |
| <span class="c1"># Satisfy the SDF contract by claiming the whole range.</span> |
| <span class="c1"># Do this after emitting the frames to avoid advancing progress to 100%</span> |
| <span class="c1"># prior to that.</span> |
| <span class="n">tracker</span><span class="o">.</span><span class="n">try_claim</span><span class="p">(</span><span class="n">tracker</span><span class="o">.</span><span class="n">current_restriction</span><span class="p">()</span><span class="o">.</span><span class="n">stop</span><span class="p">)</span> |
| |
| |
| <!-- _WriteToPandas: PTransform that stores writer, path, args, kwargs, incremental and binary, and whose expand() applies fileio.WriteToFiles with a _WriteToPandasFileSink as the sink. --><span class="k">class</span> <span class="nc">_WriteToPandas</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">writer</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">incremental</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">binary</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="n">writer</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="n">path</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span> <span class="o">=</span> <span class="n">incremental</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">binary</span> <span class="o">=</span> <span class="n">binary</span> |
| |
| <!-- expand: when 'file_naming' is present in kwargs the whole path is used as the output directory and the base name is empty; otherwise the path is split into directory and file name, and the name feeds fileio.default_file_naming. --><span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="k">if</span> <span class="s1">'file_naming'</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">:</span> |
| <span class="nb">dir</span><span class="p">,</span> <span class="n">name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="s1">''</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="nb">dir</span><span class="p">,</span> <span class="n">name</span> <span class="o">=</span> <span class="n">io</span><span class="o">.</span><span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">fileio</span><span class="o">.</span><span class="n">WriteToFiles</span><span class="p">(</span> |
| <span class="n">path</span><span class="o">=</span><span class="nb">dir</span><span class="p">,</span> |
| <!-- NOTE(review): 'num_shards' and 'file_naming' are popped from self.kwargs in place, so the same dict handed to the sink below no longer carries them, and a repeated expand() call would not see them either. Confirm that single use is intended. --><span class="n">shards</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="s1">'num_shards'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">file_naming</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span> |
| <span class="s1">'file_naming'</span><span class="p">,</span> <span class="n">fileio</span><span class="o">.</span><span class="n">default_file_naming</span><span class="p">(</span><span class="n">name</span><span class="p">)),</span> |
| <span class="n">sink</span><span class="o">=</span><span class="k">lambda</span> <span class="n">_</span><span class="p">:</span> <span class="n">_WriteToPandasFileSink</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">))</span> |
| |
| |
| <!-- _WriteToPandasFileSink: fileio.FileSink that serializes DataFrames by calling the DataFrame method named by `writer` (looked up with getattr in write_to). A 'compression' keyword is rejected up front with NotImplementedError. --><span class="k">class</span> <span class="nc">_WriteToPandasFileSink</span><span class="p">(</span><span class="n">fileio</span><span class="o">.</span><span class="n">FileSink</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">writer</span><span class="p">,</span> <span class="n">args</span><span class="p">,</span> <span class="n">kwargs</span><span class="p">,</span> <span class="n">incremental</span><span class="p">,</span> <span class="n">binary</span><span class="p">):</span> |
| <span class="k">if</span> <span class="s1">'compression'</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">(</span><span class="s1">'compression'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">writer</span> <span class="o">=</span> <span class="n">writer</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">incremental</span> <span class="o">=</span> <span class="n">incremental</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">binary</span> <span class="o">=</span> <span class="n">binary</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">StringOrBytesIO</span> <span class="o">=</span> <span class="n">BytesIO</span> <span class="k">if</span> <span class="n">binary</span> <span class="k">else</span> <span class="n">StringIO</span> |
| <!-- write and flush are bound per instance: incremental mode writes each record as it arrives, otherwise records are buffered and written together on flush. --><span class="k">if</span> <span class="n">incremental</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_record_incremental</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">flush</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">close_incremental</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer_record</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">flush</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">flush_buffer</span> |
| |
| <!-- open: resets the record buffer and the empty/header/footer state, wraps the handle in TextIOWrapper when not in binary mode, and keeps it as self.file_handle. --><span class="k">def</span> <span class="nf">open</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_handle</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">empty</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">header</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">footer</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">binary</span><span class="p">:</span> |
| <span class="n">file_handle</span> <span class="o">=</span> <span class="n">TextIOWrapper</span><span class="p">(</span><span class="n">file_handle</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span> <span class="o">=</span> <span class="n">file_handle</span> |
| |
| <!-- write_to: serializes df to file_handle when one is given; otherwise serializes into a fresh in-memory BytesIO/StringIO and returns its contents. Returns None when a handle was supplied. --><span class="k">def</span> <span class="nf">write_to</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">df</span><span class="p">,</span> <span class="n">file_handle</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="n">non_none_handle</span> <span class="o">=</span> <span class="n">file_handle</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">StringOrBytesIO</span><span class="p">()</span> |
| <span class="nb">getattr</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">writer</span><span class="p">)(</span><span class="n">non_none_handle</span><span class="p">,</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">file_handle</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">non_none_handle</span><span class="o">.</span><span class="n">getvalue</span><span class="p">()</span> |
| |
| <!-- write_record_incremental: on the first non-empty record, works out header, footer and row delimiter by comparing the serialized forms of a zero-row, a one-row and a two-row frame, then writes the header once. Every non-empty record is then written with the header and footer sliced off, preceded by the delimiter for all but the first. --><span class="k">def</span> <span class="nf">write_record_incremental</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">empty</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">empty</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">value</span><span class="p">[:</span><span class="mi">0</span><span class="p">])</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">header</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">value</span><span class="p">):</span> |
| |
| <span class="k">def</span> <span class="nf">new_value</span><span class="p">(</span><span class="n">ix</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">ix</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">(</span><span class="n">new_value</span><span class="p">(</span><span class="n">ix</span><span class="p">[</span><span class="mi">0</span><span class="p">]),</span> <span class="p">)</span> <span class="o">+</span> <span class="n">ix</span><span class="p">[</span><span class="mi">1</span><span class="p">:]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <!-- NOTE(review): this branch returns the constant 'x_again' and does not use ix; possibly str(ix) was intended. Confirm against the upstream module. --><span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="s1">'x'</span><span class="p">)</span> <span class="o">+</span> <span class="s1">'_again'</span> |
| |
| <span class="k">def</span> <span class="nf">change_index</span><span class="p">(</span><span class="n">df</span><span class="p">):</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">index</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">index</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">new_value</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">df</span> |
| |
| <span class="n">one_row</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">value</span><span class="p">[:</span><span class="mi">1</span><span class="p">])</span> |
| <span class="n">another_row</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">change_index</span><span class="p">(</span><span class="n">value</span><span class="p">[:</span><span class="mi">1</span><span class="p">]))</span> |
| <span class="n">two_rows</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">concat</span><span class="p">([</span><span class="n">value</span><span class="p">[:</span><span class="mi">1</span><span class="p">],</span> <span class="n">change_index</span><span class="p">(</span><span class="n">value</span><span class="p">[:</span><span class="mi">1</span><span class="p">])]))</span> |
| <!-- The header is the common prefix of the serialized empty frame and the serialized one-row frame; whatever remains of the empty output is treated as the footer. --><span class="k">for</span> <span class="n">ix</span><span class="p">,</span> <span class="n">c</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">one_row</span><span class="p">[</span><span class="n">ix</span><span class="p">]</span> <span class="o">!=</span> <span class="n">c</span><span class="p">:</span> |
| <span class="k">break</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">ix</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">header</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">[:</span><span class="n">ix</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">footer</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">[</span><span class="n">ix</span><span class="p">:]</span> |
| <!-- The delimiter is the slice of the two-row output lying between the first row's body and the second row's body; presumably the separator the writer emits between rows. --><span class="bp">self</span><span class="o">.</span><span class="n">delimiter</span> <span class="o">=</span> <span class="n">two_rows</span><span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">one_row</span><span class="p">)</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">footer</span><span class="p">):</span><span class="o">-</span><span class="p">(</span> |
| <span class="nb">len</span><span class="p">(</span><span class="n">another_row</span><span class="p">)</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">header</span><span class="p">))</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">header</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">first</span> <span class="o">=</span> <span class="kc">True</span> |
| |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">value</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">first</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">first</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">delimiter</span><span class="p">)</span> |
| |
| <span class="c1"># IDEA(robertwb): Construct a "truncating" stream wrapper to avoid the</span> |
| <span class="c1"># in-memory copy.</span> |
| <span class="n">rows</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> |
| <!-- Only the body of the serialized chunk is written: the header prefix and footer suffix are sliced off. The 'or None' keeps the full tail when the footer is empty, since a stop index of -0 would otherwise yield an empty slice. --><span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">rows</span><span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">header</span><span class="p">):</span><span class="o">-</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">footer</span><span class="p">)</span> <span class="ow">or</span> <span class="kc">None</span><span class="p">])</span> |
| |
| <!-- close_incremental: writes the footer if one was derived; otherwise, if only the serialized empty frame was computed (no non-empty record was seen), writes that instead. Always flushes the handle. --><span class="k">def</span> <span class="nf">close_incremental</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">footer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">footer</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">empty</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">empty</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span> |
| |
| <!-- buffer_record: non-incremental write path; appends the record to the in-memory buffer. --><span class="k">def</span> <span class="nf">buffer_record</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> |
| |
| <!-- flush_buffer: non-incremental flush path; concatenates all buffered records and serializes them straight to the file handle. Nothing is written when the buffer is empty. --><span class="k">def</span> <span class="nf">flush_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">write_to</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">concat</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">file_handle</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span> |
| |
| |
| <!-- ReadViaPandas: PTransform whose constructor looks up the module-level callable named 'read_' + format in globals() and calls it with the remaining args and kwargs to build the reader. --><div class="viewcode-block" id="ReadViaPandas"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.ReadViaPandas">[docs]</a><span class="k">class</span> <span class="nc">ReadViaPandas</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="nb">format</span><span class="p">,</span> |
| <span class="o">*</span><span class="n">args</span><span class="p">,</span> |
| <span class="n">include_indexes</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">objects_as_strings</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_reader</span> <span class="o">=</span> <span class="nb">globals</span><span class="p">()[</span><span class="s1">'read_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">format</span><span class="p">](</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_include_indexes</span> <span class="o">=</span> <span class="n">include_indexes</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_objects_as_strings</span> <span class="o">=</span> <span class="n">objects_as_strings</span> |
| |
| <!-- expand: applies the reader to p; when objects_as_strings is set, casts every object-dtype column to pd.StringDtype; then returns convert.to_pcollection of the result, forwarding include_indexes. --><div class="viewcode-block" id="ReadViaPandas.expand"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.ReadViaPandas.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">p</span><span class="p">):</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> <span class="c1"># avoid circular import</span> |
| <span class="n">df</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">_reader</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_objects_as_strings</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">col</span><span class="p">,</span> <span class="n">t</span> <span class="ow">in</span> <span class="nb">zip</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">columns</span><span class="p">,</span> <span class="n">df</span><span class="o">.</span><span class="n">dtypes</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">t</span> <span class="o">==</span> <span class="nb">object</span><span class="p">:</span> |
| <span class="n">df</span><span class="p">[</span><span class="n">col</span><span class="p">]</span> <span class="o">=</span> <span class="n">df</span><span class="p">[</span><span class="n">col</span><span class="p">]</span><span class="o">.</span><span class="n">astype</span><span class="p">(</span><span class="n">pd</span><span class="o">.</span><span class="n">StringDtype</span><span class="p">())</span> |
| <span class="k">return</span> <span class="n">convert</span><span class="o">.</span><span class="n">to_pcollection</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">include_indexes</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_include_indexes</span><span class="p">)</span></div></div> |
| |
| |
| <!-- WriteViaPandas: PTransform whose constructor looks up the module-level callable named 'to_' + format in globals() and stores it together with the remaining args and kwargs. --><div class="viewcode-block" id="WriteViaPandas"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.WriteViaPandas">[docs]</a><span class="k">class</span> <span class="nc">WriteViaPandas</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="nb">format</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_writer_func</span> <span class="o">=</span> <span class="nb">globals</span><span class="p">()[</span><span class="s1">'to_</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">format</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_args</span> <span class="o">=</span> <span class="n">args</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> |
| |
| <!-- expand: converts pcoll with convert.to_dataframe, passes it to the stored writer function with the stored args and kwargs, maps each result to its file_name (declared as str), and returns a dict with the single key 'files_written'. --><div class="viewcode-block" id="WriteViaPandas.expand"><a class="viewcode-back" href="../../../apache_beam.dataframe.io.html#apache_beam.dataframe.frames.WriteViaPandas.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> <span class="c1"># avoid circular import</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'files_written'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_writer_func</span><span class="p">(</span> |
| <span class="n">convert</span><span class="o">.</span><span class="n">to_dataframe</span><span class="p">(</span><span class="n">pcoll</span><span class="p">),</span> <span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">_args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">_kwargs</span><span class="p">)</span> |
| <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">file_result</span><span class="p">:</span> <span class="n">file_result</span><span class="o">.</span><span class="n">file_name</span><span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span> |
| <span class="nb">str</span><span class="p">)</span> |
| <span class="p">}</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_ReadGbq</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Read data from BigQuery with output type 'BEAM_ROW',</span> |
| <span class="sd"> then convert it into a deferred dataframe.</span> |
| |
| <span class="sd"> This PTransform wraps the Python ReadFromBigQuery PTransform,</span> |
| <span class="sd"> and sets the output_type as 'BEAM_ROW' to convert</span> |
| <span class="sd"> into a Beam Schema. Once applied to a pipeline object,</span> |
| <span class="sd"> it is passed into the to_dataframe() function to convert the</span> |
| <span class="sd"> PCollection into a deferred dataframe.</span> |
| |
| <span class="sd"> This PTransform currently does not support queries.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> table (str): The ID of the table. The ID must contain only</span> |
| <span class="sd"> letters ``a-z``, ``A-Z``,</span> |
| <span class="sd"> numbers ``0-9``, underscores ``_`` or white spaces.</span> |
| <span class="sd"> Note that the table argument must contain the entire table</span> |
| <span class="sd"> reference specified as: ``'PROJECT:DATASET.TABLE'``.</span> |
| <span class="sd"> use_bqstorage_api (bool): The method to use to read from BigQuery.</span> |
| <span class="sd"> It may be 'EXPORT' or</span> |
| <span class="sd"> 'DIRECT_READ'. EXPORT invokes a BigQuery export request</span> |
| <span class="sd"> (https://cloud.google.com/bigquery/docs/exporting-data).</span> |
| <span class="sd"> 'DIRECT_READ' reads</span> |
| <span class="sd"> directly from BigQuery storage using the BigQuery Read API</span> |
| <span class="sd"> (https://cloud.google.com/bigquery/docs/reference/storage). If</span> |
| <span class="sd"> unspecified or set to false, the default is currently utilized (EXPORT).</span> |
| <span class="sd"> If the flag is set to true,</span> |
| <span class="sd"> 'DIRECT_READ' will be utilized."""</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">table</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">dataset_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">project_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">use_bqstorage_api</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="n">table</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dataset_id</span> <span class="o">=</span> <span class="n">dataset_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">project_id</span> <span class="o">=</span> <span class="n">project_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">use_bqstorage_api</span> <span class="o">=</span> <span class="n">use_bqstorage_api</span> |
| |
| <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">root</span><span class="p">):</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.dataframe</span> <span class="kn">import</span> <span class="n">convert</span> <span class="c1"># avoid circular import</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_bqstorage_api</span><span class="p">:</span> |
| <span class="n">method</span> <span class="o">=</span> <span class="s1">'DIRECT_READ'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">method</span> <span class="o">=</span> <span class="s1">'EXPORT'</span> |
| <span class="k">return</span> <span class="n">convert</span><span class="o">.</span><span class="n">to_dataframe</span><span class="p">(</span> |
| <span class="n">root</span> |
| <span class="o">|</span> <span class="s1">'_DataFrame_Read_From_BigQuery'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">ReadFromBigQuery</span><span class="p">(</span> |
| <span class="n">table</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="p">,</span> |
| <span class="n">dataset</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dataset_id</span><span class="p">,</span> |
| <span class="n">project</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">project_id</span><span class="p">,</span> |
| <span class="n">method</span><span class="o">=</span><span class="n">method</span><span class="p">,</span> |
| <span class="n">output_type</span><span class="o">=</span><span class="s1">'BEAM_ROW'</span><span class="p">))</span> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| <!-- Page footer: copyright notice followed by the Sphinx / Read the Docs theme attribution. --> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| // When the document is ready, switch on the Read the Docs theme navigation. |
| jQuery(document).ready(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |