<!-- blob: dc67a1747ab37764a98e46acbc6c0c72a221e9a9 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <!-- Document metadata: character encoding, viewport, and page title. -->
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.io.filebasedsource &mdash; Apache Beam documentation</title>

  <!-- Theme stylesheet. -->
  <link rel="stylesheet" href="../../../_static/css/theme.css">

  <!-- Related documents: index, search page, documentation root, parent page. -->
  <link rel="index" href="../../../genindex.html" title="Index">
  <link rel="search" href="../../../search.html" title="Search">
  <link rel="top" href="../../../index.html" title="Apache Beam documentation">
  <link rel="up" href="../../index.html" title="Module code">

  <!-- Modernizr stays a blocking head script (no defer/async), as in the original. -->
  <script src="../../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
  <div class="wy-side-scroll">
    <!-- Sidebar header: home link followed by the documentation search form. -->
    <div class="wy-side-nav-search">
      <a href="../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
          <!-- aria-label gives the query field an accessible name; a placeholder alone is not a label. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <!-- Hidden fields submitted together with the query. -->
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <!-- Main menu: first list links to package pages, second list to top-level module pages. -->
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.internal.html">apache_beam.internal package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.tools.html">apache_beam.tools package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
      </ul>
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.version.html">apache_beam.version module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation" role="navigation">
  <!-- Bars icon carrying the wy-nav-top data-toggle hook (presumably bound by the theme script; confirm), then the home link. -->
  <i class="fa fa-bars" data-toggle="wy-nav-top"></i>
  <a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div aria-label="breadcrumbs navigation" role="navigation">
  <!-- Breadcrumb trail: Docs, then Module code, then the current module; the last item is an empty aside slot. -->
  <ul class="wy-breadcrumbs">
    <li><a href="../../../index.html">Docs</a> &raquo;</li>
    <li><a href="../../index.html">Module code</a> &raquo;</li>
    <li>apache_beam.io.filebasedsource</li>
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.filebasedsource</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;A framework for developing sources for new file types.</span>
<span class="sd">To create a source for a new file type a sub-class of :class:`FileBasedSource`</span>
<span class="sd">should be created. Sub-classes of :class:`FileBasedSource` must implement the</span>
<span class="sd">method :meth:`FileBasedSource.read_records()`. Please read the documentation of</span>
<span class="sd">that method for more details.</span>
<span class="sd">For an example implementation of :class:`FileBasedSource` see</span>
<span class="sd">:class:`~apache_beam.io._AvroSource`.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">from</span> <span class="nn">past.builtins</span> <span class="k">import</span> <span class="n">long</span>
<span class="kn">from</span> <span class="nn">past.builtins</span> <span class="k">import</span> <span class="n">unicode</span>
<span class="kn">from</span> <span class="nn">apache_beam.internal</span> <span class="k">import</span> <span class="n">pickler</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="k">import</span> <span class="n">concat_source</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="k">import</span> <span class="n">iobase</span>
<span class="kn">from</span> <span class="nn">apache_beam.io</span> <span class="k">import</span> <span class="n">range_trackers</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.filesystem</span> <span class="k">import</span> <span class="n">CompressionTypes</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.filesystems</span> <span class="k">import</span> <span class="n">FileSystems</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.restriction_trackers</span> <span class="k">import</span> <span class="n">OffsetRange</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">StaticValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">ValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">check_accessible</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.core</span> <span class="k">import</span> <span class="n">DoFn</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.core</span> <span class="k">import</span> <span class="n">ParDo</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.core</span> <span class="k">import</span> <span class="n">PTransform</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="k">import</span> <span class="n">DisplayDataItem</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.util</span> <span class="k">import</span> <span class="n">Reshuffle</span>
<span class="n">MAX_NUM_THREADS_FOR_SIZE_ESTIMATION</span> <span class="o">=</span> <span class="mi">25</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;FileBasedSource&#39;</span><span class="p">]</span>
<div class="viewcode-block" id="FileBasedSource"><a class="viewcode-back" href="../../../apache_beam.io.filebasedsource.html#apache_beam.io.filebasedsource.FileBasedSource">[docs]</a><span class="k">class</span> <span class="nc">FileBasedSource</span><span class="p">(</span><span class="n">iobase</span><span class="o">.</span><span class="n">BoundedSource</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A :class:`~apache_beam.io.iobase.BoundedSource` for reading a file glob of</span>
<span class="sd"> a given type.&quot;&quot;&quot;</span>
<span class="n">MIN_NUMBER_OF_FILES_TO_STAT</span> <span class="o">=</span> <span class="mi">100</span>
<span class="n">MIN_FRACTION_OF_FILES_TO_STAT</span> <span class="o">=</span> <span class="mf">0.01</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">file_pattern</span><span class="p">,</span>
<span class="n">min_bundle_size</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">compression_type</span><span class="o">=</span><span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">,</span>
<span class="n">splittable</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">validate</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes :class:`FileBasedSource`.</span>
<span class="sd"> Args:</span>
<span class="sd"> file_pattern (str): the file glob to read, given as a string or a</span>
<span class="sd"> :class:`~apache_beam.options.value_provider.ValueProvider`</span>
<span class="sd"> (placeholder to inject a runtime value).</span>
<span class="sd"> min_bundle_size (int): minimum size of bundles that should be generated</span>
<span class="sd"> when performing initial splitting on this source.</span>
<span class="sd"> compression_type (str): Used to handle compressed input files.</span>
<span class="sd"> Typical value is :attr:`CompressionTypes.AUTO</span>
<span class="sd"> &lt;apache_beam.io.filesystem.CompressionTypes.AUTO&gt;`,</span>
<span class="sd"> in which case the final file path&#39;s extension will be used to detect</span>
<span class="sd"> the compression.</span>
<span class="sd"> splittable (bool): whether :class:`FileBasedSource` should try to</span>
<span class="sd"> logically split a single file into data ranges so that different parts</span>
<span class="sd"> of the same file can be read in parallel. If set to :data:`False`,</span>
<span class="sd"> :class:`FileBasedSource` will prevent both initial and dynamic splitting</span>
<span class="sd"> of sources for single files. File patterns that represent multiple files</span>
<span class="sd"> may still get split into sources for individual files. Even if set to</span>
<span class="sd"> :data:`True` by the user, :class:`FileBasedSource` may choose to not</span>
<span class="sd"> split the file, for example, for compressed files where currently it is</span>
<span class="sd"> not possible to efficiently read a data range without decompressing the</span>
<span class="sd"> whole file.</span>
<span class="sd"> validate (bool): Boolean flag to verify that the files exist during the</span>
<span class="sd"> pipeline creation time.</span>
<span class="sd"> Raises:</span>
<span class="sd"> ~exceptions.TypeError: when **compression_type** is not valid or if</span>
<span class="sd"> **file_pattern** is not a :class:`str` or a</span>
<span class="sd"> :class:`~apache_beam.options.value_provider.ValueProvider`.</span>
<span class="sd"> ~exceptions.ValueError: when compression and splittable files are</span>
<span class="sd"> specified.</span>
<span class="sd"> ~exceptions.IOError: when the file pattern specified yields an empty</span>
<span class="sd"> result.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">file_pattern</span><span class="p">,</span> <span class="p">((</span><span class="nb">str</span><span class="p">,</span> <span class="n">unicode</span><span class="p">),</span> <span class="n">ValueProvider</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1">: file_pattern must be of type string&#39;</span>
<span class="s1">&#39; or ValueProvider; got </span><span class="si">%r</span><span class="s1"> instead&#39;</span>
<span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">file_pattern</span><span class="p">))</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">file_pattern</span><span class="p">,</span> <span class="p">(</span><span class="nb">str</span><span class="p">,</span> <span class="n">unicode</span><span class="p">)):</span>
<span class="n">file_pattern</span> <span class="o">=</span> <span class="n">StaticValueProvider</span><span class="p">(</span><span class="nb">str</span><span class="p">,</span> <span class="n">file_pattern</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pattern</span> <span class="o">=</span> <span class="n">file_pattern</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_concat_source</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span> <span class="o">=</span> <span class="n">min_bundle_size</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">is_valid_compression_type</span><span class="p">(</span><span class="n">compression_type</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">&#39;compression_type must be CompressionType object but &#39;</span>
<span class="s1">&#39;was </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">compression_type</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">=</span> <span class="n">compression_type</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span> <span class="o">=</span> <span class="n">splittable</span>
<span class="k">if</span> <span class="n">validate</span> <span class="ow">and</span> <span class="n">file_pattern</span><span class="o">.</span><span class="n">is_accessible</span><span class="p">():</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">()</span>
<div class="viewcode-block" id="FileBasedSource.display_data"><a class="viewcode-back" href="../../../apache_beam.io.filebasedsource.html#apache_beam.io.filebasedsource.FileBasedSource.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="s1">&#39;file_pattern&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_pattern</span><span class="p">),</span>
<span class="n">label</span><span class="o">=</span><span class="s2">&quot;File Pattern&quot;</span><span class="p">),</span>
<span class="s1">&#39;compression&#39;</span><span class="p">:</span> <span class="n">DisplayDataItem</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span><span class="p">),</span>
<span class="n">label</span><span class="o">=</span><span class="s1">&#39;Compression Type&#39;</span><span class="p">)}</span></div>
<span class="nd">@check_accessible</span><span class="p">([</span><span class="s1">&#39;_pattern&#39;</span><span class="p">])</span>
<span class="k">def</span> <span class="nf">_get_concat_source</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_concat_source</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">pattern</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pattern</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="n">single_file_sources</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">match_result</span> <span class="o">=</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">match</span><span class="p">([</span><span class="n">pattern</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span>
<span class="n">files_metadata</span> <span class="o">=</span> <span class="n">match_result</span><span class="o">.</span><span class="n">metadata_list</span>
<span class="c1"># We create a reference for FileBasedSource that will be serialized along</span>
<span class="c1"># with each _SingleFileSource. To prevent this FileBasedSource from having</span>
<span class="c1"># a reference to ConcatSource (resulting in quadratic space complexity)</span>
<span class="c1"># we clone it here.</span>
<span class="n">file_based_source_ref</span> <span class="o">=</span> <span class="n">pickler</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">pickler</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="k">for</span> <span class="n">file_metadata</span> <span class="ow">in</span> <span class="n">files_metadata</span><span class="p">:</span>
<span class="n">file_name</span> <span class="o">=</span> <span class="n">file_metadata</span><span class="o">.</span><span class="n">path</span>
<span class="n">file_size</span> <span class="o">=</span> <span class="n">file_metadata</span><span class="o">.</span><span class="n">size_in_bytes</span>
<span class="k">if</span> <span class="n">file_size</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">continue</span> <span class="c1"># Ignoring empty file.</span>
<span class="c1"># We determine splittability of this specific file.</span>
<span class="n">splittable</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">splittable</span> <span class="ow">and</span>
<span class="n">_determine_splittability_from_compression_type</span><span class="p">(</span>
<span class="n">file_name</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span><span class="p">))</span>
<span class="n">single_file_source</span> <span class="o">=</span> <span class="n">_SingleFileSource</span><span class="p">(</span>
<span class="n">file_based_source_ref</span><span class="p">,</span> <span class="n">file_name</span><span class="p">,</span>
<span class="mi">0</span><span class="p">,</span>
<span class="n">file_size</span><span class="p">,</span>
<span class="n">min_bundle_size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span><span class="p">,</span>
<span class="n">splittable</span><span class="o">=</span><span class="n">splittable</span><span class="p">)</span>
<span class="n">single_file_sources</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">single_file_source</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_concat_source</span> <span class="o">=</span> <span class="n">concat_source</span><span class="o">.</span><span class="n">ConcatSource</span><span class="p">(</span><span class="n">single_file_sources</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_concat_source</span>
<div class="viewcode-block" id="FileBasedSource.open_file"><a class="viewcode-back" href="../../../apache_beam.io.filebasedsource.html#apache_beam.io.filebasedsource.FileBasedSource.open_file">[docs]</a> <span class="k">def</span> <span class="nf">open_file</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_name</span><span class="p">):</span>
<span class="k">return</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">open</span><span class="p">(</span>
<span class="n">file_name</span><span class="p">,</span> <span class="s1">&#39;application/octet-stream&#39;</span><span class="p">,</span>
<span class="n">compression_type</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span><span class="p">)</span></div>
<span class="nd">@check_accessible</span><span class="p">([</span><span class="s1">&#39;_pattern&#39;</span><span class="p">])</span>
<span class="k">def</span> <span class="nf">_validate</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Validate if there are actual files in the specified glob pattern</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">pattern</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pattern</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="c1"># Limit the responses as we only want to check if something exists</span>
<span class="n">match_result</span> <span class="o">=</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">match</span><span class="p">([</span><span class="n">pattern</span><span class="p">],</span> <span class="n">limits</span><span class="o">=</span><span class="p">[</span><span class="mi">1</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">match_result</span><span class="o">.</span><span class="n">metadata_list</span><span class="p">)</span> <span class="o">&lt;=</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">IOError</span><span class="p">(</span>
<span class="s1">&#39;No files found based on the file pattern </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">pattern</span><span class="p">)</span>
<div class="viewcode-block" id="FileBasedSource.split"><a class="viewcode-back" href="../../../apache_beam.io.filebasedsource.html#apache_beam.io.filebasedsource.FileBasedSource.split">[docs]</a> <span class="k">def</span> <span class="nf">split</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">desired_bundle_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">start_position</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">stop_position</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_concat_source</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(</span>
<span class="n">desired_bundle_size</span><span class="o">=</span><span class="n">desired_bundle_size</span><span class="p">,</span>
<span class="n">start_position</span><span class="o">=</span><span class="n">start_position</span><span class="p">,</span>
<span class="n">stop_position</span><span class="o">=</span><span class="n">stop_position</span><span class="p">)</span></div>
<div class="viewcode-block" id="FileBasedSource.estimate_size"><a class="viewcode-back" href="../../../apache_beam.io.filebasedsource.html#apache_beam.io.filebasedsource.FileBasedSource.estimate_size">[docs]</a> <span class="nd">@check_accessible</span><span class="p">([</span><span class="s1">&#39;_pattern&#39;</span><span class="p">])</span>
<span class="k">def</span> <span class="nf">estimate_size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">pattern</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pattern</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="n">match_result</span> <span class="o">=</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">match</span><span class="p">([</span><span class="n">pattern</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">return</span> <span class="nb">sum</span><span class="p">([</span><span class="n">f</span><span class="o">.</span><span class="n">size_in_bytes</span> <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">match_result</span><span class="o">.</span><span class="n">metadata_list</span><span class="p">])</span></div>
<div class="viewcode-block" id="FileBasedSource.read"><a class="viewcode-back" href="../../../apache_beam.io.filebasedsource.html#apache_beam.io.filebasedsource.FileBasedSource.read">[docs]</a> <span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">range_tracker</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_concat_source</span><span class="p">()</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">range_tracker</span><span class="p">)</span></div>
<div class="viewcode-block" id="FileBasedSource.get_range_tracker"><a class="viewcode-back" href="../../../apache_beam.io.filebasedsource.html#apache_beam.io.filebasedsource.FileBasedSource.get_range_tracker">[docs]</a> <span class="k">def</span> <span class="nf">get_range_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_position</span><span class="p">,</span> <span class="n">stop_position</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_concat_source</span><span class="p">()</span><span class="o">.</span><span class="n">get_range_tracker</span><span class="p">(</span><span class="n">start_position</span><span class="p">,</span>
<span class="n">stop_position</span><span class="p">)</span></div>
<div class="viewcode-block" id="FileBasedSource.read_records"><a class="viewcode-back" href="../../../apache_beam.io.filebasedsource.html#apache_beam.io.filebasedsource.FileBasedSource.read_records">[docs]</a> <span class="k">def</span> <span class="nf">read_records</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_name</span><span class="p">,</span> <span class="n">offset_range_tracker</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a generator of records created by reading file &#39;file_name&#39;.</span>
<span class="sd"> Args:</span>
<span class="sd"> file_name: a ``string`` that gives the name of the file to be read. Method</span>
<span class="sd"> ``FileBasedSource.open_file()`` must be used to open the file</span>
<span class="sd"> and create a seekable file object.</span>
<span class="sd"> offset_range_tracker: an object of type ``OffsetRangeTracker``. This</span>
<span class="sd"> defines the byte range of the file that should be</span>
<span class="sd"> read. See documentation in</span>
<span class="sd"> ``iobase.BoundedSource.read()`` for more information</span>
<span class="sd"> on reading records while complying with the range</span>
<span class="sd"> defined by a given ``RangeTracker``.</span>
<span class="sd"> Returns:</span>
<span class="sd"> an iterator that gives the records read from the given file.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span></div>
<span class="nd">@property</span>
<span class="k">def</span> <span class="nf">splittable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span></div>
<span class="k">def</span> <span class="nf">_determine_splittability_from_compression_type</span><span class="p">(</span>
<span class="n">file_path</span><span class="p">,</span> <span class="n">compression_type</span><span class="p">):</span>
<span class="k">if</span> <span class="n">compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">:</span>
<span class="n">compression_type</span> <span class="o">=</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">detect_compression_type</span><span class="p">(</span><span class="n">file_path</span><span class="p">)</span>
<span class="k">return</span> <span class="n">compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">UNCOMPRESSED</span>
<span class="k">class</span> <span class="nc">_SingleFileSource</span><span class="p">(</span><span class="n">iobase</span><span class="o">.</span><span class="n">BoundedSource</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Denotes a source for a specific file type.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">file_based_source</span><span class="p">,</span> <span class="n">file_name</span><span class="p">,</span> <span class="n">start_offset</span><span class="p">,</span> <span class="n">stop_offset</span><span class="p">,</span>
<span class="n">min_bundle_size</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="n">splittable</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">start_offset</span><span class="p">,</span> <span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="n">long</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;start_offset must be a number. Received: </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">start_offset</span><span class="p">)</span>
<span class="k">if</span> <span class="n">stop_offset</span> <span class="o">!=</span> <span class="n">range_trackers</span><span class="o">.</span><span class="n">OffsetRangeTracker</span><span class="o">.</span><span class="n">OFFSET_INFINITY</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">stop_offset</span><span class="p">,</span> <span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="n">long</span><span class="p">)):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;stop_offset must be a number. Received: </span><span class="si">%r</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">stop_offset</span><span class="p">)</span>
<span class="k">if</span> <span class="n">start_offset</span> <span class="o">&gt;=</span> <span class="n">stop_offset</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;start_offset must be smaller than stop_offset. Received </span><span class="si">%d</span><span class="s1"> and </span><span class="si">%d</span><span class="s1"> &#39;</span>
<span class="s1">&#39;for start and stop offsets respectively&#39;</span> <span class="o">%</span>
<span class="p">(</span><span class="n">start_offset</span><span class="p">,</span> <span class="n">stop_offset</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_file_name</span> <span class="o">=</span> <span class="n">file_name</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_is_gcs_file</span> <span class="o">=</span> <span class="n">file_name</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;gs://&#39;</span><span class="p">)</span> <span class="k">if</span> <span class="n">file_name</span> <span class="k">else</span> <span class="kc">False</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_start_offset</span> <span class="o">=</span> <span class="n">start_offset</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_stop_offset</span> <span class="o">=</span> <span class="n">stop_offset</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span> <span class="o">=</span> <span class="n">min_bundle_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_file_based_source</span> <span class="o">=</span> <span class="n">file_based_source</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span> <span class="o">=</span> <span class="n">splittable</span>
<span class="k">def</span> <span class="nf">split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">desired_bundle_size</span><span class="p">,</span> <span class="n">start_offset</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">stop_offset</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="n">start_offset</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">start_offset</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_start_offset</span>
<span class="k">if</span> <span class="n">stop_offset</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">stop_offset</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_stop_offset</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span><span class="p">:</span>
<span class="n">splits</span> <span class="o">=</span> <span class="n">OffsetRange</span><span class="p">(</span><span class="n">start_offset</span><span class="p">,</span> <span class="n">stop_offset</span><span class="p">)</span><span class="o">.</span><span class="n">split</span><span class="p">(</span>
<span class="n">desired_bundle_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span><span class="p">)</span>
<span class="k">for</span> <span class="n">split</span> <span class="ow">in</span> <span class="n">splits</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">iobase</span><span class="o">.</span><span class="n">SourceBundle</span><span class="p">(</span>
<span class="n">split</span><span class="o">.</span><span class="n">stop</span> <span class="o">-</span> <span class="n">split</span><span class="o">.</span><span class="n">start</span><span class="p">,</span>
<span class="n">_SingleFileSource</span><span class="p">(</span>
<span class="c1"># Copying this so that each sub-source gets a fresh instance.</span>
<span class="n">pickler</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">pickler</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_file_based_source</span><span class="p">)),</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_file_name</span><span class="p">,</span>
<span class="n">split</span><span class="o">.</span><span class="n">start</span><span class="p">,</span>
<span class="n">split</span><span class="o">.</span><span class="n">stop</span><span class="p">,</span>
<span class="n">min_bundle_size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span><span class="p">,</span>
<span class="n">splittable</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span><span class="p">),</span>
<span class="n">split</span><span class="o">.</span><span class="n">start</span><span class="p">,</span>
<span class="n">split</span><span class="o">.</span><span class="n">stop</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Returning a single sub-source with end offset set to OFFSET_INFINITY (so</span>
<span class="c1"># that all data of the source gets read) since this source is</span>
<span class="c1"># unsplittable. Choosing size of the file as end offset will be wrong for</span>
<span class="c1"># certain unsplittable sources, e.g., compressed sources.</span>
<span class="k">yield</span> <span class="n">iobase</span><span class="o">.</span><span class="n">SourceBundle</span><span class="p">(</span>
<span class="n">stop_offset</span> <span class="o">-</span> <span class="n">start_offset</span><span class="p">,</span>
<span class="n">_SingleFileSource</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_file_based_source</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_file_name</span><span class="p">,</span>
<span class="n">start_offset</span><span class="p">,</span>
<span class="n">range_trackers</span><span class="o">.</span><span class="n">OffsetRangeTracker</span><span class="o">.</span><span class="n">OFFSET_INFINITY</span><span class="p">,</span>
<span class="n">min_bundle_size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span><span class="p">,</span>
<span class="n">splittable</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span>
<span class="p">),</span>
<span class="n">start_offset</span><span class="p">,</span>
<span class="n">range_trackers</span><span class="o">.</span><span class="n">OffsetRangeTracker</span><span class="o">.</span><span class="n">OFFSET_INFINITY</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">estimate_size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_stop_offset</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_start_offset</span>
<span class="k">def</span> <span class="nf">get_range_tracker</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_position</span><span class="p">,</span> <span class="n">stop_position</span><span class="p">):</span>
<span class="k">if</span> <span class="n">start_position</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">start_position</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_start_offset</span>
<span class="k">if</span> <span class="n">stop_position</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># If file is unsplittable we choose OFFSET_INFINITY as the default end</span>
<span class="c1"># offset so that all data of the source gets read. Choosing size of the</span>
<span class="c1"># file as end offset will be wrong for certain unsplittable sources,</span>
<span class="c1"># e.g., compressed sources.</span>
<span class="n">stop_position</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_stop_offset</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span>
<span class="k">else</span> <span class="n">range_trackers</span><span class="o">.</span><span class="n">OffsetRangeTracker</span><span class="o">.</span><span class="n">OFFSET_INFINITY</span><span class="p">)</span>
<span class="n">range_tracker</span> <span class="o">=</span> <span class="n">range_trackers</span><span class="o">.</span><span class="n">OffsetRangeTracker</span><span class="p">(</span>
<span class="n">start_position</span><span class="p">,</span> <span class="n">stop_position</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span><span class="p">:</span>
<span class="n">range_tracker</span> <span class="o">=</span> <span class="n">range_trackers</span><span class="o">.</span><span class="n">UnsplittableRangeTracker</span><span class="p">(</span><span class="n">range_tracker</span><span class="p">)</span>
<span class="k">return</span> <span class="n">range_tracker</span>
<span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">range_tracker</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file_based_source</span><span class="o">.</span><span class="n">read_records</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_file_name</span><span class="p">,</span> <span class="n">range_tracker</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">default_output_coder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file_based_source</span><span class="o">.</span><span class="n">default_output_coder</span><span class="p">()</span>
<span class="k">class</span> <span class="nc">_ExpandIntoRanges</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">splittable</span><span class="p">,</span> <span class="n">compression_type</span><span class="p">,</span> <span class="n">desired_bundle_size</span><span class="p">,</span> <span class="n">min_bundle_size</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_desired_bundle_size</span> <span class="o">=</span> <span class="n">desired_bundle_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span> <span class="o">=</span> <span class="n">min_bundle_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span> <span class="o">=</span> <span class="n">splittable</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">=</span> <span class="n">compression_type</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">match_results</span> <span class="o">=</span> <span class="n">FileSystems</span><span class="o">.</span><span class="n">match</span><span class="p">([</span><span class="n">element</span><span class="p">])</span>
<span class="k">for</span> <span class="n">metadata</span> <span class="ow">in</span> <span class="n">match_results</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">metadata_list</span><span class="p">:</span>
<span class="n">splittable</span> <span class="o">=</span> <span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span> <span class="ow">and</span>
<span class="n">_determine_splittability_from_compression_type</span><span class="p">(</span>
<span class="n">metadata</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span><span class="p">))</span>
<span class="k">if</span> <span class="n">splittable</span><span class="p">:</span>
<span class="k">for</span> <span class="n">split</span> <span class="ow">in</span> <span class="n">OffsetRange</span><span class="p">(</span>
<span class="mi">0</span><span class="p">,</span> <span class="n">metadata</span><span class="o">.</span><span class="n">size_in_bytes</span><span class="p">)</span><span class="o">.</span><span class="n">split</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_desired_bundle_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span><span class="p">):</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">metadata</span><span class="p">,</span> <span class="n">split</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">yield</span> <span class="p">(</span><span class="n">metadata</span><span class="p">,</span> <span class="n">OffsetRange</span><span class="p">(</span>
<span class="mi">0</span><span class="p">,</span> <span class="n">range_trackers</span><span class="o">.</span><span class="n">OffsetRangeTracker</span><span class="o">.</span><span class="n">OFFSET_INFINITY</span><span class="p">))</span>
<span class="k">class</span> <span class="nc">_ReadRange</span><span class="p">(</span><span class="n">DoFn</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source_from_file</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_from_file</span> <span class="o">=</span> <span class="n">source_from_file</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">metadata</span><span class="p">,</span> <span class="nb">range</span> <span class="o">=</span> <span class="n">element</span>
<span class="n">source</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_source_from_file</span><span class="p">(</span><span class="n">metadata</span><span class="o">.</span><span class="n">path</span><span class="p">)</span>
<span class="c1"># Following split() operation has to be performed to create a proper</span>
<span class="c1"># _SingleFileSource. Otherwise what we have is a ConcatSource that contains</span>
<span class="c1"># a single _SingleFileSource. ConcatSource.read() expects a RangeTracker for</span>
<span class="c1"># sub-source range and reads full sub-sources (not byte ranges).</span>
<span class="n">source</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">source</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="nb">float</span><span class="p">(</span><span class="s1">&#39;inf&#39;</span><span class="p">)))[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">source</span>
<span class="k">for</span> <span class="n">record</span> <span class="ow">in</span> <span class="n">source</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="nb">range</span><span class="o">.</span><span class="n">new_tracker</span><span class="p">()):</span>
<span class="k">yield</span> <span class="n">record</span>
<span class="k">class</span> <span class="nc">ReadAllFiles</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A Read transform that reads a PCollection of files.</span>
<span class="sd"> Pipeline authors should not use this directly. This is to be used by Read</span>
<span class="sd"> PTransform authors who wish to implement file-based Read transforms that</span>
<span class="sd"> read a PCollection of files.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">splittable</span><span class="p">,</span> <span class="n">compression_type</span><span class="p">,</span> <span class="n">desired_bundle_size</span><span class="p">,</span> <span class="n">min_bundle_size</span><span class="p">,</span>
<span class="n">source_from_file</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Args:</span>
<span class="sd"> splittable: If False, files won&#39;t be split into sub-ranges. If True,</span>
<span class="sd"> files may or may not be split into data ranges.</span>
<span class="sd"> compression_type: A ``CompressionType`` object that specifies the</span>
<span class="sd"> compression type of the files that will be processed. If</span>
<span class="sd"> ``CompressionTypes.AUTO``, system will try to automatically</span>
<span class="sd"> determine the compression type based on the extension of</span>
<span class="sd"> files.</span>
<span class="sd"> desired_bundle_size: the desired size of data ranges that should be</span>
<span class="sd"> generated when splitting a file into data ranges.</span>
<span class="sd"> min_bundle_size: minimum size of data ranges that should be generated when</span>
<span class="sd"> splitting a file into data ranges.</span>
<span class="sd"> source_from_file: a function that produces a ``BoundedSource`` given a</span>
<span class="sd"> file name. System will use this function to generate</span>
<span class="sd"> ``BoundedSource`` objects for file paths. Note that file</span>
<span class="sd"> paths passed to this will be for individual files, not</span>
<span class="sd"> for file patterns even if the ``PCollection`` of files</span>
<span class="sd"> processed by the transform consists of file patterns.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span> <span class="o">=</span> <span class="n">splittable</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">=</span> <span class="n">compression_type</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_desired_bundle_size</span> <span class="o">=</span> <span class="n">desired_bundle_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span> <span class="o">=</span> <span class="n">min_bundle_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_source_from_file</span> <span class="o">=</span> <span class="n">source_from_file</span>
<span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pvalue</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span><span class="n">pvalue</span>
<span class="o">|</span> <span class="s1">&#39;ExpandIntoRanges&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">_ExpandIntoRanges</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_splittable</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_desired_bundle_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_min_bundle_size</span><span class="p">))</span>
<span class="o">|</span> <span class="s1">&#39;Reshard&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">Reshuffle</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;ReadRange&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ParDo</span><span class="p">(</span><span class="n">_ReadRange</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_source_from_file</span><span class="p">)))</span>
</pre></div>
</div>
<div class="articleComments">
</div>
</div>
<footer>
  <!-- Page footer: a thematic break, the copyright line, then the build credits. -->
  <hr>
  <div role="contentinfo">
    <p>
      &copy; Copyright .
    </p>
  </div>
  <!-- Credit links use HTTPS so readers are not sent through a plain-HTTP hop. -->
  Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // Page-level settings exposed as a global object. It is declared before the
  // script includes that follow -- presumably so those scripts can read it
  // (NOTE(review): confirm against doctools.js).
  var DOCUMENTATION_OPTIONS = {
    URL_ROOT: '../../../',
    VERSION: '',
    COLLAPSE_INDEX: false,
    FILE_SUFFIX: '.html',
    HAS_SOURCE: true,
    SOURCELINK_SUFFIX: '.txt'
  };
</script>
<script type="text/javascript" src="../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../_static/js/theme.js"></script>
<script>
  // Once the DOM is ready, switch on the theme's sticky navigation.
  jQuery(function enableStickyNav() {
    SphinxRtdTheme.StickyNav.enable();
  });
</script>
</body>
</html>