| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <!-- Page metadata, then the theme stylesheet, the Sphinx relation links (index, search, top, up), and the Modernizr script, which is loaded without async or defer. --> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>apache_beam.io.filesystem — Apache Beam documentation</title> |
| <link rel="stylesheet" href="../../../_static/css/theme.css"> |
| <link rel="index" href="../../../genindex.html" title="Index"> |
| <link rel="search" href="../../../search.html" title="Search"> |
| <link rel="top" href="../../../index.html" title="Apache Beam documentation"> |
| <link rel="up" href="../../index.html" title="Module code"> |
| <script src="../../../_static/js/modernizr.min.js"></script> |
| </head> |
| |
| <body class="wy-body-for-nav" role="document"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <!-- Sidebar: project home link, documentation search form, and the package/module table of contents. --> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search"> |
| <a href="../../../index.html" class="icon icon-home"> Apache Beam |
| </a> |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
| <!-- A placeholder is not an accessible name, so the query box is labelled explicitly. type="text" is kept as-is in case the theme stylesheet selects on it. --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| </div> |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| <!-- First list: packages. Second list: top-level modules. --> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.internal.html">apache_beam.internal package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.tools.html">apache_beam.tools package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.version.html">apache_beam.version module</a></li> |
| </ul> |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> |
| <!-- Top bar: an icon element carrying the wy-nav-top data-toggle hook (presumably consumed by the theme script; confirm there), followed by the project home link. --> |
| <i class="fa fa-bars" data-toggle="wy-nav-top"></i> |
| <a href="../../../index.html">Apache Beam</a> |
| </nav> |
| |
| |
| |
| <div class="wy-nav-content"> |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| <!-- Breadcrumb trail: Docs » Module code » current module, followed by an aside slot with no content and a horizontal rule. --> |
| <ul class="wy-breadcrumbs"> |
| <li><a href="../../../index.html">Docs</a> »</li> |
| <li><a href="../../index.html">Module code</a> »</li> |
| <li>apache_beam.io.filesystem</li> |
| <li class="wy-breadcrumbs-aside"> |
| </li> |
| </ul> |
| <hr> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.io.filesystem</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| <span class="sd">"""File system abstraction for file-based sources and sinks.</span> |
| |
| <span class="sd">Note to implementors:</span> |
| <span class="sd"> "path" arguments will be URLs in the form scheme://foo/bar. The exception is</span> |
| <span class="sd"> LocalFileSystem, which gets unix-style paths in the form /foo/bar.</span> |
| <span class="sd">"""</span> |
| |
| <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span> |
| <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">division</span> |
| |
| <span class="kn">import</span> <span class="nn">abc</span> |
| <span class="kn">import</span> <span class="nn">bz2</span> |
| <span class="kn">import</span> <span class="nn">fnmatch</span> |
| <span class="kn">import</span> <span class="nn">io</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">import</span> <span class="nn">os</span> |
| <span class="kn">import</span> <span class="nn">posixpath</span> |
| <span class="kn">import</span> <span class="nn">re</span> |
| <span class="kn">import</span> <span class="nn">time</span> |
| <span class="kn">import</span> <span class="nn">zlib</span> |
| <span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">object</span> |
| <span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">zip</span> |
| |
| <span class="kn">from</span> <span class="nn">future</span> <span class="k">import</span> <span class="n">standard_library</span> |
| <span class="kn">from</span> <span class="nn">future.utils</span> <span class="k">import</span> <span class="n">with_metaclass</span> |
| <span class="kn">from</span> <span class="nn">past.builtins</span> <span class="k">import</span> <span class="n">long</span> |
| <span class="kn">from</span> <span class="nn">past.builtins</span> <span class="k">import</span> <span class="n">unicode</span> |
| |
| <span class="kn">from</span> <span class="nn">apache_beam.utils.plugin</span> <span class="k">import</span> <span class="n">BeamPlugin</span> |
| |
| <span class="n">standard_library</span><span class="o">.</span><span class="n">install_aliases</span><span class="p">()</span> |
| |
| <span class="n">logger</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| <span class="n">DEFAULT_READ_BUFFER_SIZE</span> <span class="o">=</span> <span class="mi">16</span> <span class="o">*</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024</span> |
| |
| <span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'CompressionTypes'</span><span class="p">,</span> <span class="s1">'CompressedFile'</span><span class="p">,</span> <span class="s1">'FileMetadata'</span><span class="p">,</span> <span class="s1">'FileSystem'</span><span class="p">,</span> |
| <span class="s1">'MatchResult'</span><span class="p">]</span> |
| |
| |
| <div class="viewcode-block" id="CompressionTypes"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressionTypes">[docs]</a><span class="k">class</span> <span class="nc">CompressionTypes</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="sd">"""Enum-like class representing known compression types."""</span> |
| |
| <span class="c1"># Detect compression based on filename extension.</span> |
| <span class="c1">#</span> |
| <span class="c1"># The following extensions are currently recognized by auto-detection:</span> |
| <span class="c1"># .bz2 (implies BZIP2 as described below).</span> |
| <span class="c1"># .gz (implies GZIP as described below)</span> |
| <span class="c1"># Any non-recognized extension implies UNCOMPRESSED as described below.</span> |
| <span class="n">AUTO</span> <span class="o">=</span> <span class="s1">'auto'</span> |
| |
| <span class="c1"># BZIP2 compression.</span> |
| <span class="n">BZIP2</span> <span class="o">=</span> <span class="s1">'bzip2'</span> |
| |
| <span class="c1"># GZIP compression (deflate with GZIP headers).</span> |
| <span class="n">GZIP</span> <span class="o">=</span> <span class="s1">'gzip'</span> |
| |
| <span class="c1"># Uncompressed (i.e., may be split).</span> |
| <span class="n">UNCOMPRESSED</span> <span class="o">=</span> <span class="s1">'uncompressed'</span> |
| |
| <div class="viewcode-block" id="CompressionTypes.is_valid_compression_type"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressionTypes.is_valid_compression_type">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">is_valid_compression_type</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">compression_type</span><span class="p">):</span> |
| <span class="sd">"""Returns True for valid compression types, False otherwise."""</span> |
| <span class="n">types</span> <span class="o">=</span> <span class="nb">set</span><span class="p">([</span> |
| <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">,</span> |
| <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">BZIP2</span><span class="p">,</span> |
| <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">GZIP</span><span class="p">,</span> |
| <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">UNCOMPRESSED</span> |
| <span class="p">])</span> |
| <span class="k">return</span> <span class="n">compression_type</span> <span class="ow">in</span> <span class="n">types</span></div> |
| |
| <div class="viewcode-block" id="CompressionTypes.mime_type"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressionTypes.mime_type">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">mime_type</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">compression_type</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s1">'application/octet-stream'</span><span class="p">):</span> |
| <span class="n">mime_types_by_compression_type</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="bp">cls</span><span class="o">.</span><span class="n">BZIP2</span><span class="p">:</span> <span class="s1">'application/x-bz2'</span><span class="p">,</span> |
| <span class="bp">cls</span><span class="o">.</span><span class="n">GZIP</span><span class="p">:</span> <span class="s1">'application/x-gzip'</span><span class="p">,</span> |
| <span class="p">}</span> |
| <span class="k">return</span> <span class="n">mime_types_by_compression_type</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">compression_type</span><span class="p">,</span> <span class="n">default</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="CompressionTypes.detect_compression_type"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressionTypes.detect_compression_type">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">detect_compression_type</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">file_path</span><span class="p">):</span> |
| <span class="sd">"""Returns the compression type of a file (based on its suffix)."""</span> |
| <span class="n">compression_types_by_suffix</span> <span class="o">=</span> <span class="p">{</span><span class="s1">'.bz2'</span><span class="p">:</span> <span class="bp">cls</span><span class="o">.</span><span class="n">BZIP2</span><span class="p">,</span> <span class="s1">'.gz'</span><span class="p">:</span> <span class="bp">cls</span><span class="o">.</span><span class="n">GZIP</span><span class="p">}</span> |
| <span class="n">lowercased_path</span> <span class="o">=</span> <span class="n">file_path</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">suffix</span><span class="p">,</span> <span class="n">compression_type</span> <span class="ow">in</span> <span class="n">compression_types_by_suffix</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="k">if</span> <span class="n">lowercased_path</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">suffix</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">compression_type</span> |
| <span class="k">return</span> <span class="bp">cls</span><span class="o">.</span><span class="n">UNCOMPRESSED</span></div></div> |
| |
| |
| <div class="viewcode-block" id="CompressedFile"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile">[docs]</a><span class="k">class</span> <span class="nc">CompressedFile</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="sd">"""File wrapper for easier handling of compressed files."""</span> |
| <span class="c1"># XXX: This class is not thread safe in the read path.</span> |
| |
| <span class="c1"># The bit mask to use for the wbits parameters of the zlib compressor and</span> |
| <span class="c1"># decompressor objects.</span> |
| <span class="n">_gzip_mask</span> <span class="o">=</span> <span class="n">zlib</span><span class="o">.</span><span class="n">MAX_WBITS</span> <span class="o">|</span> <span class="mi">16</span> <span class="c1"># Mask when using GZIP headers.</span> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> |
| <span class="n">fileobj</span><span class="p">,</span> |
| <span class="n">compression_type</span><span class="o">=</span><span class="n">CompressionTypes</span><span class="o">.</span><span class="n">GZIP</span><span class="p">,</span> |
| <span class="n">read_size</span><span class="o">=</span><span class="n">DEFAULT_READ_BUFFER_SIZE</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">fileobj</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'File object must not be None'</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">is_valid_compression_type</span><span class="p">(</span><span class="n">compression_type</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">'compression_type must be CompressionType object but '</span> |
| <span class="s1">'was </span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">compression_type</span><span class="p">))</span> |
| <span class="k">if</span> <span class="n">compression_type</span> <span class="ow">in</span> <span class="p">(</span><span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">,</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">UNCOMPRESSED</span> |
| <span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'Cannot create object with unspecified or no compression'</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_file</span> <span class="o">=</span> <span class="n">fileobj</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">=</span> <span class="n">compression_type</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span> <span class="o">!=</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'File object must be at position 0 but was </span><span class="si">%d</span><span class="s1">'</span> <span class="o">%</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">tell</span><span class="p">())</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_size</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">readable</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_size</span> <span class="o">=</span> <span class="n">read_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span> <span class="o">=</span> <span class="n">io</span><span class="o">.</span><span class="n">BytesIO</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_position</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_eof</span> <span class="o">=</span> <span class="kc">False</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_initialize_decompressor</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writeable</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_initialize_compressor</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_compressor</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">_initialize_decompressor</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">BZIP2</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span> <span class="o">=</span> <span class="n">bz2</span><span class="o">.</span><span class="n">BZ2Decompressor</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">GZIP</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span> <span class="o">=</span> <span class="n">zlib</span><span class="o">.</span><span class="n">decompressobj</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_gzip_mask</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_initialize_compressor</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">BZIP2</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_compressor</span> <span class="o">=</span> <span class="n">bz2</span><span class="o">.</span><span class="n">BZ2Compressor</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">assert</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">GZIP</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_compressor</span> <span class="o">=</span> <span class="n">zlib</span><span class="o">.</span><span class="n">compressobj</span><span class="p">(</span><span class="n">zlib</span><span class="o">.</span><span class="n">Z_DEFAULT_COMPRESSION</span><span class="p">,</span> |
| <span class="n">zlib</span><span class="o">.</span><span class="n">DEFLATED</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_gzip_mask</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="CompressedFile.readable"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.readable">[docs]</a> <span class="k">def</span> <span class="nf">readable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">mode</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">mode</span> |
| <span class="k">return</span> <span class="s1">'r'</span> <span class="ow">in</span> <span class="n">mode</span> <span class="ow">or</span> <span class="s1">'a'</span> <span class="ow">in</span> <span class="n">mode</span></div> |
| |
| <div class="viewcode-block" id="CompressedFile.writeable"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.writeable">[docs]</a> <span class="k">def</span> <span class="nf">writeable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">mode</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">mode</span> |
| <span class="k">return</span> <span class="s1">'w'</span> <span class="ow">in</span> <span class="n">mode</span> <span class="ow">or</span> <span class="s1">'a'</span> <span class="ow">in</span> <span class="n">mode</span></div> |
| |
| <div class="viewcode-block" id="CompressedFile.write"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.write">[docs]</a> <span class="k">def</span> <span class="nf">write</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data</span><span class="p">):</span> |
| <span class="sd">"""Write data to file."""</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compressor</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'compressor not initialized'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span> <span class="o">+=</span> <span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="p">)</span> |
| <span class="n">compressed</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compressor</span><span class="o">.</span><span class="n">compress</span><span class="p">(</span><span class="n">data</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">compressed</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">compressed</span><span class="p">)</span></div> |
| |
| <span class="k">def</span> <span class="nf">_fetch_to_internal_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">num_bytes</span><span class="p">):</span> |
| <span class="sd">"""Fetch up to num_bytes into the internal buffer."""</span> |
| <span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_eof</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_position</span> <span class="o">></span> <span class="mi">0</span> <span class="ow">and</span> |
| <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_position</span><span class="p">)</span> <span class="o"><</span> <span class="n">num_bytes</span><span class="p">):</span> |
| <span class="c1"># There aren't enough number of bytes to accommodate a read, so we</span> |
| <span class="c1"># prepare for a possibly large read by clearing up all internal buffers</span> |
| <span class="c1"># but without dropping any previous held data.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">seek</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_position</span><span class="p">)</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_clear_read_buffer</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">data</span><span class="p">)</span> |
| |
| <span class="k">while</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_eof</span> <span class="ow">and</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">tell</span><span class="p">()</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_position</span> |
| <span class="p">)</span> <span class="o"><</span> <span class="n">num_bytes</span><span class="p">:</span> |
| <span class="c1"># Continue reading from the underlying file object until enough bytes are</span> |
| <span class="c1"># available, or EOF is reached.</span> |
| <span class="n">buf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_size</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">buf</span><span class="p">:</span> |
| <span class="n">decompressed</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span><span class="o">.</span><span class="n">decompress</span><span class="p">(</span><span class="n">buf</span><span class="p">)</span> |
| <span class="k">del</span> <span class="n">buf</span> <span class="c1"># Free up some possibly large and no-longer-needed memory.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">decompressed</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># EOF of current stream reached.</span> |
| <span class="c1">#</span> |
| <span class="c1"># Any uncompressed data at the end of the stream of a gzip or bzip2</span> |
| <span class="c1"># file that is not corrupted points to a concatenated compressed</span> |
| <span class="c1"># file. We read concatenated files by recursively creating decompressor</span> |
| <span class="c1"># objects for the unused compressed data.</span> |
| <span class="k">if</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">BZIP2</span> <span class="ow">or</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">GZIP</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span><span class="o">.</span><span class="n">unused_data</span> <span class="o">!=</span> <span class="s1">''</span><span class="p">:</span> |
| <span class="n">buf</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span><span class="o">.</span><span class="n">unused_data</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">bz2</span><span class="o">.</span><span class="n">BZ2Decompressor</span><span class="p">()</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">BZIP2</span> |
| <span class="k">else</span> <span class="n">zlib</span><span class="o">.</span><span class="n">decompressobj</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_gzip_mask</span><span class="p">))</span> |
| <span class="n">decompressed</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span><span class="o">.</span><span class="n">decompress</span><span class="p">(</span><span class="n">buf</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">decompressed</span><span class="p">)</span> |
| <span class="k">continue</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># Gzip and bzip2 formats do not require flushing remaining data in the</span> |
| <span class="c1"># decompressor into the read buffer when fully decompressing files.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span><span class="o">.</span><span class="n">flush</span><span class="p">())</span> |
| |
| <span class="c1"># Record that we have hit the end of file, so we won't unnecessarily</span> |
| <span class="c1"># repeat the completeness verification step above.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_eof</span> <span class="o">=</span> <span class="kc">True</span> |
| |
| <span class="k">def</span> <span class="nf">_read_from_internal_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">read_fn</span><span class="p">):</span> |
| <span class="sd">"""Read from the internal buffer by using the supplied read_fn."""</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">seek</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_position</span><span class="p">)</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">read_fn</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_position</span> <span class="o">+=</span> <span class="nb">len</span><span class="p">(</span><span class="n">result</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span> <span class="o">+=</span> <span class="nb">len</span><span class="p">(</span><span class="n">result</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">seek</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="n">os</span><span class="o">.</span><span class="n">SEEK_END</span><span class="p">)</span> <span class="c1"># Allow future writes.</span> |
| <span class="k">return</span> <span class="n">result</span> |
| |
| <div class="viewcode-block" id="CompressedFile.read"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.read">[docs]</a> <span class="k">def</span> <span class="nf">read</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">num_bytes</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'decompressor not initialized'</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_fetch_to_internal_buffer</span><span class="p">(</span><span class="n">num_bytes</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_from_internal_buffer</span><span class="p">(</span> |
| <span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="n">num_bytes</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="CompressedFile.readline"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.readline">[docs]</a> <span class="k">def</span> <span class="nf">readline</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Equivalent to standard file.readline(). Same return conventions apply."""</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_decompressor</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'decompressor not initialized'</span><span class="p">)</span> |
| |
| <span class="n">bytes_io</span> <span class="o">=</span> <span class="n">io</span><span class="o">.</span><span class="n">BytesIO</span><span class="p">()</span> |
| <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="c1"># Ensure that the internal buffer has at least half the read_size. Going</span> |
| <span class="c1"># with half the _read_size (as opposed to a full _read_size) to ensure</span> |
| <span class="c1"># that actual fetches are more evenly spread out, as opposed to having 2</span> |
| <span class="c1"># consecutive reads at the beginning of a read.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_fetch_to_internal_buffer</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_size</span> <span class="o">//</span> <span class="mi">2</span><span class="p">)</span> |
| <span class="n">line</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_from_internal_buffer</span><span class="p">(</span> |
| <span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">readline</span><span class="p">())</span> |
| <span class="n">bytes_io</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">line</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">line</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="p">)</span> <span class="ow">or</span> <span class="ow">not</span> <span class="n">line</span><span class="p">:</span> |
| <span class="k">break</span> <span class="c1"># Newline or EOF reached.</span> |
| |
| <span class="k">return</span> <span class="n">bytes_io</span><span class="o">.</span><span class="n">getvalue</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="CompressedFile.closed"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.closed">[docs]</a> <span class="k">def</span> <span class="nf">closed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">closed</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="CompressedFile.close"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.close">[docs]</a> <span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">readable</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writeable</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_compressor</span><span class="o">.</span><span class="n">flush</span><span class="p">())</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">close</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="CompressedFile.flush"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.flush">[docs]</a> <span class="k">def</span> <span class="nf">flush</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">writeable</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_compressor</span><span class="o">.</span><span class="n">flush</span><span class="p">())</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span></div> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">seekable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s1">'r'</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">mode</span> |
| |
| <span class="k">def</span> <span class="nf">_clear_read_buffer</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Clears the read buffer by removing all the contents and</span> |
| <span class="sd"> resetting _read_position to 0"""</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_position</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">seek</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_buffer</span><span class="o">.</span><span class="n">truncate</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_rewind_file</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Seeks to the beginning of the input file. Input file's EOF marker</span> |
| <span class="sd"> is cleared and _uncompressed_position is reset to zero"""</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_file</span><span class="o">.</span><span class="n">seek</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="n">os</span><span class="o">.</span><span class="n">SEEK_SET</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_read_eof</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">def</span> <span class="nf">_rewind</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Seeks to the beginning of the input file and resets the internal read</span> |
| <span class="sd"> buffer. The decompressor object is re-initialized to ensure that no data is</span> |
| <span class="sd"> left in its buffer."""</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_clear_read_buffer</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rewind_file</span><span class="p">()</span> |
| |
| <span class="c1"># Re-initialize decompressor to clear any data buffered prior to rewind</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_initialize_decompressor</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="CompressedFile.seek"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.seek">[docs]</a> <span class="k">def</span> <span class="nf">seek</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">offset</span><span class="p">,</span> <span class="n">whence</span><span class="o">=</span><span class="n">os</span><span class="o">.</span><span class="n">SEEK_SET</span><span class="p">):</span> |
| <span class="sd">"""Set the file's current offset.</span> |
| |
| <span class="sd"> Seeking behavior:</span> |
| |
| <span class="sd"> * seeking from the end :data:`os.SEEK_END` the whole file is decompressed</span> |
| <span class="sd"> once to determine its size. Therefore it is preferred to use</span> |
| <span class="sd"> :data:`os.SEEK_SET` or :data:`os.SEEK_CUR` to avoid the processing</span> |
| <span class="sd"> overhead</span> |
| <span class="sd"> * seeking backwards from the current position rewinds the file to ``0``</span> |
| <span class="sd"> and decompresses the chunks to the requested offset</span> |
| <span class="sd"> * seeking is only supported in files opened for reading</span> |
| <span class="sd"> * if the new offset is out of bound, it is adjusted to either ``0`` or</span> |
| <span class="sd"> ``EOF``.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> offset (int): seek offset in the uncompressed content represented as</span> |
| <span class="sd"> number</span> |
| <span class="sd"> whence (int): seek mode. Supported modes are :data:`os.SEEK_SET`</span> |
| <span class="sd"> (absolute seek), :data:`os.SEEK_CUR` (seek relative to the current</span> |
| <span class="sd"> position), and :data:`os.SEEK_END` (seek relative to the end, offset</span> |
| <span class="sd"> should be negative).</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ~exceptions.IOError: When this buffer is closed.</span> |
| <span class="sd"> ~exceptions.ValueError: When whence is invalid or the file is not seekable</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">whence</span> <span class="o">==</span> <span class="n">os</span><span class="o">.</span><span class="n">SEEK_SET</span><span class="p">:</span> |
| <span class="n">absolute_offset</span> <span class="o">=</span> <span class="n">offset</span> |
| <span class="k">elif</span> <span class="n">whence</span> <span class="o">==</span> <span class="n">os</span><span class="o">.</span><span class="n">SEEK_CUR</span><span class="p">:</span> |
| <span class="n">absolute_offset</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span> <span class="o">+</span> <span class="n">offset</span> |
| <span class="k">elif</span> <span class="n">whence</span> <span class="o">==</span> <span class="n">os</span><span class="o">.</span><span class="n">SEEK_END</span><span class="p">:</span> |
| <span class="c1"># Determine and cache the uncompressed size of the file</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_size</span><span class="p">:</span> |
| <span class="n">logger</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span><span class="s2">"Seeking relative from end of file is requested. "</span> |
| <span class="s2">"Need to decompress the whole file once to determine "</span> |
| <span class="s2">"its size. This might take a while..."</span><span class="p">)</span> |
| <span class="n">uncompress_start_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> |
| <span class="k">while</span> <span class="bp">self</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_size</span><span class="p">):</span> |
| <span class="k">pass</span> |
| <span class="n">uncompress_end_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> |
| <span class="n">logger</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span><span class="s2">"Full file decompression for seek from end took </span><span class="si">%.2f</span><span class="s2"> secs"</span><span class="p">,</span> |
| <span class="p">(</span><span class="n">uncompress_end_time</span> <span class="o">-</span> <span class="n">uncompress_start_time</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_size</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span> |
| <span class="n">absolute_offset</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_size</span> <span class="o">+</span> <span class="n">offset</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Whence mode </span><span class="si">%r</span><span class="s2"> is invalid."</span> <span class="o">%</span> <span class="n">whence</span><span class="p">)</span> |
| |
| <span class="c1"># Determine how many bytes need to be read before we reach</span> |
| <span class="c1"># the requested offset. Rewind if we already passed the position.</span> |
| <span class="k">if</span> <span class="n">absolute_offset</span> <span class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_rewind</span><span class="p">()</span> |
| <span class="n">bytes_to_skip</span> <span class="o">=</span> <span class="n">absolute_offset</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span> |
| |
| <span class="c1"># Read until the desired position is reached or EOF occurs.</span> |
| <span class="k">while</span> <span class="n">bytes_to_skip</span><span class="p">:</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">read</span><span class="p">(</span><span class="nb">min</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_read_size</span><span class="p">,</span> <span class="n">bytes_to_skip</span><span class="p">))</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">data</span><span class="p">:</span> |
| <span class="k">break</span> |
| <span class="n">bytes_to_skip</span> <span class="o">-=</span> <span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="CompressedFile.tell"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.CompressedFile.tell">[docs]</a> <span class="k">def</span> <span class="nf">tell</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Returns current position in uncompressed file."""</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_uncompressed_position</span></div> |
| |
| <span class="k">def</span> <span class="nf">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span> |
| |
| <span class="k">def</span> <span class="nf">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exception_type</span><span class="p">,</span> <span class="n">exception_value</span><span class="p">,</span> <span class="n">traceback</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">close</span><span class="p">()</span></div> |
| |
| |
| <div class="viewcode-block" id="FileMetadata"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileMetadata">[docs]</a><span class="k">class</span> <span class="nc">FileMetadata</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="sd">"""Metadata about a file path that is the output of FileSystem.match</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">size_in_bytes</span><span class="p">):</span> |
| <span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="p">(</span><span class="nb">str</span><span class="p">,</span> <span class="n">unicode</span><span class="p">))</span> <span class="ow">and</span> <span class="n">path</span><span class="p">,</span> <span class="s2">"Path should be a string"</span> |
| <span class="k">assert</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">size_in_bytes</span><span class="p">,</span> <span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="n">long</span><span class="p">))</span> <span class="ow">and</span> <span class="n">size_in_bytes</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="p">,</span> \ |
| <span class="s2">"Invalid value for size_in_bytes should </span><span class="si">%s</span><span class="s2"> (of type </span><span class="si">%s</span><span class="s2">)"</span> <span class="o">%</span> <span class="p">(</span> |
| <span class="n">size_in_bytes</span><span class="p">,</span> <span class="nb">type</span><span class="p">(</span><span class="n">size_in_bytes</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="n">path</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">size_in_bytes</span> <span class="o">=</span> <span class="n">size_in_bytes</span> |
| |
| <span class="k">def</span> <span class="nf">__eq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="sd">"""Note: This is only used in tests where we verify that mock objects match.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">other</span><span class="p">,</span> <span class="n">FileMetadata</span><span class="p">)</span> <span class="ow">and</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">path</span> <span class="ow">and</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">size_in_bytes</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">size_in_bytes</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">__hash__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="nb">hash</span><span class="p">((</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">size_in_bytes</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">__ne__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="fm">__eq__</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s1">'FileMetadata(</span><span class="si">%s</span><span class="s1">, </span><span class="si">%s</span><span class="s1">)'</span> <span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">size_in_bytes</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="MatchResult"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.MatchResult">[docs]</a><span class="k">class</span> <span class="nc">MatchResult</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="sd">"""Result from the ``FileSystem`` match operation which contains the list</span> |
| <span class="sd"> of matched FileMetadata.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pattern</span><span class="p">,</span> <span class="n">metadata_list</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">metadata_list</span> <span class="o">=</span> <span class="n">metadata_list</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pattern</span> <span class="o">=</span> <span class="n">pattern</span></div> |
| |
| |
| <span class="k">class</span> <span class="nc">BeamIOError</span><span class="p">(</span><span class="ne">IOError</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">msg</span><span class="p">,</span> <span class="n">exception_details</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""Class representing the errors thrown in the batch file operations.</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> msg: Message string for the exception thrown</span> |
| <span class="sd"> exception_details: Optional map of individual input to exception for</span> |
| <span class="sd"> failed operations in batch. This parameter is optional so if specified</span> |
| <span class="sd"> the user can assume that all errors in the filesystem operation</span> |
| <span class="sd"> have been reported. When the details are missing then the operation</span> |
| <span class="sd"> may have failed anywhere so the user should use match to determine</span> |
| <span class="sd"> the current state of the system.</span> |
| <span class="sd"> """</span> |
| <span class="n">message</span> <span class="o">=</span> <span class="s2">"</span><span class="si">%s</span><span class="s2"> with exceptions </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="p">(</span><span class="n">msg</span><span class="p">,</span> <span class="n">exception_details</span><span class="p">)</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">BeamIOError</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">exception_details</span> <span class="o">=</span> <span class="n">exception_details</span> |
| |
| |
| <div class="viewcode-block" id="FileSystem"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem">[docs]</a><span class="k">class</span> <span class="nc">FileSystem</span><span class="p">(</span><span class="n">with_metaclass</span><span class="p">(</span><span class="n">abc</span><span class="o">.</span><span class="n">ABCMeta</span><span class="p">,</span> <span class="n">BeamPlugin</span><span class="p">)):</span> |
| <span class="sd">"""A class that defines the functions that can be performed on a filesystem.</span> |
| |
| <span class="sd"> All methods are abstract and they are for file system providers to</span> |
| <span class="sd"> implement. Clients should use the FileSystems class to interact with</span> |
| <span class="sd"> the correct file system based on the provided file pattern scheme.</span> |
| <span class="sd"> """</span> |
| <span class="n">CHUNK_SIZE</span> <span class="o">=</span> <span class="mi">1</span> <span class="c1"># Chunk size in the batch operations</span> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pipeline_options</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> pipeline_options: Instance of ``PipelineOptions`` or dict of options and</span> |
| <span class="sd"> values (like ``RuntimeValueProvider.runtime_options``).</span> |
| <span class="sd"> """</span> |
| |
| <span class="nd">@staticmethod</span> |
| <span class="k">def</span> <span class="nf">_get_compression_type</span><span class="p">(</span><span class="n">path</span><span class="p">,</span> <span class="n">compression_type</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">compression_type</span> <span class="o">==</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">:</span> |
| <span class="n">compression_type</span> <span class="o">=</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">detect_compression_type</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="ow">not</span> <span class="n">CompressionTypes</span><span class="o">.</span><span class="n">is_valid_compression_type</span><span class="p">(</span><span class="n">compression_type</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">'compression_type must be CompressionType object but '</span> |
| <span class="s1">'was </span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="nb">type</span><span class="p">(</span><span class="n">compression_type</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">compression_type</span> |
| |
| <div class="viewcode-block" id="FileSystem.scheme"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.scheme">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">scheme</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span> |
| <span class="sd">"""URI scheme for the FileSystem</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.join"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.join">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">join</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">basepath</span><span class="p">,</span> <span class="o">*</span><span class="n">paths</span><span class="p">):</span> |
| <span class="sd">"""Join two or more pathname components for the filesystem</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> basepath: string path of the first component of the path</span> |
| <span class="sd"> paths: path components to be added</span> |
| |
| <span class="sd"> Returns: full path after combining all the passed components</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.split"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.split">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">split</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">):</span> |
| <span class="sd">"""Splits the given path into two parts.</span> |
| |
| <span class="sd"> Splits the path into a pair (head, tail) such that tail contains the last</span> |
| <span class="sd"> component of the path and head contains everything up to that.</span> |
| |
| <span class="sd"> For file-systems other than the local file-system, head should include the</span> |
| <span class="sd"> prefix.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> path: path as a string</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> a pair of path components as strings.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.mkdirs"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.mkdirs">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">mkdirs</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">):</span> |
| <span class="sd">"""Recursively create directories for the provided path.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> path: string path of the directory structure that should be created</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> IOError if leaf directory already exists.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.has_dirs"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.has_dirs">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">has_dirs</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Whether this FileSystem supports directories."""</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">_list</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dir_or_prefix</span><span class="p">):</span> |
| <span class="sd">"""List files in a location.</span> |
| |
| <span class="sd"> Listing is non-recursive (for filesystems that support directories).</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> dir_or_prefix: (string) A directory or location prefix (for filesystems</span> |
| <span class="sd"> that don't have directories).</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> Generator of ``FileMetadata`` objects.</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ``BeamIOError`` if listing fails, but not if no files were found.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span> |
| |
| <span class="k">def</span> <span class="nf">_url_dirname</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url_or_path</span><span class="p">):</span> |
| <span class="sd">"""Like posixpath.dirname, but preserves scheme:// prefix.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> url_or_path: A string in the form of scheme://some/path OR /some/path.</span> |
| <span class="sd"> """</span> |
| <span class="n">match</span> <span class="o">=</span> <span class="n">re</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="sa">r</span><span class="s1">'([a-z]+://)(.*)'</span><span class="p">,</span> <span class="n">url_or_path</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">match</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">posixpath</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="n">url_or_path</span><span class="p">)</span> |
| <span class="n">url_prefix</span><span class="p">,</span> <span class="n">path</span> <span class="o">=</span> <span class="n">match</span><span class="o">.</span><span class="n">groups</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">url_prefix</span> <span class="o">+</span> <span class="n">posixpath</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="FileSystem.match"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.match">[docs]</a> <span class="k">def</span> <span class="nf">match</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">patterns</span><span class="p">,</span> <span class="n">limits</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""Find all matching paths to the patterns provided.</span> |
| |
| <span class="sd"> Pattern matching is done using fnmatch.fnmatch.</span> |
| <span class="sd"> For filesystems that have directories, matching is not recursive. Patterns</span> |
| <span class="sd"> like scheme://path/*/foo will not match anything.</span> |
| <span class="sd"> Patterns ending with '/' will be appended with '*'.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> patterns: list of strings for the file path patterns to match against</span> |
| <span class="sd"> limits: list of maximum number of responses that need to be fetched</span> |
| |
| <span class="sd"> Returns: list of ``MatchResult`` objects.</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ``BeamIOError`` if any of the pattern match operations fail</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">limits</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">limits</span> <span class="o">=</span> <span class="p">[</span><span class="kc">None</span><span class="p">]</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">patterns</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">err_msg</span> <span class="o">=</span> <span class="s2">"Patterns and limits should be equal in length"</span> |
| <span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="n">patterns</span><span class="p">)</span> <span class="o">==</span> <span class="nb">len</span><span class="p">(</span><span class="n">limits</span><span class="p">),</span> <span class="n">err_msg</span> |
| |
| <span class="k">def</span> <span class="nf">_match</span><span class="p">(</span><span class="n">pattern</span><span class="p">,</span> <span class="n">limit</span><span class="p">):</span> |
| <span class="sd">"""Find all matching paths to the pattern provided."""</span> |
| <span class="k">if</span> <span class="n">pattern</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="s1">'/'</span><span class="p">):</span> |
| <span class="n">pattern</span> <span class="o">+=</span> <span class="s1">'*'</span> |
| <span class="c1"># Get the part of the pattern before the first globbing character.</span> |
| <span class="c1"># For example scheme://path/foo* will become scheme://path/foo for</span> |
| <span class="c1"># filesystems like GCS, or converted to scheme://path for filesystems with</span> |
| <span class="c1"># directories.</span> |
| <span class="n">prefix_or_dir</span> <span class="o">=</span> <span class="n">re</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="s1">'^[^[*?]*'</span><span class="p">,</span> <span class="n">pattern</span><span class="p">)</span><span class="o">.</span><span class="n">group</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> |
| |
| <span class="n">file_metadatas</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="n">prefix_or_dir</span> <span class="o">==</span> <span class="n">pattern</span><span class="p">:</span> |
| <span class="c1"># Short-circuit calling self._list() if there's no glob pattern to match.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">exists</span><span class="p">(</span><span class="n">pattern</span><span class="p">):</span> |
| <span class="n">file_metadatas</span> <span class="o">=</span> <span class="p">[</span><span class="n">FileMetadata</span><span class="p">(</span><span class="n">pattern</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">size</span><span class="p">(</span><span class="n">pattern</span><span class="p">))]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dirs</span><span class="p">():</span> |
| <span class="n">prefix_or_dir</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_url_dirname</span><span class="p">(</span><span class="n">prefix_or_dir</span><span class="p">)</span> |
| <span class="n">file_metadatas</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_list</span><span class="p">(</span><span class="n">prefix_or_dir</span><span class="p">)</span> |
| |
| <span class="n">metadata_list</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">file_metadata</span> <span class="ow">in</span> <span class="n">file_metadatas</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">limit</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">metadata_list</span><span class="p">)</span> <span class="o">>=</span> <span class="n">limit</span><span class="p">:</span> |
| <span class="k">break</span> |
| <span class="k">if</span> <span class="n">fnmatch</span><span class="o">.</span><span class="n">fnmatch</span><span class="p">(</span><span class="n">file_metadata</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="n">pattern</span><span class="p">):</span> |
| <span class="n">metadata_list</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">file_metadata</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">MatchResult</span><span class="p">(</span><span class="n">pattern</span><span class="p">,</span> <span class="n">metadata_list</span><span class="p">)</span> |
| |
| <span class="n">exceptions</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">pattern</span><span class="p">,</span> <span class="n">limit</span> <span class="ow">in</span> <span class="nb">zip</span><span class="p">(</span><span class="n">patterns</span><span class="p">,</span> <span class="n">limits</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">result</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">_match</span><span class="p">(</span><span class="n">pattern</span><span class="p">,</span> <span class="n">limit</span><span class="p">))</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> <span class="c1"># pylint: disable=broad-except</span> |
| <span class="n">exceptions</span><span class="p">[</span><span class="n">pattern</span><span class="p">]</span> <span class="o">=</span> <span class="n">e</span> |
| |
| <span class="k">if</span> <span class="n">exceptions</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">BeamIOError</span><span class="p">(</span><span class="s2">"Match operation failed"</span><span class="p">,</span> <span class="n">exceptions</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">result</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.create"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.create">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">create</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">mime_type</span><span class="o">=</span><span class="s1">'application/octet-stream'</span><span class="p">,</span> |
| <span class="n">compression_type</span><span class="o">=</span><span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">):</span> |
| <span class="sd">"""Returns a write channel for the given file path.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> path: string path of the file object to be written to the system</span> |
| <span class="sd"> mime_type: MIME type to specify the type of content in the file object</span> |
| <span class="sd"> compression_type: Type of compression to be used for this object</span> |
| |
| <span class="sd"> Returns: file handle with a close function for the user to use</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.open"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.open">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">open</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">,</span> <span class="n">mime_type</span><span class="o">=</span><span class="s1">'application/octet-stream'</span><span class="p">,</span> |
| <span class="n">compression_type</span><span class="o">=</span><span class="n">CompressionTypes</span><span class="o">.</span><span class="n">AUTO</span><span class="p">):</span> |
| <span class="sd">"""Returns a read channel for the given file path.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> path: string path of the file object to be read</span> |
| <span class="sd"> mime_type: MIME type to specify the type of content in the file object</span> |
| <span class="sd"> compression_type: Type of compression to be used for this object</span> |
| |
| <span class="sd"> Returns: file handle with a close function for the user to use</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.copy"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.copy">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">copy</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source_file_names</span><span class="p">,</span> <span class="n">destination_file_names</span><span class="p">):</span> |
| <span class="sd">"""Recursively copy the file tree from the source to the destination</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> source_file_names: list of source file objects that need to be copied</span> |
| <span class="sd"> destination_file_names: list of destinations for the new objects</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ``BeamIOError`` if any of the copy operations fail</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.rename"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.rename">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">rename</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source_file_names</span><span class="p">,</span> <span class="n">destination_file_names</span><span class="p">):</span> |
| <span class="sd">"""Rename the files at the source list to the destination list.</span> |
| <span class="sd"> Source and destination lists should be of the same size.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> source_file_names: List of file paths that need to be moved</span> |
| <span class="sd"> destination_file_names: List of destination_file_names for the files</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ``BeamIOError`` if any of the rename operations fail</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.exists"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.exists">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">exists</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">):</span> |
| <span class="sd">"""Check if the provided path exists on the FileSystem.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> path: string path that needs to be checked.</span> |
| |
| <span class="sd"> Returns: boolean flag indicating if path exists</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.size"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.size">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">size</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">):</span> |
| <span class="sd">"""Get size in bytes of a file on the FileSystem.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> path: string filepath of file.</span> |
| |
| <span class="sd"> Returns: int size of file according to the FileSystem.</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ``BeamIOError`` if path doesn't exist.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.last_updated"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.last_updated">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">last_updated</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">):</span> |
| <span class="sd">"""Get the last-updated time of a file on the FileSystem, as UNIX Epoch seconds.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> path: string path of file.</span> |
| |
| <span class="sd"> Returns: float UNIX Epoch time</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ``BeamIOError`` if path doesn't exist.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.checksum"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.checksum">[docs]</a> <span class="k">def</span> <span class="nf">checksum</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">path</span><span class="p">):</span> |
| <span class="sd">"""Fetch checksum metadata of a file on the</span> |
| <span class="sd"> :class:`~apache_beam.io.filesystem.FileSystem`.</span> |
| |
| <span class="sd"> This operation returns checksum metadata as stored in the underlying</span> |
| <span class="sd"> FileSystem. It should not need to read file data to obtain this value.</span> |
| <span class="sd"> Checksum type and format are FileSystem dependent and are not compatible</span> |
| <span class="sd"> between FileSystems.</span> |
| <span class="sd"> FileSystem implementations may return file size if a checksum isn't</span> |
| <span class="sd"> available.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> path: string path of a file.</span> |
| |
| <span class="sd"> Returns: string containing checksum</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ``BeamIOError`` if path isn't a file or doesn't exist.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div> |
| |
| <div class="viewcode-block" id="FileSystem.delete"><a class="viewcode-back" href="../../../apache_beam.io.filesystem.html#apache_beam.io.filesystem.FileSystem.delete">[docs]</a> <span class="nd">@abc</span><span class="o">.</span><span class="n">abstractmethod</span> |
| <span class="k">def</span> <span class="nf">delete</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">paths</span><span class="p">):</span> |
| <span class="sd">"""Deletes files or directories at the provided paths.</span> |
| <span class="sd"> Directories will be deleted recursively.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> paths: list of paths that give the file objects to be deleted</span> |
| |
| <span class="sd"> Raises:</span> |
| <span class="sd"> ``BeamIOError`` if any of the delete operations fail</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span></div></div> |
| </pre></div> |
| |
| </div> |
| <div class="articleComments"> |
| |
| </div> |
| </div> |
| <footer> |
| <!-- Page footer: copyright notice and build/theme credits. --> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright . |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| |
| |
| <script type="text/javascript"> |
| // Page-level settings published as the global DOCUMENTATION_OPTIONS object. |
| // NOTE(review): presumably read by the Sphinx scripts loaded right after this |
| // block (e.g. doctools.js) — confirm before renaming or removing any key. |
| // URL_ROOT uses the same '../../../' prefix as the other relative links on this page. |
| var DOCUMENTATION_OPTIONS = { |
| URL_ROOT:'../../../', |
| VERSION:'', |
| COLLAPSE_INDEX:false, |
| FILE_SUFFIX:'.html', |
| HAS_SOURCE: true, |
| SOURCELINK_SUFFIX: '.txt' |
| }; |
| </script> |
| <script type="text/javascript" src="../../../_static/jquery.js"></script> |
| <script type="text/javascript" src="../../../_static/underscore.js"></script> |
| <script type="text/javascript" src="../../../_static/doctools.js"></script> |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="../../../_static/js/theme.js"></script> |
| |
| |
| |
| |
| <script type="text/javascript"> |
| // Passing a function to jQuery() registers it to run once the DOM is ready. |
| // The callback calls SphinxRtdTheme.StickyNav.enable(); SphinxRtdTheme is |
| // presumably defined by _static/js/theme.js loaded above — confirm. |
| jQuery(function () { |
| SphinxRtdTheme.StickyNav.enable(); |
| }); |
| </script> |
| |
| |
| </body> |
| </html> |