blob: 0c1b00f1c7d0a925c0f899c8b38b24dc8bbd3818 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.io.fileio — Apache Beam 2.47.0 documentation</title>
  <!-- Sphinx / theme runtime scripts, kept in their original load order. -->
  <script src="../../../_static/js/modernizr.min.js"></script>
  <script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
  <script src="../../../_static/jquery.js"></script>
  <script src="../../../_static/underscore.js"></script>
  <script src="../../../_static/doctools.js"></script>
  <script src="../../../_static/language_data.js"></script>
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../_static/js/theme.js"></script>
  <link rel="stylesheet" href="../../../_static/css/theme.css">
  <link rel="stylesheet" href="../../../_static/pygments.css">
  <link rel="index" title="Index" href="../../../genindex.html">
  <link rel="search" title="Search" href="../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav class="wy-nav-side" data-toggle="wy-nav-shift" aria-label="Sidebar">
  <div class="wy-side-scroll">
    <div class="wy-side-nav-search">
      <a class="icon icon-home" href="../../../index.html"> Apache Beam
      </a>
      <div class="version">
        2.47.0
      </div>
      <div role="search">
        <form class="wy-form" id="rtd-search-form" action="../../../search.html" method="get">
          <!-- The placeholder vanishes once the user types and is not a label,
               so the field also needs an explicit accessible name. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
      </ul>
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- Pointer-only menu toggle (presumably wired up by the theme script via
       data-toggle). The icon-font glyph has no text alternative, so it is
       hidden from assistive technology; the sidebar navigation stays in the
       document for screen-reader users. -->
  <i class="fa fa-bars" data-toggle="wy-nav-top" aria-hidden="true"></i>
  <a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<nav aria-label="breadcrumbs navigation">
  <ul class="wy-breadcrumbs">
    <!-- Separators are visual only; hide them from assistive technology. -->
    <li><a href="../../../index.html">Docs</a> <span aria-hidden="true">»</span></li>
    <li><a href="../../index.html">Module code</a> <span aria-hidden="true">»</span></li>
    <li aria-current="page">apache_beam.io.fileio</li>
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</nav>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.fileio</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;``PTransforms`` for manipulating files in Apache Beam.</span>
<span class="sd">Provides reading ``PTransform``\\s: ``MatchFiles`` and</span>
<span class="sd">``MatchAll``, which produce a ``PCollection`` of records representing a file</span>
<span class="sd">and its metadata; and ``ReadMatches``, which takes in a ``PCollection`` of file</span>
<span class="sd">metadata records, and produces a ``PCollection`` of ``ReadableFile`` objects.</span>
<span class="sd">These transforms currently do not support splitting by themselves.</span>
<span class="sd">Writing to Files</span>
<span class="sd">================</span>
<span class="sd">The transforms in this file include ``WriteToFiles``, which allows you to write</span>
<span class="sd">a ``beam.PCollection`` to files, and gives you many options to customize how to</span>
<span class="sd">do this.</span>
<span class="sd">The ``WriteToFiles`` transform supports bounded and unbounded PCollections</span>
<span class="sd">(i.e. it can be used in both batch and streaming pipelines). For streaming</span>
<span class="sd">pipelines, it currently does not have support for multiple trigger firings</span>
<span class="sd">on the same window.</span>
<span class="sd">File Naming</span>
<span class="sd">-----------</span>
<span class="sd">One of the parameters received by ``WriteToFiles`` is a function specifying how</span>
<span class="sd">to name the files that are written. This is a function that takes in the</span>
<span class="sd">following parameters:</span>
<span class="sd">- window</span>
<span class="sd">- pane</span>
<span class="sd">- shard_index</span>
<span class="sd">- total_shards</span>
<span class="sd">- compression</span>
<span class="sd">- destination</span>
<span class="sd">It should return a file name that is unique for a combination of these</span>
<span class="sd">parameters.</span>
<span class="sd">The default naming strategy is to name files</span>
<span class="sd">in the format</span>
<span class="sd">`$prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix`,</span>
<span class="sd">where:</span>
<span class="sd">- `$prefix` is, by default, `&quot;output&quot;`.</span>
<span class="sd">- `$start` and `$end` are the boundaries of the window for the data being</span>
<span class="sd"> written. These are omitted when using the global window.</span>
<span class="sd">- `$pane` is the index of the trigger firing (pane) for the window.</span>
<span class="sd">- `$shard` and `$numShards` are the current shard number, and the total number</span>
<span class="sd"> of shards for this window firing.</span>
<span class="sd">- `$suffix` is, by default, an empty string, but it can be set by the user via</span>
<span class="sd"> ``default_file_naming``.</span>
<span class="sd">Dynamic Destinations</span>
<span class="sd">--------------------</span>
<span class="sd">If the elements in the input ``beam.PCollection`` can be partitioned into groups</span>
<span class="sd">that should be treated differently (e.g. some events are to be stored as CSV,</span>
<span class="sd">while some others are to be stored as Avro files), it is possible to do this</span>
<span class="sd">by passing a `destination` parameter to ``WriteToFiles``. Something like the</span>
<span class="sd">following::</span>
<span class="sd"> my_pcollection | beam.io.fileio.WriteToFiles(</span>
<span class="sd"> path=&#39;/my/file/path&#39;,</span>
<span class="sd"> destination=lambda record: &#39;avro&#39; if record[&#39;type&#39;] == &#39;A&#39; else &#39;csv&#39;,</span>
<span class="sd"> sink=lambda dest: AvroSink() if dest == &#39;avro&#39; else CsvSink(),</span>
<span class="sd"> file_naming=beam.io.fileio.destination_prefix_naming())</span>
<span class="sd">In this transform, each record is written to a destination named `&#39;avro&#39;`</span>
<span class="sd">or `&#39;csv&#39;`, depending on its type. The value returned by the</span>
<span class="sd">`destination` call is then passed to the `sink` call, to determine what sort of</span>
<span class="sd">sink will be used for each destination. The return type of the `destination`</span>
<span class="sd">parameter can be anything, as long as elements can be grouped by it.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span> <span class="nn">collections</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">random</span>
<span class="kn">import</span> <span class="nn">uuid</span>
<span class="kn">from</span> <span class="nn">collections</span> <span class="kn">import</span> <span class="n">namedtuple</span>
<span class="kn">from</span> <span class="nn">functools</span> <span class="kn">import</span> <span class="n">partial</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">TYPE_CHECKING</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">BinaryIO</span> <span class="c1"># pylint: disable=unused-import</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Callable</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">DefaultDict</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Iterable</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">List</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Tuple</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Union</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="kn">import</span> <span class="n">filesystem</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="kn">import</span> <span class="n">filesystems</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.filesystem</span> <span class="kn">import</span> <span class="n">BeamIOError</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.filesystem</span> <span class="kn">import</span> <span class="n">CompressionTypes</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">GoogleCloudOptions</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="kn">import</span> <span class="n">StaticValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="kn">import</span> <span class="n">ValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.periodicsequence</span> <span class="kn">import</span> <span class="n">PeriodicImpulse</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.userstate</span> <span class="kn">import</span> <span class="n">CombiningValueStateSpec</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">FixedWindows</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">GlobalWindow</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">IntervalWindow</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.annotations</span> <span class="kn">import</span> <span class="n">experimental</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.timestamp</span> <span class="kn">import</span> <span class="n">MAX_TIMESTAMP</span>
<span class="kn">from</span> <span class="nn">apache_beam.utils.timestamp</span> <span class="kn">import</span> <span class="n">Timestamp</span>
<span class="k">if</span> <span class="n">TYPE_CHECKING</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.window</span> <span class="kn">import</span> <span class="n">BoundedWindow</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span>
    <span class="s1">&#39;EmptyMatchTreatment&#39;</span><span class="p">,</span>
    <span class="s1">&#39;MatchFiles&#39;</span><span class="p">,</span>
    <span class="s1">&#39;MatchAll&#39;</span><span class="p">,</span>
    <span class="s1">&#39;MatchContinuously&#39;</span><span class="p">,</span>
    <span class="s1">&#39;ReadableFile&#39;</span><span class="p">,</span>
    <span class="s1">&#39;ReadMatches&#39;</span><span class="p">,</span>
    <span class="s1">&#39;WriteToFiles&#39;</span><span class="p">,</span>
<span class="p">]</span>

<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>

<span class="c1"># A ``(mime_type, compression_type)`` record. Despite the shared name, this is</span>
<span class="c1"># a different type from ``filesystem.FileMetadata``, which is the element type</span>
<span class="c1"># produced by the match transforms in this module.</span>
<span class="n">FileMetadata</span> <span class="o">=</span> <span class="n">namedtuple</span><span class="p">(</span><span class="s2">&quot;FileMetadata&quot;</span><span class="p">,</span> <span class="p">[</span><span class="s2">&quot;mime_type&quot;</span><span class="p">,</span> <span class="s2">&quot;compression_type&quot;</span><span class="p">])</span>

<span class="c1"># A callable taking two strings and returning a ``FileMetadata`` record.</span>
<span class="n">CreateFileMetadataFn</span> <span class="o">=</span> <span class="n">Callable</span><span class="p">[[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">],</span> <span class="n">FileMetadata</span><span class="p">]</span>
<div class="viewcode-block" id="EmptyMatchTreatment"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.EmptyMatchTreatment">[docs]</a><span class="k">class</span> <span class="nc">EmptyMatchTreatment</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;How to treat empty matches in ``MatchAll`` and ``MatchFiles`` transforms.</span>

<span class="sd">  If empty matches are disallowed, an error will be thrown if a pattern does not</span>
<span class="sd">  match any files.</span>
<span class="sd">  &quot;&quot;&quot;</span>

  <span class="c1"># Empty matches are always allowed.</span>
  <span class="n">ALLOW</span> <span class="o">=</span> <span class="s1">&#39;ALLOW&#39;</span>
  <span class="c1"># Empty matches are never allowed.</span>
  <span class="n">DISALLOW</span> <span class="o">=</span> <span class="s1">&#39;DISALLOW&#39;</span>
  <span class="c1"># Empty matches are allowed only if the pattern contains a ``*`` wildcard.</span>
  <span class="n">ALLOW_IF_WILDCARD</span> <span class="o">=</span> <span class="s1">&#39;ALLOW_IF_WILDCARD&#39;</span>

<div class="viewcode-block" id="EmptyMatchTreatment.allow_empty_match"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.EmptyMatchTreatment.allow_empty_match">[docs]</a>  <span class="nd">@staticmethod</span>
  <span class="k">def</span> <span class="nf">allow_empty_match</span><span class="p">(</span><span class="n">pattern</span><span class="p">,</span> <span class="n">setting</span><span class="p">):</span>
<span class="w">    </span><span class="sd">&quot;&quot;&quot;Returns whether an empty match for ``pattern`` is acceptable.</span>

<span class="sd">    Args:</span>
<span class="sd">      pattern: The file pattern being matched.</span>
<span class="sd">      setting: One of ``ALLOW``, ``DISALLOW`` or ``ALLOW_IF_WILDCARD``.</span>

<span class="sd">    Returns:</span>
<span class="sd">      ``True`` if an empty match is allowed, ``False`` otherwise.</span>

<span class="sd">    Raises:</span>
<span class="sd">      ValueError: If ``setting`` is not one of the values above.</span>
<span class="sd">    &quot;&quot;&quot;</span>
    <span class="k">if</span> <span class="n">setting</span> <span class="o">==</span> <span class="n">EmptyMatchTreatment</span><span class="o">.</span><span class="n">ALLOW</span><span class="p">:</span>
      <span class="k">return</span> <span class="kc">True</span>
    <span class="k">elif</span> <span class="n">setting</span> <span class="o">==</span> <span class="n">EmptyMatchTreatment</span><span class="o">.</span><span class="n">ALLOW_IF_WILDCARD</span><span class="p">:</span>
      <span class="c1"># A pattern without a wildcard must disallow the empty match (False)</span>
      <span class="c1"># instead of falling through to the ValueError branch below.</span>
      <span class="k">return</span> <span class="s1">&#39;*&#39;</span> <span class="ow">in</span> <span class="n">pattern</span>
    <span class="k">elif</span> <span class="n">setting</span> <span class="o">==</span> <span class="n">EmptyMatchTreatment</span><span class="o">.</span><span class="n">DISALLOW</span><span class="p">:</span>
      <span class="k">return</span> <span class="kc">False</span>
    <span class="k">else</span><span class="p">:</span>
      <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="n">setting</span><span class="p">)</span></div></div>
<span class="k">class</span> <span class="nc">_MatchAllFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;Expands each input file pattern into the metadata of its matching files.</span>

<span class="sd">  Raises ``BeamIOError`` for a pattern that matches nothing, unless</span>
<span class="sd">  ``EmptyMatchTreatment.allow_empty_match`` permits the empty match.</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">empty_match_treatment</span><span class="p">):</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_empty_match_treatment</span> <span class="o">=</span> <span class="n">empty_match_treatment</span>

  <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_pattern</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">List</span><span class="p">[</span><span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">]:</span>
    <span class="c1"># TODO: Should we batch the lookups?</span>
    <span class="n">first_result</span> <span class="o">=</span> <span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">match</span><span class="p">([</span><span class="n">file_pattern</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span>
    <span class="n">found</span> <span class="o">=</span> <span class="n">first_result</span><span class="o">.</span><span class="n">metadata_list</span>
    <span class="k">if</span> <span class="n">found</span><span class="p">:</span>
      <span class="k">return</span> <span class="n">found</span>
    <span class="c1"># Nothing matched: defer to the configured empty-match policy.</span>
    <span class="n">allowed</span> <span class="o">=</span> <span class="n">EmptyMatchTreatment</span><span class="o">.</span><span class="n">allow_empty_match</span><span class="p">(</span>
        <span class="n">file_pattern</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_empty_match_treatment</span><span class="p">)</span>
    <span class="k">if</span> <span class="ow">not</span> <span class="n">allowed</span><span class="p">:</span>
      <span class="k">raise</span> <span class="n">BeamIOError</span><span class="p">(</span>
          <span class="s1">&#39;Empty match for pattern </span><span class="si">%s</span><span class="s1">. Disallowed.&#39;</span> <span class="o">%</span> <span class="n">file_pattern</span><span class="p">)</span>
    <span class="k">return</span> <span class="n">found</span>
<div class="viewcode-block" id="MatchFiles"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.MatchFiles">[docs]</a><span class="k">class</span> <span class="nc">MatchFiles</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;Matches a file pattern using ``FileSystems.match``.</span>

<span class="sd">  This ``PTransform`` returns a ``PCollection`` of matching files in the form</span>
<span class="sd">  of ``FileMetadata`` objects.</span>

<span class="sd">  Args:</span>
<span class="sd">    file_pattern: The file pattern to match.</span>
<span class="sd">    empty_match_treatment: An ``EmptyMatchTreatment`` value deciding whether a</span>
<span class="sd">      pattern that matches no files is an error. Defaults to</span>
<span class="sd">      ``EmptyMatchTreatment.ALLOW_IF_WILDCARD``.</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
      <span class="bp">self</span><span class="p">,</span>
      <span class="n">file_pattern</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
      <span class="n">empty_match_treatment</span><span class="o">=</span><span class="n">EmptyMatchTreatment</span><span class="o">.</span><span class="n">ALLOW_IF_WILDCARD</span><span class="p">):</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_file_pattern</span> <span class="o">=</span> <span class="n">file_pattern</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_empty_match_treatment</span> <span class="o">=</span> <span class="n">empty_match_treatment</span>

<div class="viewcode-block" id="MatchFiles.expand"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.MatchFiles.expand">[docs]</a>  <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">]:</span>
    <span class="c1"># Forward the configured treatment. Without it, MatchAll falls back to its</span>
    <span class="c1"># own default (ALLOW) and ``empty_match_treatment`` is silently ignored.</span>
    <span class="k">return</span> <span class="p">(</span>
        <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span>
        <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">_file_pattern</span><span class="p">])</span>
        <span class="o">|</span> <span class="n">MatchAll</span><span class="p">(</span><span class="n">empty_match_treatment</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_empty_match_treatment</span><span class="p">))</span></div></div>
<div class="viewcode-block" id="MatchAll"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.MatchAll">[docs]</a><span class="k">class</span> <span class="nc">MatchAll</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;Matches file patterns from the input PCollection via ``FileSystems.match``.</span>

<span class="sd">  Each input element is a file pattern; the output ``PCollection`` holds one</span>
<span class="sd">  ``FileMetadata`` object per matching file.</span>

<span class="sd">  Args:</span>
<span class="sd">    empty_match_treatment: An ``EmptyMatchTreatment`` value deciding whether a</span>
<span class="sd">      pattern that matches no files is an error. Defaults to</span>
<span class="sd">      ``EmptyMatchTreatment.ALLOW``.</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">empty_match_treatment</span><span class="o">=</span><span class="n">EmptyMatchTreatment</span><span class="o">.</span><span class="n">ALLOW</span><span class="p">):</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_empty_match_treatment</span> <span class="o">=</span> <span class="n">empty_match_treatment</span>

<div class="viewcode-block" id="MatchAll.expand"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.MatchAll.expand">[docs]</a>  <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span>
      <span class="bp">self</span><span class="p">,</span>
      <span class="n">pcoll</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">,</span>
  <span class="p">)</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">]:</span>
    <span class="n">match_fn</span> <span class="o">=</span> <span class="n">_MatchAllFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_empty_match_treatment</span><span class="p">)</span>
    <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">match_fn</span><span class="p">)</span></div></div>
<div class="viewcode-block" id="ReadableFile"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.ReadableFile">[docs]</a><span class="k">class</span> <span class="nc">ReadableFile</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;A utility class for accessing files.</span>

<span class="sd">  Holds the ``metadata`` of a file (whose ``path`` is what gets opened) and an</span>
<span class="sd">  optional default compression type.</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">metadata</span><span class="p">,</span> <span class="n">compression</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">metadata</span> <span class="o">=</span> <span class="n">metadata</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_compression</span> <span class="o">=</span> <span class="n">compression</span>

<div class="viewcode-block" id="ReadableFile.open"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.ReadableFile.open">[docs]</a>  <span class="k">def</span> <span class="nf">open</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">mime_type</span><span class="o">=</span><span class="s1">&#39;text/plain&#39;</span><span class="p">,</span> <span class="n">compression_type</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="w">    </span><span class="sd">&quot;&quot;&quot;Opens the file with ``FileSystems.open`` and returns the handle.</span>

<span class="sd">    The caller owns the returned handle and should close it.</span>

<span class="sd">    Args:</span>
<span class="sd">      mime_type: MIME type forwarded to ``FileSystems.open``.</span>
<span class="sd">      compression_type: Compression for this call. Falls back to the</span>
<span class="sd">        compression given at construction, then ``CompressionTypes.AUTO``.</span>
<span class="sd">    &quot;&quot;&quot;</span>
    <span class="n">compression</span> <span class="o">=</span> <span class="p">(</span>
        <span class="n">compression_type</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression</span> <span class="ow">or</span>
        <span class="n">filesystems</span><span class="o">.</span><span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">)</span>
    <span class="k">return</span> <span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">open</span><span class="p">(</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="n">mime_type</span><span class="o">=</span><span class="n">mime_type</span><span class="p">,</span> <span class="n">compression_type</span><span class="o">=</span><span class="n">compression</span><span class="p">)</span></div>

<div class="viewcode-block" id="ReadableFile.read"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.ReadableFile.read">[docs]</a>  <span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">mime_type</span><span class="o">=</span><span class="s1">&#39;application/octet-stream&#39;</span><span class="p">):</span>
<span class="w">    </span><span class="sd">&quot;&quot;&quot;Returns the full contents of the file, closing the handle afterwards.&quot;&quot;&quot;</span>
    <span class="n">handle</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">mime_type</span><span class="p">)</span>
    <span class="k">try</span><span class="p">:</span>
      <span class="k">return</span> <span class="n">handle</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
    <span class="k">finally</span><span class="p">:</span>
      <span class="c1"># Release the handle eagerly instead of relying on garbage collection.</span>
      <span class="n">handle</span><span class="o">.</span><span class="n">close</span><span class="p">()</span></div>

<div class="viewcode-block" id="ReadableFile.read_utf8"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.ReadableFile.read_utf8">[docs]</a>  <span class="k">def</span> <span class="nf">read_utf8</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w">    </span><span class="sd">&quot;&quot;&quot;Returns the full contents of the file decoded as UTF-8.&quot;&quot;&quot;</span>
    <span class="c1"># &#39;text/plain&#39; is the MIME type ``open()`` uses by default.</span>
    <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="s1">&#39;text/plain&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)</span></div></div>
<span class="k">class</span> <span class="nc">_ReadMatchesFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w">  </span><span class="sd">&quot;&quot;&quot;Turns file paths or ``FileMetadata`` records into ``ReadableFile`` objects.</span>

<span class="sd">  Paths ending in a slash or backslash are treated as directories: they are</span>
<span class="sd">  dropped when ``skip_directories`` is set and rejected with ``BeamIOError``</span>
<span class="sd">  otherwise.</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">compression</span><span class="p">,</span> <span class="n">skip_directories</span><span class="p">):</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_compression</span> <span class="o">=</span> <span class="n">compression</span>
    <span class="bp">self</span><span class="o">.</span><span class="n">_skip_directories</span> <span class="o">=</span> <span class="n">skip_directories</span>

  <span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
      <span class="bp">self</span><span class="p">,</span>
      <span class="n">file_metadata</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">],</span>
  <span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">ReadableFile</span><span class="p">]:</span>
    <span class="c1"># Bare path strings are wrapped in a FileMetadata; the 0 is a placeholder</span>
    <span class="c1"># second field (presumably the size) - NOTE(review): not the real value.</span>
    <span class="n">metadata</span> <span class="o">=</span> <span class="p">(</span>
        <span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">(</span><span class="n">file_metadata</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span>
        <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">file_metadata</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="k">else</span> <span class="n">file_metadata</span><span class="p">)</span>
    <span class="k">if</span> <span class="n">metadata</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">endswith</span><span class="p">((</span><span class="s1">&#39;/&#39;</span><span class="p">,</span> <span class="s1">&#39;</span><span class="se">\\</span><span class="s1">&#39;</span><span class="p">)):</span>
      <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_skip_directories</span><span class="p">:</span>
        <span class="k">return</span>
      <span class="k">raise</span> <span class="n">BeamIOError</span><span class="p">(</span>
          <span class="s1">&#39;Directories are not allowed in ReadMatches transform. &#39;</span>
          <span class="s1">&#39;Found </span><span class="si">%s</span><span class="s1">.&#39;</span> <span class="o">%</span> <span class="n">metadata</span><span class="o">.</span><span class="n">path</span><span class="p">)</span>
    <span class="c1"># TODO: Mime type? Other arguments? Maybe arguments passed in to transform?</span>
    <span class="k">yield</span> <span class="n">ReadableFile</span><span class="p">(</span><span class="n">metadata</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression</span><span class="p">)</span>
<div class="viewcode-block" id="MatchContinuously"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.MatchContinuously">[docs]</a><span class="nd">@experimental</span><span class="p">()</span>
<span class="k">class</span> <span class="nc">MatchContinuously</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Checks for new files for a given pattern every interval.</span>
<span class="sd"> This ``PTransform`` returns a ``PCollection`` of matching files in the form</span>
<span class="sd"> of ``FileMetadata`` objects.</span>
<span class="sd"> MatchContinuously is experimental. No backwards-compatibility</span>
<span class="sd"> guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">file_pattern</span><span class="p">,</span>
<span class="n">interval</span><span class="o">=</span><span class="mf">360.0</span><span class="p">,</span>
<span class="n">has_deduplication</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">start_timestamp</span><span class="o">=</span><span class="n">Timestamp</span><span class="o">.</span><span class="n">now</span><span class="p">(),</span>
<span class="n">stop_timestamp</span><span class="o">=</span><span class="n">MAX_TIMESTAMP</span><span class="p">,</span>
<span class="n">match_updated_files</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">apply_windowing</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">empty_match_treatment</span><span class="o">=</span><span class="n">EmptyMatchTreatment</span><span class="o">.</span><span class="n">ALLOW</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initializes a MatchContinuously transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> file_pattern: The file path to read from.</span>
<span class="sd"> interval: Interval at which to check for files in seconds.</span>
<span class="sd"> has_deduplication: Whether files already read are discarded or not.</span>
<span class="sd"> start_timestamp: Timestamp for start file checking.</span>
<span class="sd"> stop_timestamp: Timestamp after which no more files will be checked.</span>
<span class="sd"> match_updated_files: (When has_deduplication is set to True) whether match</span>
<span class="sd"> file with timestamp changes.</span>
<span class="sd"> apply_windowing: Whether each element should be assigned to</span>
<span class="sd"> individual window. If false, all elements will reside in global window.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_pattern</span> <span class="o">=</span> <span class="n">file_pattern</span>
<span class="bp">self</span><span class="o">.</span><span class="n">interval</span> <span class="o">=</span> <span class="n">interval</span>
<span class="bp">self</span><span class="o">.</span><span class="n">has_deduplication</span> <span class="o">=</span> <span class="n">has_deduplication</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_ts</span> <span class="o">=</span> <span class="n">start_timestamp</span>
<span class="bp">self</span><span class="o">.</span><span class="n">stop_ts</span> <span class="o">=</span> <span class="n">stop_timestamp</span>
<span class="bp">self</span><span class="o">.</span><span class="n">match_upd</span> <span class="o">=</span> <span class="n">match_updated_files</span>
<span class="bp">self</span><span class="o">.</span><span class="n">apply_windowing</span> <span class="o">=</span> <span class="n">apply_windowing</span>
<span class="bp">self</span><span class="o">.</span><span class="n">empty_match_treatment</span> <span class="o">=</span> <span class="n">empty_match_treatment</span>
<div class="viewcode-block" id="MatchContinuously.expand"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.MatchContinuously.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pbegin</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">]:</span>
<span class="c1"># invoke periodic impulse</span>
<span class="n">impulse</span> <span class="o">=</span> <span class="n">pbegin</span> <span class="o">|</span> <span class="n">PeriodicImpulse</span><span class="p">(</span>
<span class="n">start_timestamp</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">start_ts</span><span class="p">,</span>
<span class="n">stop_timestamp</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">stop_ts</span><span class="p">,</span>
<span class="n">fire_interval</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">interval</span><span class="p">)</span>
<span class="c1"># match file pattern periodically</span>
<span class="n">match_files</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">impulse</span>
<span class="o">|</span> <span class="s1">&#39;GetFilePattern&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_pattern</span><span class="p">)</span>
<span class="o">|</span> <span class="n">MatchAll</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">empty_match_treatment</span><span class="p">))</span>
<span class="c1"># apply deduplication strategy if required</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_deduplication</span><span class="p">:</span>
<span class="c1"># Making a Key Value so each file has its own state.</span>
<span class="n">match_files</span> <span class="o">=</span> <span class="n">match_files</span> <span class="o">|</span> <span class="s1">&#39;ToKV&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="n">x</span><span class="p">))</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">match_upd</span><span class="p">:</span>
<span class="n">match_files</span> <span class="o">=</span> <span class="n">match_files</span> <span class="o">|</span> <span class="s1">&#39;RemoveOldAlreadyRead&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_RemoveOldDuplicates</span><span class="p">())</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">match_files</span> <span class="o">=</span> <span class="n">match_files</span> <span class="o">|</span> <span class="s1">&#39;RemoveAlreadyRead&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_RemoveDuplicates</span><span class="p">())</span>
<span class="c1"># apply windowing if required. Apply at last because deduplication relies on</span>
<span class="c1"># the global window.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">apply_windowing</span><span class="p">:</span>
<span class="n">match_files</span> <span class="o">=</span> <span class="n">match_files</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">FixedWindows</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">interval</span><span class="p">))</span>
<span class="k">return</span> <span class="n">match_files</span></div></div>
<div class="viewcode-block" id="ReadMatches"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.ReadMatches">[docs]</a><span class="k">class</span> <span class="nc">ReadMatches</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Converts each result of MatchFiles() or MatchAll() to a ReadableFile.</span>
<span class="sd"> This helps read in a file&#39;s contents or obtain a file descriptor.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">compression</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">skip_directories</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_compression</span> <span class="o">=</span> <span class="n">compression</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_skip_directories</span> <span class="o">=</span> <span class="n">skip_directories</span>
<div class="viewcode-block" id="ReadMatches.expand"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.ReadMatches.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">pcoll</span><span class="p">:</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">]],</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">PCollection</span><span class="p">[</span><span class="n">ReadableFile</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_ReadMatchesFn</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_compression</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_skip_directories</span><span class="p">))</span></div></div>
<span class="k">class</span> <span class="nc">FileSink</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Specifies how to write elements to individual files in ``WriteToFiles``.</span>
<span class="sd"> A Sink class must implement the following:</span>
<span class="sd"> - The ``open`` method, which initializes writing to a file handler (it is not</span>
<span class="sd"> responsible for opening the file handler itself).</span>
<span class="sd"> - The ``write`` method, which writes an element to the file that was passed</span>
<span class="sd"> in ``open``.</span>
<span class="sd"> - The ``flush`` method, which flushes any buffered state. This is most often</span>
<span class="sd"> called before closing a file (but not exclusively called in that</span>
<span class="sd"> situation). The sink is not responsible for closing the file handler.</span>
<span class="sd"> A Sink class can override the following:</span>
<span class="sd"> - The ``create_metadata`` method, which creates all metadata passed to</span>
<span class="sd"> Filesystems.create.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">create_metadata</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">destination</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">full_file_name</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">FileMetadata</span><span class="p">:</span>
<span class="k">return</span> <span class="n">FileMetadata</span><span class="p">(</span>
<span class="n">mime_type</span><span class="o">=</span><span class="s2">&quot;application/octet-stream&quot;</span><span class="p">,</span>
<span class="n">compression_type</span><span class="o">=</span><span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">open</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fh</span><span class="p">):</span>
<span class="c1"># type: (BinaryIO) -&gt; None</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span>
<span class="k">def</span> <span class="nf">write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">record</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span>
<span class="k">def</span> <span class="nf">flush</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span>
<span class="nd">@beam</span><span class="o">.</span><span class="n">typehints</span><span class="o">.</span><span class="n">with_input_types</span><span class="p">(</span><span class="nb">str</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">TextSink</span><span class="p">(</span><span class="n">FileSink</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A sink that encodes utf8 elements, and writes to file handlers.</span>
<span class="sd"> This sink simply calls file_handler.write(record.encode(&#39;utf8&#39;) + &#39;\n&#39;) on all</span>
<span class="sd"> records that come into it.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">open</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">fh</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fh</span> <span class="o">=</span> <span class="n">fh</span>
<span class="k">def</span> <span class="nf">write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">record</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fh</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">record</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf8&#39;</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fh</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="sa">b</span><span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">flush</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_fh</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">prefix_naming</span><span class="p">(</span><span class="n">prefix</span><span class="p">):</span>
<span class="k">return</span> <span class="n">default_file_naming</span><span class="p">(</span><span class="n">prefix</span><span class="p">)</span>
<span class="n">_DEFAULT_FILE_NAME_TEMPLATE</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;</span><span class="si">{prefix}</span><span class="s1">-</span><span class="si">{start}</span><span class="s1">-</span><span class="si">{end}</span><span class="s1">-</span><span class="si">{pane}</span><span class="s1">-&#39;</span>
<span class="s1">&#39;</span><span class="si">{shard:05d}</span><span class="s1">-of-</span><span class="si">{total_shards:05d}</span><span class="s1">&#39;</span>
<span class="s1">&#39;</span><span class="si">{suffix}{compression}</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_format_shard</span><span class="p">(</span>
<span class="n">window</span><span class="p">,</span> <span class="n">pane</span><span class="p">,</span> <span class="n">shard_index</span><span class="p">,</span> <span class="n">total_shards</span><span class="p">,</span> <span class="n">compression</span><span class="p">,</span> <span class="n">prefix</span><span class="p">,</span> <span class="n">suffix</span><span class="p">):</span>
<span class="n">kwargs</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;prefix&#39;</span><span class="p">:</span> <span class="n">prefix</span><span class="p">,</span>
<span class="s1">&#39;start&#39;</span><span class="p">:</span> <span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="s1">&#39;end&#39;</span><span class="p">:</span> <span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="s1">&#39;pane&#39;</span><span class="p">:</span> <span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="s1">&#39;shard&#39;</span><span class="p">:</span> <span class="mi">0</span><span class="p">,</span>
<span class="s1">&#39;total_shards&#39;</span><span class="p">:</span> <span class="mi">0</span><span class="p">,</span>
<span class="s1">&#39;suffix&#39;</span><span class="p">:</span> <span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="s1">&#39;compression&#39;</span><span class="p">:</span> <span class="s1">&#39;&#39;</span>
<span class="p">}</span>
<span class="k">if</span> <span class="n">total_shards</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">shard_index</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;shard&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">shard_index</span><span class="p">)</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;total_shards&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">total_shards</span><span class="p">)</span>
<span class="k">if</span> <span class="n">window</span> <span class="o">!=</span> <span class="n">GlobalWindow</span><span class="p">():</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;start&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">window</span><span class="o">.</span><span class="n">start</span><span class="o">.</span><span class="n">to_utc_datetime</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;end&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">window</span><span class="o">.</span><span class="n">end</span><span class="o">.</span><span class="n">to_utc_datetime</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/18721): Add support for PaneInfo</span>
<span class="c1"># If the PANE is the ONLY firing in the window, we don&#39;t add it.</span>
<span class="c1">#if pane and not (pane.is_first and pane.is_last):</span>
<span class="c1"># kwargs[&#39;pane&#39;] = pane.index</span>
<span class="k">if</span> <span class="n">suffix</span><span class="p">:</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;suffix&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">suffix</span>
<span class="k">if</span> <span class="n">compression</span><span class="p">:</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;compression&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="s1">&#39;.</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">compression</span>
<span class="c1"># Remove separators for unused template parts.</span>
<span class="nb">format</span> <span class="o">=</span> <span class="n">_DEFAULT_FILE_NAME_TEMPLATE</span>
<span class="k">if</span> <span class="n">shard_index</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="nb">format</span> <span class="o">=</span> <span class="nb">format</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-</span><span class="si">{shard:05d}</span><span class="s1">&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">total_shards</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="nb">format</span> <span class="o">=</span> <span class="nb">format</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-of-</span><span class="si">{total_shards:05d}</span><span class="s1">&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="k">if</span> <span class="n">value</span> <span class="ow">in</span> <span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">):</span>
<span class="nb">format</span> <span class="o">=</span> <span class="nb">format</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-{</span><span class="si">%s</span><span class="s1">}&#39;</span> <span class="o">%</span> <span class="n">name</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="nb">format</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="n">FileNaming</span> <span class="o">=</span> <span class="n">Callable</span><span class="p">[[</span><span class="n">Any</span><span class="p">,</span> <span class="n">Any</span><span class="p">,</span> <span class="nb">int</span><span class="p">,</span> <span class="nb">int</span><span class="p">,</span> <span class="n">Any</span><span class="p">,</span> <span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">],</span> <span class="nb">str</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">destination_prefix_naming</span><span class="p">(</span><span class="n">suffix</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">FileNaming</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">_inner</span><span class="p">(</span><span class="n">window</span><span class="p">,</span> <span class="n">pane</span><span class="p">,</span> <span class="n">shard_index</span><span class="p">,</span> <span class="n">total_shards</span><span class="p">,</span> <span class="n">compression</span><span class="p">,</span> <span class="n">destination</span><span class="p">):</span>
<span class="n">prefix</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_format_shard</span><span class="p">(</span>
<span class="n">window</span><span class="p">,</span> <span class="n">pane</span><span class="p">,</span> <span class="n">shard_index</span><span class="p">,</span> <span class="n">total_shards</span><span class="p">,</span> <span class="n">compression</span><span class="p">,</span> <span class="n">prefix</span><span class="p">,</span> <span class="n">suffix</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_inner</span>
<span class="k">def</span> <span class="nf">default_file_naming</span><span class="p">(</span><span class="n">prefix</span><span class="p">,</span> <span class="n">suffix</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">FileNaming</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">_inner</span><span class="p">(</span><span class="n">window</span><span class="p">,</span> <span class="n">pane</span><span class="p">,</span> <span class="n">shard_index</span><span class="p">,</span> <span class="n">total_shards</span><span class="p">,</span> <span class="n">compression</span><span class="p">,</span> <span class="n">destination</span><span class="p">):</span>
<span class="k">return</span> <span class="n">_format_shard</span><span class="p">(</span>
<span class="n">window</span><span class="p">,</span> <span class="n">pane</span><span class="p">,</span> <span class="n">shard_index</span><span class="p">,</span> <span class="n">total_shards</span><span class="p">,</span> <span class="n">compression</span><span class="p">,</span> <span class="n">prefix</span><span class="p">,</span> <span class="n">suffix</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_inner</span>
<span class="k">def</span> <span class="nf">single_file_naming</span><span class="p">(</span><span class="n">prefix</span><span class="p">,</span> <span class="n">suffix</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">FileNaming</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">_inner</span><span class="p">(</span><span class="n">window</span><span class="p">,</span> <span class="n">pane</span><span class="p">,</span> <span class="n">shard_index</span><span class="p">,</span> <span class="n">total_shards</span><span class="p">,</span> <span class="n">compression</span><span class="p">,</span> <span class="n">destination</span><span class="p">):</span>
<span class="k">assert</span> <span class="n">shard_index</span> <span class="ow">in</span> <span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> <span class="n">shard_index</span>
<span class="k">assert</span> <span class="n">total_shards</span> <span class="ow">in</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> <span class="n">total_shards</span>
<span class="k">return</span> <span class="n">_format_shard</span><span class="p">(</span><span class="n">window</span><span class="p">,</span> <span class="n">pane</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="n">compression</span><span class="p">,</span> <span class="n">prefix</span><span class="p">,</span> <span class="n">suffix</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_inner</span>
<span class="n">_FileResult</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">namedtuple</span><span class="p">(</span>
<span class="s1">&#39;FileResult&#39;</span><span class="p">,</span> <span class="p">[</span>
<span class="s1">&#39;file_name&#39;</span><span class="p">,</span>
<span class="s1">&#39;shard_index&#39;</span><span class="p">,</span>
<span class="s1">&#39;total_shards&#39;</span><span class="p">,</span>
<span class="s1">&#39;window&#39;</span><span class="p">,</span>
<span class="s1">&#39;pane&#39;</span><span class="p">,</span>
<span class="s1">&#39;destination&#39;</span>
<span class="p">])</span>
<span class="c1"># Adding a class to contain PyDoc.</span>
<span class="k">class</span> <span class="nc">FileResult</span><span class="p">(</span><span class="n">_FileResult</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;A descriptor of a file that has been written.&quot;&quot;&quot;</span>
<span class="k">pass</span>
<div class="viewcode-block" id="WriteToFiles"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.WriteToFiles">[docs]</a><span class="nd">@experimental</span><span class="p">()</span>
<span class="k">class</span> <span class="nc">WriteToFiles</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sa">r</span><span class="sd">&quot;&quot;&quot;Write the incoming PCollection to a set of output files.</span>
<span class="sd"> The incoming ``PCollection`` may be bounded or unbounded.</span>
<span class="sd"> **Note:** For unbounded ``PCollection``\s, this transform does not support</span>
<span class="sd"> multiple firings per Window (due to the fact that files are named only by</span>
<span class="sd"> their destination, and window, at the moment).</span>
<span class="sd"> WriteToFiles is experimental. No backwards-compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># We allow up to 20 different destinations to be written in a single bundle.</span>
<span class="c1"># Too many files will add memory pressure to the worker, so we let it be 20.</span>
<span class="n">MAX_NUM_WRITERS_PER_BUNDLE</span> <span class="o">=</span> <span class="mi">20</span>
<span class="n">DEFAULT_SHARDING</span> <span class="o">=</span> <span class="mi">5</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">path</span><span class="p">,</span>
<span class="n">file_naming</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">destination</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">temp_directory</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">sink</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">shards</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">output_fn</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_writers_per_bundle</span><span class="o">=</span><span class="n">MAX_NUM_WRITERS_PER_BUNDLE</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initializes a WriteToFiles transform.</span>
<span class="sd"> Args:</span>
<span class="sd"> path (str, ValueProvider): The directory to write files into.</span>
<span class="sd"> file_naming (callable): A callable that takes in a window, pane,</span>
<span class="sd"> shard_index, total_shards and compression; and returns a file name.</span>
<span class="sd"> destination (callable): If this argument is provided, the sink parameter</span>
<span class="sd"> must also be a callable.</span>
<span class="sd"> temp_directory (str, ValueProvider): To ensure atomicity in the transform,</span>
<span class="sd"> the output is written into temporary files, which are written to a</span>
<span class="sd"> directory that is meant to be temporary as well. Once the whole output</span>
<span class="sd"> has been written, the files are moved into their final destination, and</span>
<span class="sd"> given their final names. By default, the temporary directory will be</span>
<span class="sd"> within the temp_location of your pipeline.</span>
<span class="sd"> sink (callable, ~apache_beam.io.fileio.FileSink): The sink to use to write</span>
<span class="sd"> into a file. It should implement the methods of a ``FileSink``. Pass a</span>
<span class="sd"> class signature or an instance of FileSink to this parameter. If none is</span>
<span class="sd"> provided, a ``TextSink`` is used.</span>
<span class="sd"> shards (int): The number of shards per destination and trigger firing.</span>
<span class="sd"> max_writers_per_bundle (int): The number of writers that can be open</span>
<span class="sd"> concurrently in a single worker that&#39;s processing one bundle.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">path</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="n">ValueProvider</span><span class="p">)</span> <span class="k">else</span> <span class="n">StaticValueProvider</span><span class="p">(</span>
<span class="nb">str</span><span class="p">,</span> <span class="n">path</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_naming_fn</span> <span class="o">=</span> <span class="n">file_naming</span> <span class="ow">or</span> <span class="n">default_file_naming</span><span class="p">(</span><span class="s1">&#39;output&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination_fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_destination_fn</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_temp_directory</span> <span class="o">=</span> <span class="n">temp_directory</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink_fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_sink_fn</span><span class="p">(</span><span class="n">sink</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">shards</span> <span class="o">=</span> <span class="n">shards</span> <span class="ow">or</span> <span class="n">WriteToFiles</span><span class="o">.</span><span class="n">DEFAULT_SHARDING</span>
<span class="bp">self</span><span class="o">.</span><span class="n">output_fn</span> <span class="o">=</span> <span class="n">output_fn</span> <span class="ow">or</span> <span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_max_num_writers_per_bundle</span> <span class="o">=</span> <span class="n">max_writers_per_bundle</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_get_sink_fn</span><span class="p">(</span><span class="n">input_sink</span><span class="p">):</span>
<span class="c1"># type: (...) -&gt; Callable[[Any], FileSink]</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">input_sink</span><span class="p">,</span> <span class="nb">type</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">issubclass</span><span class="p">(</span><span class="n">input_sink</span><span class="p">,</span> <span class="n">FileSink</span><span class="p">):</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">input_sink</span><span class="p">()</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">input_sink</span><span class="p">,</span> <span class="n">FileSink</span><span class="p">):</span>
<span class="n">kls</span> <span class="o">=</span> <span class="n">input_sink</span><span class="o">.</span><span class="vm">__class__</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">kls</span><span class="p">()</span>
<span class="k">elif</span> <span class="nb">callable</span><span class="p">(</span><span class="n">input_sink</span><span class="p">):</span>
<span class="k">return</span> <span class="n">input_sink</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">TextSink</span><span class="p">()</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_get_destination_fn</span><span class="p">(</span><span class="n">destination</span><span class="p">):</span>
<span class="c1"># type: (...) -&gt; Callable[[Any], str]</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">ValueProvider</span><span class="p">):</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">elm</span><span class="p">:</span> <span class="n">destination</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="k">elif</span> <span class="nb">callable</span><span class="p">(</span><span class="n">destination</span><span class="p">):</span>
<span class="k">return</span> <span class="n">destination</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="k">lambda</span> <span class="n">elm</span><span class="p">:</span> <span class="n">destination</span>
<div class="viewcode-block" id="WriteToFiles.expand"><a class="viewcode-back" href="../../../apache_beam.io.fileio.html#apache_beam.io.fileio.WriteToFiles.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="n">p</span> <span class="o">=</span> <span class="n">pcoll</span><span class="o">.</span><span class="n">pipeline</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_temp_directory</span><span class="p">:</span>
<span class="n">temp_location</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">p</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">temp_location</span> <span class="ow">or</span>
<span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">get</span><span class="p">())</span>
<span class="n">dir_uid</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_temp_directory</span> <span class="o">=</span> <span class="n">StaticValueProvider</span><span class="p">(</span>
<span class="nb">str</span><span class="p">,</span> <span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">temp_location</span><span class="p">,</span> <span class="s1">&#39;.temp</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">dir_uid</span><span class="p">))</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Added temporary directory </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_temp_directory</span><span class="o">.</span><span class="n">get</span><span class="p">())</span>
<span class="n">output</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">pcoll</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_WriteUnshardedRecordsFn</span><span class="p">(</span>
<span class="n">base_path</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_temp_directory</span><span class="p">,</span>
<span class="n">destination_fn</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">destination_fn</span><span class="p">,</span>
<span class="n">sink_fn</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">sink_fn</span><span class="p">,</span>
<span class="n">max_writers_per_bundle</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_max_num_writers_per_bundle</span><span class="p">))</span><span class="o">.</span>
<span class="n">with_outputs</span><span class="p">(</span>
<span class="n">_WriteUnshardedRecordsFn</span><span class="o">.</span><span class="n">SPILLED_RECORDS</span><span class="p">,</span>
<span class="n">_WriteUnshardedRecordsFn</span><span class="o">.</span><span class="n">WRITTEN_FILES</span><span class="p">))</span>
<span class="n">written_files_pc</span> <span class="o">=</span> <span class="n">output</span><span class="p">[</span><span class="n">_WriteUnshardedRecordsFn</span><span class="o">.</span><span class="n">WRITTEN_FILES</span><span class="p">]</span>
<span class="n">spilled_records_pc</span> <span class="o">=</span> <span class="n">output</span><span class="p">[</span><span class="n">_WriteUnshardedRecordsFn</span><span class="o">.</span><span class="n">SPILLED_RECORDS</span><span class="p">]</span>
<span class="n">more_written_files_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">spilled_records_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_AppendShardedDestination</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">destination_fn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">shards</span><span class="p">))</span>
<span class="o">|</span> <span class="s2">&quot;GroupRecordsByDestinationAndShard&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">()</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_WriteShardedRecordsFn</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_temp_directory</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink_fn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">shards</span><span class="p">)))</span>
<span class="n">files_by_destination_pc</span> <span class="o">=</span> <span class="p">(</span>
<span class="p">(</span><span class="n">written_files_pc</span><span class="p">,</span> <span class="n">more_written_files_pc</span><span class="p">)</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">()</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">file_result</span><span class="p">:</span> <span class="p">(</span><span class="n">file_result</span><span class="o">.</span><span class="n">destination</span><span class="p">,</span> <span class="n">file_result</span><span class="p">))</span>
<span class="o">|</span> <span class="s2">&quot;GroupTempFilesByDestination&quot;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">GroupByKey</span><span class="p">())</span>
<span class="c1"># Now we should take the temporary files, and write them to the final</span>
<span class="c1"># destination, with their proper names.</span>
<span class="n">file_results</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">files_by_destination_pc</span>
<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_MoveTempFilesIntoFinalDestinationFn</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_naming_fn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_temp_directory</span><span class="p">)))</span>
<span class="k">return</span> <span class="n">file_results</span></div></div>
<span class="k">def</span> <span class="nf">_create_writer</span><span class="p">(</span>
<span class="n">base_path</span><span class="p">,</span>
<span class="n">writer_key</span><span class="p">:</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">IntervalWindow</span><span class="p">],</span>
<span class="n">create_metadata_fn</span><span class="p">:</span> <span class="n">CreateFileMetadataFn</span><span class="p">,</span>
<span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">mkdirs</span><span class="p">(</span><span class="n">base_path</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">IOError</span><span class="p">:</span>
<span class="c1"># Directory already exists.</span>
<span class="k">pass</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">writer_key</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># The file name has a prefix determined by destination+window, along with</span>
<span class="c1"># a random string. This allows us to retrieve orphaned files later on.</span>
<span class="n">file_name</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">_</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="nb">abs</span><span class="p">(</span><span class="nb">hash</span><span class="p">(</span><span class="n">writer_key</span><span class="p">)),</span> <span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span>
<span class="n">full_file_name</span> <span class="o">=</span> <span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">base_path</span><span class="p">,</span> <span class="n">file_name</span><span class="p">)</span>
<span class="n">metadata</span> <span class="o">=</span> <span class="n">create_metadata_fn</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">full_file_name</span><span class="p">)</span>
<span class="k">return</span> <span class="n">full_file_name</span><span class="p">,</span> <span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">create</span><span class="p">(</span>
<span class="n">full_file_name</span><span class="p">,</span>
<span class="o">**</span><span class="n">metadata</span><span class="o">.</span><span class="n">_asdict</span><span class="p">())</span>
<span class="k">class</span> <span class="nc">_MoveTempFilesIntoFinalDestinationFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">file_naming_fn</span><span class="p">,</span> <span class="n">temp_dir</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="n">path</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_naming_fn</span> <span class="o">=</span> <span class="n">file_naming_fn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">temporary_directory</span> <span class="o">=</span> <span class="n">temp_dir</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">w</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># list of FileResult objects for temp files</span>
<span class="n">temp_file_results</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>
<span class="c1"># list of FileResult objects for final files</span>
<span class="n">final_file_results</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">i</span><span class="p">,</span> <span class="n">r</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">temp_file_results</span><span class="p">):</span>
<span class="c1"># TODO(pabloem): Handle compression for files.</span>
<span class="n">final_file_name</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_naming_fn</span><span class="p">(</span>
<span class="n">r</span><span class="o">.</span><span class="n">window</span><span class="p">,</span> <span class="n">r</span><span class="o">.</span><span class="n">pane</span><span class="p">,</span> <span class="n">i</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">temp_file_results</span><span class="p">),</span> <span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="n">destination</span><span class="p">)</span>
<span class="n">final_file_results</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="n">FileResult</span><span class="p">(</span>
<span class="n">final_file_name</span><span class="p">,</span>
<span class="n">i</span><span class="p">,</span>
<span class="nb">len</span><span class="p">(</span><span class="n">temp_file_results</span><span class="p">),</span>
<span class="n">r</span><span class="o">.</span><span class="n">window</span><span class="p">,</span>
<span class="n">r</span><span class="o">.</span><span class="n">pane</span><span class="p">,</span>
<span class="n">destination</span><span class="p">))</span>
<span class="n">move_from</span> <span class="o">=</span> <span class="p">[</span><span class="n">f</span><span class="o">.</span><span class="n">file_name</span> <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">temp_file_results</span><span class="p">]</span>
<span class="n">move_to</span> <span class="o">=</span> <span class="p">[</span><span class="n">f</span><span class="o">.</span><span class="n">file_name</span> <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">final_file_results</span><span class="p">]</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Moving temporary files </span><span class="si">%s</span><span class="s1"> to dir: </span><span class="si">%s</span><span class="s1"> as </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="nb">map</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">basename</span><span class="p">,</span> <span class="n">move_from</span><span class="p">),</span>
<span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">get</span><span class="p">(),</span>
<span class="n">move_to</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">rename</span><span class="p">(</span>
<span class="n">move_from</span><span class="p">,</span>
<span class="p">[</span><span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">get</span><span class="p">(),</span> <span class="n">f</span><span class="p">)</span> <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">move_to</span><span class="p">])</span>
<span class="k">except</span> <span class="n">BeamIOError</span><span class="p">:</span>
<span class="c1"># This error is not serious, because it may happen on a retry of the</span>
<span class="c1"># bundle. We simply log it.</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span>
<span class="s1">&#39;Exception occurred during moving files: </span><span class="si">%s</span><span class="s1">. This may be due to a&#39;</span>
<span class="s1">&#39; bundle being retried.&#39;</span><span class="p">,</span>
<span class="n">move_from</span><span class="p">)</span>
<span class="k">yield from</span> <span class="n">final_file_results</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span>
<span class="s1">&#39;Checking orphaned temporary files for destination </span><span class="si">%s</span><span class="s1"> and window </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="n">w</span><span class="p">)</span>
<span class="n">writer_key</span> <span class="o">=</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">w</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_check_orphaned_files</span><span class="p">(</span><span class="n">writer_key</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_check_orphaned_files</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">writer_key</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">prefix</span> <span class="o">=</span> <span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">join</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">temporary_directory</span><span class="o">.</span><span class="n">get</span><span class="p">(),</span> <span class="nb">str</span><span class="p">(</span><span class="nb">abs</span><span class="p">(</span><span class="nb">hash</span><span class="p">(</span><span class="n">writer_key</span><span class="p">))))</span>
<span class="n">match_result</span> <span class="o">=</span> <span class="n">filesystems</span><span class="o">.</span><span class="n">FileSystems</span><span class="o">.</span><span class="n">match</span><span class="p">([</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1">*&#39;</span> <span class="o">%</span> <span class="n">prefix</span><span class="p">])</span>
<span class="n">orphaned_files</span> <span class="o">=</span> <span class="p">[</span><span class="n">m</span><span class="o">.</span><span class="n">path</span> <span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">match_result</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">metadata_list</span><span class="p">]</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">orphaned_files</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Some files may be left orphaned in the temporary folder: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">orphaned_files</span><span class="p">)</span>
<span class="k">except</span> <span class="n">BeamIOError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Exceptions when checking orphaned files: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_WriteShardedRecordsFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">base_path</span><span class="p">,</span>
<span class="n">sink_fn</span><span class="p">,</span> <span class="c1"># type: Callable[[Any], FileSink]</span>
<span class="n">shards</span> <span class="c1"># type: int</span>
<span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">base_path</span> <span class="o">=</span> <span class="n">base_path</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink_fn</span> <span class="o">=</span> <span class="n">sink_fn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">shards</span> <span class="o">=</span> <span class="n">shards</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="n">w</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">,</span> <span class="n">pane</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">PaneInfoParam</span><span class="p">):</span>
<span class="n">destination_and_shard</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">destination</span> <span class="o">=</span> <span class="n">destination_and_shard</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">shard</span> <span class="o">=</span> <span class="n">destination_and_shard</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">records</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">sink</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink_fn</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="n">full_file_name</span><span class="p">,</span> <span class="n">writer</span> <span class="o">=</span> <span class="n">_create_writer</span><span class="p">(</span>
<span class="n">base_path</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">base_path</span><span class="o">.</span><span class="n">get</span><span class="p">(),</span>
<span class="n">writer_key</span><span class="o">=</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">w</span><span class="p">),</span>
<span class="n">create_metadata_fn</span><span class="o">=</span><span class="n">sink</span><span class="o">.</span><span class="n">create_metadata</span><span class="p">)</span>
<span class="n">sink</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">writer</span><span class="p">)</span>
<span class="k">for</span> <span class="n">r</span> <span class="ow">in</span> <span class="n">records</span><span class="p">:</span>
<span class="n">sink</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">r</span><span class="p">)</span>
<span class="n">sink</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Writing file </span><span class="si">%s</span><span class="s1"> for destination </span><span class="si">%s</span><span class="s1"> and shard </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">full_file_name</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span>
<span class="nb">repr</span><span class="p">(</span><span class="n">shard</span><span class="p">))</span>
<span class="k">yield</span> <span class="n">FileResult</span><span class="p">(</span>
<span class="n">full_file_name</span><span class="p">,</span>
<span class="n">shard_index</span><span class="o">=</span><span class="n">shard</span><span class="p">,</span>
<span class="n">total_shards</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">shards</span><span class="p">,</span>
<span class="n">window</span><span class="o">=</span><span class="n">w</span><span class="p">,</span>
<span class="n">pane</span><span class="o">=</span><span class="n">pane</span><span class="p">,</span>
<span class="n">destination</span><span class="o">=</span><span class="n">destination</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_AppendShardedDestination</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">destination</span><span class="p">,</span> <span class="c1"># type: Callable[[Any], str]</span>
<span class="n">shards</span> <span class="c1"># type: int</span>
<span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination_fn</span> <span class="o">=</span> <span class="n">destination</span>
<span class="bp">self</span><span class="o">.</span><span class="n">shards</span> <span class="o">=</span> <span class="n">shards</span>
<span class="c1"># We start the shards for a single destination at an arbitrary point.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_shard_counter</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">defaultdict</span><span class="p">(</span>
<span class="k">lambda</span><span class="p">:</span> <span class="n">random</span><span class="o">.</span><span class="n">randrange</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">shards</span><span class="p">))</span> <span class="c1"># type: DefaultDict[str, int]</span>
<span class="k">def</span> <span class="nf">_next_shard_for_destination</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">destination</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_shard_counter</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span> <span class="o">=</span> <span class="p">((</span><span class="bp">self</span><span class="o">.</span><span class="n">_shard_counter</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span> <span class="o">%</span>
<span class="bp">self</span><span class="o">.</span><span class="n">shards</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_shard_counter</span><span class="p">[</span><span class="n">destination</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">record</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_fn</span><span class="p">(</span><span class="n">record</span><span class="p">)</span>
<span class="n">shard</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_next_shard_for_destination</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="k">yield</span> <span class="p">((</span><span class="n">destination</span><span class="p">,</span> <span class="n">shard</span><span class="p">),</span> <span class="n">record</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_WriteUnshardedRecordsFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="n">SPILLED_RECORDS</span> <span class="o">=</span> <span class="s1">&#39;spilled_records&#39;</span>
<span class="n">WRITTEN_FILES</span> <span class="o">=</span> <span class="s1">&#39;written_files&#39;</span>
<span class="n">_writers_and_sinks</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># type: Dict[Tuple[str, BoundedWindow], Tuple[BinaryIO, FileSink]]</span>
<span class="n">_file_names</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># type: Dict[Tuple[str, BoundedWindow], str]</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">base_path</span><span class="p">,</span>
<span class="n">destination_fn</span><span class="p">,</span>
<span class="n">sink_fn</span><span class="p">,</span>
<span class="n">max_writers_per_bundle</span><span class="o">=</span><span class="n">WriteToFiles</span><span class="o">.</span><span class="n">MAX_NUM_WRITERS_PER_BUNDLE</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">base_path</span> <span class="o">=</span> <span class="n">base_path</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination_fn</span> <span class="o">=</span> <span class="n">destination_fn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sink_fn</span> <span class="o">=</span> <span class="n">sink_fn</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_num_writers_per_bundle</span> <span class="o">=</span> <span class="n">max_writers_per_bundle</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_writers_and_sinks</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_file_names</span> <span class="o">=</span> <span class="p">{}</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">record</span><span class="p">,</span> <span class="n">w</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">,</span> <span class="n">pane</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">PaneInfoParam</span><span class="p">):</span>
<span class="n">destination</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_fn</span><span class="p">(</span><span class="n">record</span><span class="p">)</span>
<span class="n">writer</span><span class="p">,</span> <span class="n">sink</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_or_create_writer_and_sink</span><span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">w</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">writer</span><span class="p">:</span>
<span class="k">return</span> <span class="p">[</span><span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">SPILLED_RECORDS</span><span class="p">,</span> <span class="n">record</span><span class="p">)]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">sink</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">record</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_get_or_create_writer_and_sink</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">destination</span><span class="p">,</span> <span class="n">window</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a tuple of writer, sink.&quot;&quot;&quot;</span>
<span class="n">writer_key</span> <span class="o">=</span> <span class="p">(</span><span class="n">destination</span><span class="p">,</span> <span class="n">window</span><span class="p">)</span>
<span class="k">if</span> <span class="n">writer_key</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_writers_and_sinks</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_writers_and_sinks</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">writer_key</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_writers_and_sinks</span><span class="p">)</span> <span class="o">&gt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_num_writers_per_bundle</span><span class="p">:</span>
<span class="c1"># The writer does not exist, and we have too many writers already.</span>
<span class="k">return</span> <span class="kc">None</span><span class="p">,</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># The writer does not exist, but we can still create a new one.</span>
<span class="n">sink</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">sink_fn</span><span class="p">(</span><span class="n">destination</span><span class="p">)</span>
<span class="n">full_file_name</span><span class="p">,</span> <span class="n">writer</span> <span class="o">=</span> <span class="n">_create_writer</span><span class="p">(</span>
<span class="n">base_path</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">base_path</span><span class="o">.</span><span class="n">get</span><span class="p">(),</span>
<span class="n">writer_key</span><span class="o">=</span><span class="n">writer_key</span><span class="p">,</span>
<span class="n">create_metadata_fn</span><span class="o">=</span><span class="n">sink</span><span class="o">.</span><span class="n">create_metadata</span><span class="p">)</span>
<span class="n">sink</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">writer</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_writers_and_sinks</span><span class="p">[</span><span class="n">writer_key</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span><span class="n">writer</span><span class="p">,</span> <span class="n">sink</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_file_names</span><span class="p">[</span><span class="n">writer_key</span><span class="p">]</span> <span class="o">=</span> <span class="n">full_file_name</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_writers_and_sinks</span><span class="p">[</span><span class="n">writer_key</span><span class="p">]</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="p">(</span><span class="n">writer</span><span class="p">,</span> <span class="n">sink</span><span class="p">)</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_writers_and_sinks</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="n">sink</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
<span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="n">file_result</span> <span class="o">=</span> <span class="n">FileResult</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_file_names</span><span class="p">[</span><span class="n">key</span><span class="p">],</span>
<span class="n">shard_index</span><span class="o">=-</span><span class="mi">1</span><span class="p">,</span>
<span class="n">total_shards</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">window</span><span class="o">=</span><span class="n">key</span><span class="p">[</span><span class="mi">1</span><span class="p">],</span>
<span class="n">pane</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># TODO(pabloem): get the pane info</span>
<span class="n">destination</span><span class="o">=</span><span class="n">key</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">yield</span> <span class="n">beam</span><span class="o">.</span><span class="n">pvalue</span><span class="o">.</span><span class="n">TaggedOutput</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">WRITTEN_FILES</span><span class="p">,</span>
<span class="n">beam</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">window</span><span class="o">.</span><span class="n">WindowedValue</span><span class="p">(</span>
<span class="n">file_result</span><span class="p">,</span>
<span class="n">timestamp</span><span class="o">=</span><span class="n">key</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">start</span><span class="p">,</span>
<span class="n">windows</span><span class="o">=</span><span class="p">[</span><span class="n">key</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span> <span class="c1"># TODO(pabloem) HOW DO WE GET THE PANE</span>
<span class="p">))</span>
<span class="k">class</span> <span class="nc">_RemoveDuplicates</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Internal DoFn that filters out filenames already seen (even though the file</span>
<span class="sd"> has updated).&quot;&quot;&quot;</span>
<span class="n">COUNT_STATE</span> <span class="o">=</span> <span class="n">CombiningValueStateSpec</span><span class="p">(</span><span class="s1">&#39;count&#39;</span><span class="p">,</span> <span class="n">combine_fn</span><span class="o">=</span><span class="nb">sum</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">element</span><span class="p">:</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">],</span>
<span class="n">count_state</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">StateParam</span><span class="p">(</span><span class="n">COUNT_STATE</span><span class="p">)</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">]:</span>
<span class="n">path</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">file_metadata</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">counter</span> <span class="o">=</span> <span class="n">count_state</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="k">if</span> <span class="n">counter</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="n">count_state</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Generated entry for file </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">path</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">file_metadata</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;File </span><span class="si">%s</span><span class="s1"> was already read, seen </span><span class="si">%d</span><span class="s1"> times&#39;</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">counter</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_RemoveOldDuplicates</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Internal DoFn that filters out filenames already seen and timestamp</span>
<span class="sd"> unchanged.&quot;&quot;&quot;</span>
<span class="n">TIME_STATE</span> <span class="o">=</span> <span class="n">CombiningValueStateSpec</span><span class="p">(</span>
<span class="s1">&#39;count&#39;</span><span class="p">,</span> <span class="n">combine_fn</span><span class="o">=</span><span class="n">partial</span><span class="p">(</span><span class="nb">max</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mf">0.0</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">element</span><span class="p">:</span> <span class="n">Tuple</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">],</span>
<span class="n">time_state</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">StateParam</span><span class="p">(</span><span class="n">TIME_STATE</span><span class="p">)</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Iterable</span><span class="p">[</span><span class="n">filesystem</span><span class="o">.</span><span class="n">FileMetadata</span><span class="p">]:</span>
<span class="n">path</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">file_metadata</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="n">new_ts</span> <span class="o">=</span> <span class="n">file_metadata</span><span class="o">.</span><span class="n">last_updated_in_seconds</span>
<span class="n">old_ts</span> <span class="o">=</span> <span class="n">time_state</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="k">if</span> <span class="n">old_ts</span> <span class="o">&lt;</span> <span class="n">new_ts</span><span class="p">:</span>
<span class="n">time_state</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">new_ts</span><span class="p">)</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Generated entry for file </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">path</span><span class="p">)</span>
<span class="k">yield</span> <span class="n">file_metadata</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;File </span><span class="si">%s</span><span class="s1"> was already read&#39;</span><span class="p">,</span> <span class="n">path</span><span class="p">)</span>
</pre></div>
</div>
</div>
<footer>
  <hr>
  <div role="contentinfo">
    <!-- NOTE(review): no copyright holder follows "Copyright"; presumably the
         Sphinx `copyright` config value is unset. Confirm and set it in conf.py. -->
    <p>
      &copy; Copyright
    </p>
  </div>
  Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // Runs once the DOM is ready. It hands navigation handling to the Read the Docs
  // theme script (_static/js/theme.js, loaded in <head>). The `true` argument is
  // passed through unchanged; its meaning is defined by theme.js.
  jQuery(function enableThemeNavigation() {
    SphinxRtdTheme.Navigation.enable(true);
  });
</script>
</body>
</html>