| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <!-- Document metadata --> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>apache_beam.options.pipeline_options — Apache Beam 2.47.0 documentation</title> |
| <!-- Classic scripts below run in document order; only the MathJax loader is async --> |
| <script src="../../../_static/js/modernizr.min.js"></script> |
| <script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script> |
| <script src="../../../_static/jquery.js"></script> |
| <script src="../../../_static/underscore.js"></script> |
| <script src="../../../_static/doctools.js"></script> |
| <script src="../../../_static/language_data.js"></script> |
| <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <script src="../../../_static/js/theme.js"></script> |
| <!-- Stylesheets, then the index/search relations --> |
| <link rel="stylesheet" href="../../../_static/css/theme.css"> |
| <link rel="stylesheet" href="../../../_static/pygments.css"> |
| <link rel="index" href="../../../genindex.html" title="Index"> |
| <link rel="search" href="../../../search.html" title="Search"> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <!-- Sidebar header: project home link, version label and search form --> |
| <div class="wy-side-nav-search"> |
| <a href="../../../index.html" class="icon icon-home"> Apache Beam |
| </a> |
| <div class="version"> |
| 2.47.0 |
| </div> |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
| <!-- aria-label gives the field an accessible name; the placeholder alone is not a label. --> |
| <!-- type="text" is kept explicitly in case the theme stylesheet selects on it (not verifiable from this page). --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs"> |
| <input type="hidden" name="check_keywords" value="yes"> |
| <input type="hidden" name="area" value="default"> |
| </form> |
| </div> |
| </div> |
| <!-- Table of contents: packages first, then top-level modules --> |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| <!-- Top bar: icon carrying the wy-nav-top data-toggle hook, then a link to the documentation index --> |
| <i class="fa fa-bars" data-toggle="wy-nav-top"></i> |
| <a href="../../../index.html">Apache Beam</a> |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| <!-- Breadcrumb trail: Docs » Module code » current module, followed by an empty aside slot --> |
| <ul class="wy-breadcrumbs"> |
| <li><a href="../../../index.html">Docs</a> »</li> |
| <li><a href="../../index.html">Module code</a> »</li> |
| <li>apache_beam.options.pipeline_options</li> |
| <li class="wy-breadcrumbs-aside"> |
| </li> |
| </ul> |
| <hr> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.options.pipeline_options</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""Pipeline options obtained from command line parsing."""</span> |
| |
| <span class="c1"># pytype: skip-file</span> |
| |
| <span class="kn">import</span> <span class="nn">argparse</span> |
| <span class="kn">import</span> <span class="nn">json</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Callable</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">List</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Type</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">TypeVar</span> |
| |
| <span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="kn">import</span> <span class="n">RuntimeValueProvider</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="kn">import</span> <span class="n">StaticValueProvider</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="kn">import</span> <span class="n">ValueProvider</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">HasDisplayData</span> |
| |
| <span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="s1">'PipelineOptions'</span><span class="p">,</span> |
| <span class="s1">'StandardOptions'</span><span class="p">,</span> |
| <span class="s1">'TypeOptions'</span><span class="p">,</span> |
| <span class="s1">'DirectOptions'</span><span class="p">,</span> |
| <span class="s1">'GoogleCloudOptions'</span><span class="p">,</span> |
| <span class="s1">'AzureOptions'</span><span class="p">,</span> |
| <span class="s1">'HadoopFileSystemOptions'</span><span class="p">,</span> |
| <span class="s1">'WorkerOptions'</span><span class="p">,</span> |
| <span class="s1">'DebugOptions'</span><span class="p">,</span> |
| <span class="s1">'ProfilingOptions'</span><span class="p">,</span> |
| <span class="s1">'SetupOptions'</span><span class="p">,</span> |
| <span class="s1">'TestOptions'</span><span class="p">,</span> |
| <span class="s1">'S3Options'</span> |
| <span class="p">]</span> |
| |
| <span class="n">PipelineOptionsT</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s1">'PipelineOptionsT'</span><span class="p">,</span> <span class="n">bound</span><span class="o">=</span><span class="s1">'PipelineOptions'</span><span class="p">)</span> |
| |
| <span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| <span class="c1"># Map defined with option names to flag names for boolean options</span> |
| <span class="c1"># that have a destination(dest) in parser.add_argument() different</span> |
| <span class="c1"># from the flag name and whose default value is `None`.</span> |
| <span class="n">_FLAG_THAT_SETS_FALSE_VALUE</span> <span class="o">=</span> <span class="p">{</span><span class="s1">'use_public_ips'</span><span class="p">:</span> <span class="s1">'no_use_public_ips'</span><span class="p">}</span> |
| |
| |
| <span class="k">def</span> <span class="nf">_static_value_provider_of</span><span class="p">(</span><span class="n">value_type</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Helper function to plug a ValueProvider into argparse.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> value_type: the type of the value. Since the type param of argparse's</span> |
| <span class="sd"> add_argument will always be ValueProvider, we need to</span> |
| <span class="sd"> preserve the type of the actual value.</span> |
| <span class="sd"> Returns:</span> |
| <span class="sd"> A partially constructed StaticValueProvider in the form of a function.</span> |
| |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">_f</span><span class="p">(</span><span class="n">value</span><span class="p">):</span> |
| <span class="n">_f</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="n">value_type</span><span class="o">.</span><span class="vm">__name__</span> |
| <span class="k">return</span> <span class="n">StaticValueProvider</span><span class="p">(</span><span class="n">value_type</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">_f</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_BeamArgumentParser</span><span class="p">(</span><span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""An ArgumentParser that supports ValueProvider options.</span> |
| |
| <span class="sd"> Example Usage::</span> |
| |
| <span class="sd"> class TemplateUserOptions(PipelineOptions):</span> |
| <span class="sd"> @classmethod</span> |
| |
| <span class="sd"> def _add_argparse_args(cls, parser):</span> |
| <span class="sd"> parser.add_value_provider_argument('--vp_arg1', default='start')</span> |
| <span class="sd"> parser.add_value_provider_argument('--vp_arg2')</span> |
| <span class="sd"> parser.add_argument('--non_vp_arg')</span> |
| |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">add_value_provider_argument</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""ValueProvider arguments can be either of type keyword or positional.</span> |
| <span class="sd"> At runtime, even positional arguments will need to be supplied in the</span> |
| <span class="sd"> key/value form.</span> |
| <span class="sd"> """</span> |
| <span class="c1"># Extract the option name from positional argument ['pos_arg']</span> |
| <span class="k">assert</span> <span class="n">args</span> <span class="o">!=</span> <span class="p">()</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> <span class="o">>=</span> <span class="mi">1</span> |
| <span class="k">if</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="s1">'-'</span><span class="p">:</span> |
| <span class="n">option_name</span> <span class="o">=</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'nargs'</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> <span class="c1"># make them optionally templated</span> |
| <span class="n">kwargs</span><span class="p">[</span><span class="s1">'nargs'</span><span class="p">]</span> <span class="o">=</span> <span class="s1">'?'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]</span> |
| <span class="n">option_name</span> <span class="o">=</span> <span class="p">[</span><span class="n">i</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'--'</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="n">args</span> <span class="k">if</span> <span class="n">i</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'--'</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> |
| |
| <span class="c1"># reassign the type to make room for using</span> |
| <span class="c1"># StaticValueProvider as the type for add_argument</span> |
| <span class="n">value_type</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'type'</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">str</span> |
| <span class="n">kwargs</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">=</span> <span class="n">_static_value_provider_of</span><span class="p">(</span><span class="n">value_type</span><span class="p">)</span> |
| |
| <span class="c1"># reassign default to default_value to make room for using</span> |
| <span class="c1"># RuntimeValueProvider as the default for add_argument</span> |
| <span class="n">default_value</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'default'</span><span class="p">)</span> |
| <span class="n">kwargs</span><span class="p">[</span><span class="s1">'default'</span><span class="p">]</span> <span class="o">=</span> <span class="n">RuntimeValueProvider</span><span class="p">(</span> |
| <span class="n">option_name</span><span class="o">=</span><span class="n">option_name</span><span class="p">,</span> |
| <span class="n">value_type</span><span class="o">=</span><span class="n">value_type</span><span class="p">,</span> |
| <span class="n">default_value</span><span class="o">=</span><span class="n">default_value</span><span class="p">)</span> |
| |
| <span class="c1"># have add_argument do most of the work</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> |
| |
| <span class="c1"># The argparse package by default tries to autocomplete option names. This</span> |
| <span class="c1"># results in an "ambiguous option" error from argparse when an unknown option</span> |
| <span class="c1"># matching multiple known ones is used. This suppresses that behavior.</span> |
| <span class="k">def</span> <span class="nf">error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">message</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">message</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'ambiguous option: '</span><span class="p">):</span> |
| <span class="k">return</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span> <span class="nc">_DictUnionAction</span><span class="p">(</span><span class="n">argparse</span><span class="o">.</span><span class="n">Action</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> argparse Action that takes the union of json-loaded values. If a key is specified in more</span> |
| <span class="sd"> than one of the values, the last value takes precedence.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">parser</span><span class="p">,</span> <span class="n">namespace</span><span class="p">,</span> <span class="n">values</span><span class="p">,</span> <span class="n">option_string</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dest</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dest</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="nb">setattr</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dest</span><span class="p">,</span> <span class="p">{})</span> |
| <span class="nb">getattr</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dest</span><span class="p">)</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">values</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="PipelineOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions">[docs]</a><span class="k">class</span> <span class="nc">PipelineOptions</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""This class and subclasses are used as containers for command line options.</span> |
| |
| <span class="sd"> These classes are wrappers over the standard argparse Python module</span> |
| <span class="sd"> (see https://docs.python.org/3/library/argparse.html). To define one option</span> |
| <span class="sd"> or a group of options, create a subclass from PipelineOptions.</span> |
| |
| <span class="sd"> Example Usage::</span> |
| |
| <span class="sd"> class XyzOptions(PipelineOptions):</span> |
| |
| <span class="sd"> @classmethod</span> |
| <span class="sd"> def _add_argparse_args(cls, parser):</span> |
| <span class="sd"> parser.add_argument('--abc', default='start')</span> |
| <span class="sd"> parser.add_argument('--xyz', default='end')</span> |
| |
| <span class="sd"> The arguments for the add_argument() method are exactly the ones</span> |
| <span class="sd"> described in the argparse public documentation.</span> |
| |
| <span class="sd"> Pipeline objects require an options object during initialization.</span> |
| <span class="sd"> This is obtained simply by initializing an options class as defined above.</span> |
| |
| <span class="sd"> Example Usage::</span> |
| |
| <span class="sd"> p = Pipeline(options=XyzOptions())</span> |
| <span class="sd"> if p.options.xyz == 'end':</span> |
| <span class="sd"> raise ValueError('Option xyz has an invalid value.')</span> |
| |
| <span class="sd"> Instances of PipelineOptions or any of its subclass have access to values</span> |
| <span class="sd"> defined by other PipelineOption subclasses (see get_all_options()), and</span> |
| <span class="sd"> can be converted to an instance of another PipelineOptions subclass</span> |
| <span class="sd"> (see view_as()). All views share the underlying data structure that stores</span> |
| <span class="sd"> option key-value pairs.</span> |
| |
| <span class="sd"> By default the options classes will use command line arguments to initialize</span> |
| <span class="sd"> the options.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">flags</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="c1"># type: (Optional[List[str]], **Any) -> None</span> |
| |
| <span class="w"> </span><span class="sd">"""Initialize an options class.</span> |
| |
| <span class="sd"> The initializer will traverse all subclasses, add all their argparse</span> |
| <span class="sd"> arguments and then parse the command line specified by flags or by default</span> |
| <span class="sd"> the one obtained from sys.argv.</span> |
| |
| <span class="sd"> The subclasses of PipelineOptions do not need to redefine __init__.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> flags: An iterable of command line arguments to be used. If not specified</span> |
| <span class="sd"> then sys.argv will be used as input for parsing arguments.</span> |
| |
| <span class="sd"> **kwargs: Add overrides for arguments passed in flags. For overrides</span> |
| <span class="sd"> of arguments, please pass the `option names` instead of</span> |
| <span class="sd"> flag names.</span> |
| <span class="sd"> Option names: These are defined as dest in the</span> |
| <span class="sd"> parser.add_argument() for each flag. Passing flags</span> |
| <span class="sd"> like {no_use_public_ips: True}, for which the dest is</span> |
| <span class="sd"> defined to a different flag name in the parser,</span> |
| <span class="sd"> would be discarded. Instead, pass the dest of</span> |
| <span class="sd"> the flag (dest of no_use_public_ips is use_public_ips).</span> |
| <span class="sd"> """</span> |
| <span class="c1"># Initializing logging configuration in case the user did not set it up.</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">basicConfig</span><span class="p">()</span> |
| |
| <span class="c1"># self._flags stores a list of not yet parsed arguments, typically,</span> |
| <span class="c1"># command-line flags. This list is shared across different views.</span> |
| <span class="c1"># See: view_as().</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_flags</span> <span class="o">=</span> <span class="n">flags</span> |
| |
| <span class="c1"># Build parser that will parse options recognized by the [sub]class of</span> |
| <span class="c1"># PipelineOptions whose object is being instantiated.</span> |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">_BeamArgumentParser</span><span class="p">()</span> |
| <span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">mro</span><span class="p">():</span> |
| <span class="k">if</span> <span class="bp">cls</span> <span class="o">==</span> <span class="n">PipelineOptions</span><span class="p">:</span> |
| <span class="k">break</span> |
| <span class="k">elif</span> <span class="s1">'_add_argparse_args'</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">:</span> |
| <span class="bp">cls</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span> <span class="c1"># type: ignore</span> |
| |
| <span class="c1"># The _visible_options attribute will contain options that were recognized</span> |
| <span class="c1"># by the parser.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="n">flags</span><span class="p">)</span> |
| |
| <span class="c1"># self._all_options is initialized with overrides to flag values,</span> |
| <span class="c1"># provided in kwargs, and will store key-value pairs for options recognized</span> |
| <span class="c1"># by current PipelineOptions [sub]class and its views that may be created.</span> |
| <span class="c1"># See: view_as().</span> |
| <span class="c1"># This dictionary is shared across different views, and is lazily updated</span> |
| <span class="c1"># as each new view is created.</span> |
| <span class="c1"># Users access this dictionary store via __getattr__ / __setattr__ methods.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span> <span class="o">=</span> <span class="n">kwargs</span> |
| |
| <span class="c1"># Initialize values of keys defined by this class.</span> |
| <span class="k">for</span> <span class="n">option_name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span> |
| <span class="c1"># Note that options specified in kwargs will not be overwritten.</span> |
| <span class="k">if</span> <span class="n">option_name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">option_name</span><span class="p">]</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">option_name</span><span class="p">)</span> |
| |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="c1"># type: (_BeamArgumentParser) -> None</span> |
| <span class="c1"># Override this in subclasses to provide options.</span> |
| <span class="k">pass</span> |
| |
| <div class="viewcode-block" id="PipelineOptions.from_dictionary"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.from_dictionary">[docs]</a> <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">from_dictionary</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Returns a PipelineOptions from a dictionary of arguments.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> options: Dictionary of argument value pairs.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> A PipelineOptions object representing the given arguments.</span> |
| <span class="sd"> """</span> |
| <span class="n">flags</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">options</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="c1"># Note: If a boolean flag is True in the dictionary,</span> |
| <span class="c1"># implicitly the method assumes the boolean flag is</span> |
| <span class="c1"># specified as a command line argument. If the</span> |
| <span class="c1"># boolean flag is False, this method simply discards them.</span> |
| <span class="c1"># Eg: {no_auth: True} is similar to python your_file.py --no_auth</span> |
| <span class="c1"># {no_auth: False} is similar to python your_file.py.</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">bool</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">v</span><span class="p">:</span> |
| <span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">k</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">_FLAG_THAT_SETS_FALSE_VALUE</span><span class="p">:</span> |
| <span class="c1"># Capture overriding flags, which have a different dest</span> |
| <span class="c1"># from the flag name defined in the parser.add_argument</span> |
| <span class="c1"># Eg: no_use_public_ips, which has the dest=use_public_ips</span> |
| <span class="c1"># different from flag name</span> |
| <span class="n">flag_that_disables_the_option</span> <span class="o">=</span> <span class="p">(</span><span class="n">_FLAG_THAT_SETS_FALSE_VALUE</span><span class="p">[</span><span class="n">k</span><span class="p">])</span> |
| <span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">flag_that_disables_the_option</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="n">v</span><span class="p">:</span> |
| <span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">i</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span> |
| <span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">v</span><span class="p">)))</span> |
| <span class="k">elif</span> <span class="n">v</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="c1"># Don't process None type args here, they will be treated</span> |
| <span class="c1"># as strings when parsed by BeamArgumentParser.</span> |
| <span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">'Not setting flag with value None: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">k</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">))</span> |
| |
| <span class="k">return</span> <span class="bp">cls</span><span class="p">(</span><span class="n">flags</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="PipelineOptions.get_all_options"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.get_all_options">[docs]</a> <span class="k">def</span> <span class="nf">get_all_options</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">drop_default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">add_extra_args_fn</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable[[_BeamArgumentParser], None]]</span> |
| <span class="n">retain_unknown_options</span><span class="o">=</span><span class="kc">False</span> |
| <span class="p">):</span> |
| <span class="c1"># type: (...) -> Dict[str, Any]</span> |
| |
| <span class="w"> </span><span class="sd">"""Returns a dictionary of all defined arguments.</span> |
| |
| <span class="sd"> Collects all defined arguments (arguments that are defined in</span> |
| <span class="sd"> any subclass of PipelineOptions) into a dictionary.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> drop_default: If set to true, options that are equal to their default</span> |
| <span class="sd"> values are not returned as part of the result dictionary.</span> |
| <span class="sd"> add_extra_args_fn: Callback to populate additional arguments, can be used</span> |
| <span class="sd"> by runner to supply otherwise unknown args.</span> |
| <span class="sd"> retain_unknown_options: If set to true, options not recognized by any</span> |
| <span class="sd"> known pipeline options class will still be included in the result. If</span> |
| <span class="sd"> set to false, they will be discarded.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> Dictionary of all args and values.</span> |
| <span class="sd"> """</span> |
| |
| <span class="c1"># TODO(https://github.com/apache/beam/issues/18197): PipelineOption</span> |
| <span class="c1"># sub-classes in the main session might be repeated. Pick last unique</span> |
| <span class="c1"># instance of each subclass to avoid conflicts.</span> |
| <span class="n">subset</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">_BeamArgumentParser</span><span class="p">()</span> |
| <span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">PipelineOptions</span><span class="o">.</span><span class="n">__subclasses__</span><span class="p">():</span> |
| <span class="n">subset</span><span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="bp">cls</span><span class="p">)]</span> <span class="o">=</span> <span class="bp">cls</span> |
| <span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">subset</span><span class="o">.</span><span class="n">values</span><span class="p">():</span> |
| <span class="bp">cls</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span> <span class="c1"># pylint: disable=protected-access</span> |
| <span class="k">if</span> <span class="n">add_extra_args_fn</span><span class="p">:</span> |
| <span class="n">add_extra_args_fn</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span> |
| |
| <span class="n">known_args</span><span class="p">,</span> <span class="n">unknown_args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">retain_unknown_options</span><span class="p">:</span> |
| <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="k">while</span> <span class="n">i</span> <span class="o"><</span> <span class="nb">len</span><span class="p">(</span><span class="n">unknown_args</span><span class="p">):</span> |
| <span class="c1"># Treat all unary flags as booleans, and all binary argument values as</span> |
| <span class="c1"># strings.</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'-'</span><span class="p">):</span> |
| <span class="n">i</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">continue</span> |
| <span class="k">if</span> <span class="n">i</span> <span class="o">+</span> <span class="mi">1</span> <span class="o">>=</span> <span class="nb">len</span><span class="p">(</span><span class="n">unknown_args</span><span class="p">)</span> <span class="ow">or</span> <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span> <span class="o">+</span> <span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'-'</span><span class="p">):</span> |
| <span class="n">split</span> <span class="o">=</span> <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'='</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">split</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">],</span> <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="n">split</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">)</span> |
| <span class="n">i</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">elif</span> <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'--'</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">],</span> <span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">)</span> |
| <span class="n">i</span> <span class="o">+=</span> <span class="mi">2</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># skip all binary flags used with '-' and not '--'.</span> |
| <span class="c1"># ex: using -f instead of --f (or --flexrs_goal) will prevent</span> |
| <span class="c1"># argument validation before job submission and can be incorrectly</span> |
| <span class="c1"># submitted to job.</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s2">"Discarding flag </span><span class="si">%s</span><span class="s2">, single dash flags are not allowed."</span><span class="p">,</span> |
| <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">])</span> |
| <span class="n">i</span> <span class="o">+=</span> <span class="mi">2</span> |
| <span class="k">continue</span> |
| <span class="n">parsed_args</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">unknown_args</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">"Discarding unparseable args: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">unknown_args</span><span class="p">)</span> |
| <span class="n">parsed_args</span> <span class="o">=</span> <span class="n">known_args</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="nb">vars</span><span class="p">(</span><span class="n">parsed_args</span><span class="p">)</span> |
| |
| <span class="n">overrides</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span> |
| <span class="c1"># Apply the overrides if any</span> |
| <span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">result</span><span class="p">):</span> |
| <span class="n">overrides</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">k</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span> |
| <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> |
| <span class="k">if</span> <span class="p">(</span><span class="n">drop_default</span> <span class="ow">and</span> <span class="n">parser</span><span class="o">.</span><span class="n">get_default</span><span class="p">(</span><span class="n">k</span><span class="p">)</span> <span class="o">==</span> <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="ow">and</span> |
| <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">parser</span><span class="o">.</span><span class="n">get_default</span><span class="p">(</span><span class="n">k</span><span class="p">),</span> <span class="n">ValueProvider</span><span class="p">)):</span> |
| <span class="k">del</span> <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> |
| |
| <span class="k">if</span> <span class="n">overrides</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">retain_unknown_options</span><span class="p">:</span> |
| <span class="n">result</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">overrides</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">"Discarding invalid overrides: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">overrides</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">result</span></div> |
| |
| <div class="viewcode-block" id="PipelineOptions.display_data"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_all_options</span><span class="p">(</span><span class="n">drop_default</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">retain_unknown_options</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="PipelineOptions.view_as"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.view_as">[docs]</a> <span class="k">def</span> <span class="nf">view_as</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="bp">cls</span><span class="p">):</span> |
| <span class="c1"># type: (Type[PipelineOptionsT]) -> PipelineOptionsT</span> |
| |
| <span class="w"> </span><span class="sd">"""Returns a view of current object as provided PipelineOption subclass.</span> |
| |
| <span class="sd"> Example Usage::</span> |
| |
| <span class="sd"> options = PipelineOptions(['--runner', 'Direct', '--streaming'])</span> |
| <span class="sd"> standard_options = options.view_as(StandardOptions)</span> |
| <span class="sd"> if standard_options.streaming:</span> |
| <span class="sd"> # ... start a streaming job ...</span> |
| |
| <span class="sd"> Note that options objects may have multiple views, and modifications</span> |
| <span class="sd"> of values in any view-object will apply to current object and other</span> |
| <span class="sd"> view-objects.</span> |
| |
| <span class="sd"> Args:</span> |
| <span class="sd"> cls: PipelineOptions class or any of its subclasses.</span> |
| |
| <span class="sd"> Returns:</span> |
| <span class="sd"> An instance of cls that is initialized using options contained in current</span> |
| <span class="sd"> object.</span> |
| |
| <span class="sd"> """</span> |
| <span class="n">view</span> <span class="o">=</span> <span class="bp">cls</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span> |
| |
| <span class="k">for</span> <span class="n">option_name</span> <span class="ow">in</span> <span class="n">view</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span> |
| <span class="c1"># Initialize values of keys defined by a cls.</span> |
| <span class="c1">#</span> |
| <span class="c1"># Note that we do initialization only once per key to make sure that</span> |
| <span class="c1"># values in _all_options dict are not-recreated with each new view.</span> |
| <span class="c1"># This is important to make sure that values of multi-options keys are</span> |
| <span class="c1"># backed by the same list across multiple views, and that any overrides of</span> |
| <span class="c1"># pipeline options already stored in _all_options are preserved.</span> |
| <span class="k">if</span> <span class="n">option_name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">option_name</span><span class="p">]</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span> |
| <span class="n">view</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">option_name</span><span class="p">)</span> |
| <span class="c1"># Note that views will still store _all_options of the source object.</span> |
| <span class="n">view</span><span class="o">.</span><span class="n">_all_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span> |
| <span class="k">return</span> <span class="n">view</span></div> |
| |
| <span class="k">def</span> <span class="nf">_visible_option_list</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> List[str]</span> |
| <span class="k">return</span> <span class="nb">sorted</span><span class="p">(</span> |
| <span class="n">option</span> <span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="nb">dir</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">)</span> <span class="k">if</span> <span class="n">option</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="s1">'_'</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="fm">__dir__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> List[str]</span> |
| <span class="k">return</span> <span class="nb">sorted</span><span class="p">(</span> |
| <span class="nb">dir</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> <span class="o">+</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">)</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">())</span> |
| |
| <span class="k">def</span> <span class="fm">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">):</span> |
| <span class="c1"># Special methods which may be accessed before the object is</span> |
| <span class="c1"># fully constructed (e.g. in unpickling).</span> |
| <span class="k">if</span> <span class="n">name</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">==</span> <span class="n">name</span><span class="p">[</span><span class="o">-</span><span class="mi">2</span><span class="p">:]</span> <span class="o">==</span> <span class="s1">'__'</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span> |
| <span class="s2">"'</span><span class="si">%s</span><span class="s2">' object has no attribute '</span><span class="si">%s</span><span class="s2">'"</span> <span class="o">%</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="fm">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">'_flags'</span><span class="p">,</span> <span class="s1">'_all_options'</span><span class="p">,</span> <span class="s1">'_visible_options'</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__setattr__</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span> |
| <span class="s2">"'</span><span class="si">%s</span><span class="s2">' object has no attribute '</span><span class="si">%s</span><span class="s2">'"</span> <span class="o">%</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="fm">__str__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s1">'</span><span class="si">%s</span><span class="s1">(</span><span class="si">%s</span><span class="s1">)'</span> <span class="o">%</span> <span class="p">(</span> |
| <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> |
| <span class="s1">', '</span><span class="o">.</span><span class="n">join</span><span class="p">(</span> |
| <span class="s1">'</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">option</span><span class="p">,</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">option</span><span class="p">))</span> |
| <span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">()))</span></div> |
| |
| |
| <div class="viewcode-block" id="StandardOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.StandardOptions">[docs]</a><span class="k">class</span> <span class="nc">StandardOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| |
| <span class="n">DEFAULT_RUNNER</span> <span class="o">=</span> <span class="s1">'DirectRunner'</span> |
| |
| <span class="n">ALL_KNOWN_RUNNERS</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="s1">'apache_beam.runners.dataflow.dataflow_runner.DataflowRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.direct.direct_runner.DirectRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.interactive.interactive_runner.InteractiveRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.portability.flink_runner.FlinkRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.portability.portable_runner.PortableRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.portability.spark_runner.SparkRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.test.TestDirectRunner'</span><span class="p">,</span> |
| <span class="s1">'apache_beam.runners.test.TestDataflowRunner'</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| <span class="n">KNOWN_RUNNER_NAMES</span> <span class="o">=</span> <span class="p">[</span><span class="n">path</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'.'</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span> <span class="k">for</span> <span class="n">path</span> <span class="ow">in</span> <span class="n">ALL_KNOWN_RUNNERS</span><span class="p">]</span> |
| |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--runner'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Pipeline runner used to execute the workflow. Valid values are '</span> |
| <span class="s1">'one of </span><span class="si">%s</span><span class="s1">, or the fully qualified name of a PipelineRunner '</span> |
| <span class="s1">'subclass. If unspecified, defaults to </span><span class="si">%s</span><span class="s1">.'</span> <span class="o">%</span> |
| <span class="p">(</span><span class="s1">', '</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">KNOWN_RUNNER_NAMES</span><span class="p">),</span> <span class="bp">cls</span><span class="o">.</span><span class="n">DEFAULT_RUNNER</span><span class="p">)))</span> |
| <span class="c1"># Whether to enable streaming mode.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--streaming'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Whether to enable streaming mode.'</span><span class="p">)</span> |
| |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--resource_hint'</span><span class="p">,</span> |
| <span class="s1">'--resource_hints'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'resource_hints'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="p">[],</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Resource hint to set in the pipeline execution environment.'</span> |
| <span class="s1">'Hints specified via this option override hints specified '</span> |
| <span class="s1">'at transform level. Interpretation of hints is defined by '</span> |
| <span class="s1">'Beam runners.'</span><span class="p">))</span></div> |
| |
| |
| <span class="k">class</span> <span class="nc">CrossLanguageOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--beam_services'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="p">{},</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'For convenience, Beam provides the ability to automatically '</span> |
| <span class="s1">'download and start various services (such as expansion services) '</span> |
| <span class="s1">'used at pipeline construction and execution. These services are '</span> |
| <span class="s1">'identified by gradle target. This option provides the ability to '</span> |
| <span class="s1">'use pre-started services or non-default pre-existing artifacts to '</span> |
| <span class="s1">'start the given service. '</span> |
| <span class="s1">'Should be a json mapping of gradle build targets to pre-built '</span> |
| <span class="s1">'artifacts (e.g. jar files) or expansion endpoints (e.g. host:port).'</span><span class="p">))</span> |
| |
| |
| <span class="k">def</span> <span class="nf">additional_option_ptransform_fn</span><span class="p">():</span> |
| <span class="n">beam</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn_typehints_enabled</span> <span class="o">=</span> <span class="kc">True</span> |
| |
| |
| <span class="c1"># Optional type checks that aren't enabled by default.</span> |
| <span class="n">additional_type_checks</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s1">'ptransform_fn'</span><span class="p">:</span> <span class="n">additional_option_ptransform_fn</span><span class="p">,</span> |
| <span class="p">}</span> <span class="c1"># type: Dict[str, Callable[[], None]]</span> |
| |
| |
| <span class="k">def</span> <span class="nf">enable_all_additional_type_checks</span><span class="p">():</span> |
| <span class="w"> </span><span class="sd">"""Same as passing --type_check_additional=all."""</span> |
| <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">additional_type_checks</span><span class="o">.</span><span class="n">values</span><span class="p">():</span> |
| <span class="n">f</span><span class="p">()</span> |
| |
| |
| <div class="viewcode-block" id="TypeOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TypeOptions">[docs]</a><span class="k">class</span> <span class="nc">TypeOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="c1"># TODO(laolu): Add a type inferencing option here once implemented.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--type_check_strictness'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">'DEFAULT_TO_ANY'</span><span class="p">,</span> |
| <span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">'ALL_REQUIRED'</span><span class="p">,</span> <span class="s1">'DEFAULT_TO_ANY'</span><span class="p">],</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The level of exhaustive manual type-hint '</span> |
| <span class="s1">'annotation required'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--type_check_additional'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">''</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Comma separated list of additional type checking features to '</span> |
| <span class="s1">'enable. Options: all, ptransform_fn. For details see: '</span> |
| <span class="s1">'https://beam.apache.org/documentation/sdks/python-type-safety/'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--no_pipeline_type_check'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'pipeline_type_check'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_false'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Disable type checking at pipeline construction '</span> |
| <span class="s1">'time'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--runtime_type_check'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Enable type checking at pipeline execution '</span> |
| <span class="s1">'time. NOTE: only supported with the '</span> |
| <span class="s1">'DirectRunner'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--performance_runtime_type_check'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Enable faster type checking via sampling at pipeline execution '</span> |
| <span class="s1">'time. NOTE: only supported with portable runners '</span> |
| <span class="s1">'(including the DirectRunner)'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--allow_non_deterministic_key_coders'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Use non-deterministic coders (such as pickling) for key-grouping '</span> |
| <span class="s1">'operations such as GroupByKey. This is unsafe, as runners may group '</span> |
| <span class="s1">'keys based on their encoded bytes, but is available for backwards '</span> |
| <span class="s1">'compatibility. See BEAM-11719.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--allow_unsafe_triggers'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Allow the use of unsafe triggers. Unsafe triggers have the '</span> |
| <span class="s1">'potential to cause data loss due to finishing and/or never having '</span> |
| <span class="s1">'their condition met. Some operations, such as GroupByKey, disallow '</span> |
| <span class="s1">'this. This exists for cases where such loss is acceptable and for '</span> |
| <span class="s1">'backwards compatibility. See BEAM-9487.'</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="TypeOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TypeOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_validator</span><span class="p">):</span> |
| <span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="n">beam</span><span class="o">.</span><span class="n">version</span><span class="o">.</span><span class="n">__version__</span> <span class="o">>=</span> <span class="s1">'3'</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span> |
| <span class="s1">'Update --type_check_additional default to include all '</span> |
| <span class="s1">'available additional checks at Beam 3.0 release time.'</span><span class="p">)</span> |
| <span class="n">keys</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">type_check_additional</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">','</span><span class="p">)</span> |
| |
| <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">keys</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">key</span><span class="p">:</span> |
| <span class="k">continue</span> |
| <span class="k">elif</span> <span class="n">key</span> <span class="o">==</span> <span class="s1">'all'</span><span class="p">:</span> |
| <span class="n">enable_all_additional_type_checks</span><span class="p">()</span> |
| <span class="k">elif</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">additional_type_checks</span><span class="p">:</span> |
| <span class="n">additional_type_checks</span><span class="p">[</span><span class="n">key</span><span class="p">]()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'Unrecognized --type_check_additional feature: </span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">key</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">errors</span></div></div> |
| |
| |
| <div class="viewcode-block" id="DirectOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DirectOptions">[docs]</a><span class="k">class</span> <span class="nc">DirectOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""DirectRunner-specific execution options."""</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--no_direct_runner_use_stacked_bundle'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_false'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'direct_runner_use_stacked_bundle'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'DirectRunner uses stacked WindowedValues within a Bundle for '</span> |
| <span class="s1">'memory optimization. Set --no_direct_runner_use_stacked_bundle to '</span> |
| <span class="s1">'avoid it.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--direct_runner_bundle_repeat'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'replay every bundle this many extra times, for profiling '</span> |
| <span class="s1">'and debugging'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--direct_num_workers'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'number of parallel running workers.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--direct_running_mode'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">'in_memory'</span><span class="p">,</span> |
| <span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">'in_memory'</span><span class="p">,</span> <span class="s1">'multi_threading'</span><span class="p">,</span> <span class="s1">'multi_processing'</span><span class="p">],</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Workers running environment.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--direct_embed_docker_python'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'direct_embed_docker_python'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'DirectRunner uses the embedded Python environment when '</span> |
| <span class="s1">'the default Python docker environment is specified.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--direct_test_splits'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="p">{},</span> |
| <span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Split test configuration of the json form '</span> |
| <span class="s1">'{"step_name": {"timings": [...], "fractions": [...]}, ...} '</span> |
| <span class="s1">'where step_name is the name of a step controlling the stage to which '</span> |
| <span class="s1">'splits will be sent, timings is a list of floating-point times '</span> |
| <span class="s1">'(in seconds) at which the split requests will be sent, and '</span> |
| <span class="s1">'fractions is a corresponding list of floating points to use in the '</span> |
| <span class="s1">'split requests themselves.'</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="GoogleCloudOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions">[docs]</a><span class="k">class</span> <span class="nc">GoogleCloudOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Google Cloud Dataflow service execution options."""</span> |
| |
| <span class="n">BIGQUERY_API_SERVICE</span> <span class="o">=</span> <span class="s1">'bigquery.googleapis.com'</span> |
| <span class="n">COMPUTE_API_SERVICE</span> <span class="o">=</span> <span class="s1">'compute.googleapis.com'</span> |
| <span class="n">STORAGE_API_SERVICE</span> <span class="o">=</span> <span class="s1">'storage.googleapis.com'</span> |
| <span class="n">DATAFLOW_ENDPOINT</span> <span class="o">=</span> <span class="s1">'https://dataflow.googleapis.com'</span> |
| <span class="n">OAUTH_SCOPES</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="s1">'https://www.googleapis.com/auth/bigquery'</span><span class="p">,</span> |
| <span class="s1">'https://www.googleapis.com/auth/cloud-platform'</span><span class="p">,</span> |
| <span class="s1">'https://www.googleapis.com/auth/devstorage.full_control'</span><span class="p">,</span> |
| <span class="s1">'https://www.googleapis.com/auth/userinfo.email'</span><span class="p">,</span> |
| <span class="s1">'https://www.googleapis.com/auth/datastore'</span><span class="p">,</span> |
| <span class="s1">'https://www.googleapis.com/auth/spanner.admin'</span><span class="p">,</span> |
| <span class="s1">'https://www.googleapis.com/auth/spanner.data'</span> |
| <span class="p">]</span> |
| |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--dataflow_endpoint'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">DATAFLOW_ENDPOINT</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'The URL for the Dataflow API. If not set, the default public URL '</span> |
| <span class="s1">'will be used.'</span><span class="p">))</span> |
| <span class="c1"># Remote execution must check that this option is not None.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--project'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Name of the Cloud project owning the Dataflow '</span> |
| <span class="s1">'job.'</span><span class="p">)</span> |
| <span class="c1"># Remote execution must check that this option is not None.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--job_name'</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="s1">'Name of the Cloud Dataflow job.'</span><span class="p">)</span> |
| <span class="c1"># Remote execution must check that this option is not None.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--staging_location'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'GCS path for staging code packages needed by '</span> |
| <span class="s1">'workers.'</span><span class="p">)</span> |
| <span class="c1"># Remote execution must check that this option is not None.</span> |
| <span class="c1"># If staging_location is not set, it defaults to temp_location.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--temp_location'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'GCS path for saving temporary workflow jobs.'</span><span class="p">)</span> |
| <span class="c1"># The Google Compute Engine region for creating Dataflow jobs. See</span> |
| <span class="c1"># https://cloud.google.com/compute/docs/regions-zones/regions-zones for a</span> |
| <span class="c1"># list of valid options.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--region'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The Google Compute Engine region for creating '</span> |
| <span class="s1">'Dataflow job.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--service_account_email'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Identity to run virtual machines as.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--no_auth'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'no_auth'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Skips authorizing credentials with Google Cloud.'</span><span class="p">)</span> |
| <span class="c1"># Option to run templated pipelines</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--template_location'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Save job to specified local or GCS location.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--label'</span><span class="p">,</span> |
| <span class="s1">'--labels'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'labels'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Labels to be applied to this Dataflow job. '</span> |
| <span class="s1">'Labels are key value pairs separated by = '</span> |
| <span class="s1">'(e.g. --label key=value) or '</span> |
| <span class="s1">'(--labels=</span><span class="se">\'</span><span class="s1">{ "key": "value", "mass": "1_3kg", "count": "3" }</span><span class="se">\'</span><span class="s1">).'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--update'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Update an existing streaming Cloud Dataflow job. '</span> |
| <span class="s1">'Experimental. '</span> |
| <span class="s1">'See https://cloud.google.com/dataflow/docs/guides/'</span> |
| <span class="s1">'updating-a-pipeline'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--transform_name_mapping'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The transform mapping that maps the named '</span> |
| <span class="s1">'transforms in your prior pipeline code to names '</span> |
| <span class="s1">'in your replacement pipeline code. '</span> |
| <span class="s1">'Experimental. '</span> |
| <span class="s1">'See https://cloud.google.com/dataflow/docs/guides/'</span> |
| <span class="s1">'updating-a-pipeline'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--enable_streaming_engine'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Enable Windmill Service for this Dataflow job. '</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--dataflow_kms_key'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Set a Google Cloud KMS key name to be used in '</span> |
| <span class="s1">'Dataflow state operations (GBK, Streaming).'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--create_from_snapshot'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The snapshot from which the job should be created.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--flexrs_goal'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">'COST_OPTIMIZED'</span><span class="p">,</span> <span class="s1">'SPEED_OPTIMIZED'</span><span class="p">],</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Set the Flexible Resource Scheduling mode'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--dataflow_service_option'</span><span class="p">,</span> |
| <span class="s1">'--dataflow_service_options'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'dataflow_service_options'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Options to configure the Dataflow service. These '</span> |
| <span class="s1">'options decouple service side feature availability '</span> |
| <span class="s1">'from the Apache Beam release cycle. '</span> |
| <span class="s1">'Note: If set programmatically, must be set as a '</span> |
| <span class="s1">'list of strings'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--enable_hot_key_logging'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'When true, will enable the direct logging of any detected hot '</span> |
| <span class="s1">'keys into Cloud Logging. Warning: this will log the literal key as an '</span> |
| <span class="s1">'unobfuscated string.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--enable_artifact_caching'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'When true, artifacts will be cached across job submissions in '</span> |
| <span class="s1">'the GCS staging bucket'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--impersonate_service_account'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'All API requests will be made as the given service account or '</span> |
| <span class="s1">'target service account in an impersonation delegation chain '</span> |
| <span class="s1">'instead of the currently selected account. You can specify '</span> |
| <span class="s1">'either a single service account as the impersonator, or a '</span> |
| <span class="s1">'comma-separated list of service accounts to create an '</span> |
| <span class="s1">'impersonation delegation chain.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--gcp_oauth_scope'</span><span class="p">,</span> |
| <span class="s1">'--gcp_oauth_scopes'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'gcp_oauth_scopes'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">OAUTH_SCOPES</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Controls the OAuth scopes that will be requested when creating '</span> |
| <span class="s1">'GCP credentials. Note: If set programmatically, must be set as a '</span> |
| <span class="s1">'list of strings'</span><span class="p">))</span> |
| |
| <!-- Highlighted listing of the private helper _create_default_gcs_bucket (no viewcode-block wrapper or [docs] link on this one). As listed: it tries to import gcsio from apache_beam.io.gcp and logs a warning on ImportError; otherwise it calls gcsio.get_or_create_default_gcs_bucket(self) and returns 'gs://' followed by bucket.id when a bucket comes back, else None. NOTE(review): leading indentation of the listing is not preserved in this file, so the first "return None" is presumed to sit inside the except branch; confirm against the upstream module. --><span class="k">def</span> <span class="nf">_create_default_gcs_bucket</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">gcsio</span> |
| <span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span> |
| <span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">'Unable to create default GCS bucket.'</span><span class="p">)</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| <span class="n">bucket</span> <span class="o">=</span> <span class="n">gcsio</span><span class="o">.</span><span class="n">get_or_create_default_gcs_bucket</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">bucket</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s1">'gs://</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">bucket</span><span class="o">.</span><span class="n">id</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| |
| <!-- viewcode block for GoogleCloudOptions.validate; the [docs] anchor links back to the matching entry on the API page. The listed method collects error messages in a list and returns it. NOTE(review): indentation is not preserved in this file, so which of the checks below sit under the is_service_runner() branch cannot be determined from this listing. --><div class="viewcode-block" id="GoogleCloudOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span> |
| <span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="n">validator</span><span class="o">.</span><span class="n">is_service_runner</span><span class="p">():</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_cloud_options</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> |
| |
| <span class="c1"># Validating temp_location, or adding a default if there are issues</span> |
| <!-- When validate_gcs_path reports problems for temp_location, a default bucket is requested; the original errors are kept only if no bucket is obtained, otherwise temp_location is overwritten with the bucket path. --><span class="n">temp_location_errors</span> <span class="o">=</span> <span class="n">validator</span><span class="o">.</span><span class="n">validate_gcs_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'temp_location'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">temp_location_errors</span><span class="p">:</span> |
| <span class="n">default_bucket</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_create_default_gcs_bucket</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">default_bucket</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">temp_location_errors</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="nb">setattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'temp_location'</span><span class="p">,</span> <span class="n">default_bucket</span><span class="p">)</span> |
| |
| <!-- Python evaluates the condition on this line and the next as: staging_location is truthy, or temp_location is None (the "is None" test binds tighter than "or"). --><span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'staging_location'</span><span class="p">,</span> |
| <span class="kc">None</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'temp_location'</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_gcs_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'staging_location'</span><span class="p">))</span> |
| |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">DebugOptions</span><span class="p">)</span><span class="o">.</span><span class="n">dataflow_job_file</span><span class="p">:</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">template_location</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span> |
| <span class="s1">'--dataflow_job_file and --template_location '</span> |
| <span class="s1">'are mutually exclusive.'</span><span class="p">)</span> |
| |
| <span class="c1"># Validate that dataflow_service_options is a list</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_service_options</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span> |
| <span class="n">validator</span><span class="o">.</span><span class="n">validate_repeatable_argument_passed_as_list</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="s1">'dataflow_service_options'</span><span class="p">))</span> |
| |
| <!-- The second closing div on this line ends an element opened before this listing, presumably the GoogleCloudOptions class block. --><span class="k">return</span> <span class="n">errors</span></div></div> |
| |
| |
| <!-- viewcode block for the AzureOptions class: the listed _add_argparse_args registers azure_connection_string, blob_service_endpoint and azure_managed_identity_client_id, each with default None. --><div class="viewcode-block" id="AzureOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.AzureOptions">[docs]</a><span class="k">class</span> <span class="nc">AzureOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Azure Blob Storage options."""</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--azure_connection_string'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Connection string of the Azure Blob Storage Account.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--blob_service_endpoint'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'URL of the Azure Blob Storage Account.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--azure_managed_identity_client_id'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Client ID of a user-assigned managed identity.'</span><span class="p">)</span> |
| |
| <!-- Nested viewcode block for AzureOptions.validate: an error is appended only when both azure_connection_string and blob_service_endpoint are truthy, and the list of errors is returned. The two closing divs on the last line end this block and the class block. --><div class="viewcode-block" id="AzureOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.AzureOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span> |
| <span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">azure_connection_string</span><span class="p">:</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">blob_service_endpoint</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span> |
| <span class="s1">'--azure_connection_string and '</span> |
| <span class="s1">'--blob_service_endpoint are mutually exclusive.'</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">errors</span></div></div> |
| |
| |
| <!-- viewcode block for the HadoopFileSystemOptions class: the listed _add_argparse_args registers hdfs_host, hdfs_port and hdfs_user (each default None) and the hdfs_full_urls switch (store_true, default False). --><div class="viewcode-block" id="HadoopFileSystemOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.HadoopFileSystemOptions">[docs]</a><span class="k">class</span> <span class="nc">HadoopFileSystemOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""``HadoopFileSystem`` connection options."""</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--hdfs_host'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">'Hostname or address of the HDFS namenode.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--hdfs_port'</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">'Port of the HDFS namenode.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--hdfs_user'</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">'HDFS username to use.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--hdfs_full_urls'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'If set, URLs will be parsed as "hdfs://server/path/...", instead '</span> |
| <span class="s1">'of "hdfs://path/...". The "server" part will be unused (use '</span> |
| <span class="s1">'--hdfs_host and --hdfs_port).'</span><span class="p">))</span> |
| |
| <!-- Nested viewcode block for HadoopFileSystemOptions.validate; the two closing divs on the last line end this block and the class block. --><div class="viewcode-block" id="HadoopFileSystemOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.HadoopFileSystemOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span> |
| <span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <!-- NOTE(review): the name passed here is 'port', while the option registered above is hdfs_port; no option named port is declared in this class listing. This looks like a mismatch in the upstream module (this page only mirrors it), so the check may never see the configured port; confirm against validate_optional_argument_positive. --><span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_optional_argument_positive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'port'</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">errors</span></div></div> |
| |
| |
| <span class="c1"># Command line options controlling the worker pool configuration.</span> |
| <span class="c1"># TODO(silviuc): Update description when autoscaling options are in.</span> |
| <!-- viewcode block for the WorkerOptions class. The listed _add_argparse_args registers the worker pool flags (worker counts, autoscaling algorithm, machine and disk settings, region, zone and network placement, container images, SDK harness log levels, public IP switches, minimum CPU platform); a nested viewcode block for validate follows at the end. --><div class="viewcode-block" id="WorkerOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.WorkerOptions">[docs]</a><span class="k">class</span> <span class="nc">WorkerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Worker pool configuration options."""</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--num_workers'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Number of workers to use when executing the Dataflow job. If not '</span> |
| <span class="s1">'set, the Dataflow service will use a reasonable default.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--max_num_workers'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Maximum number of workers to use when executing the Dataflow job.'</span> |
| <span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--autoscaling_algorithm'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span> |
| <span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">'NONE'</span><span class="p">,</span> <span class="s1">'THROUGHPUT_BASED'</span><span class="p">],</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># Meaning unset, distinct from 'NONE' meaning don't scale</span> |
| <span class="n">help</span><span class="o">=</span> |
| <span class="p">(</span><span class="s1">'If and how to autoscale the workerpool.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--worker_machine_type'</span><span class="p">,</span> |
| <span class="s1">'--machine_type'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'machine_type'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Machine type to create Dataflow worker VMs as. See '</span> |
| <span class="s1">'https://cloud.google.com/compute/docs/machine-types '</span> |
| <span class="s1">'for a list of valid options. If not set, '</span> |
| <span class="s1">'the Dataflow service will choose a reasonable '</span> |
| <span class="s1">'default.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--disk_size_gb'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Remote worker disk size, in gigabytes, or 0 to use the default '</span> |
| <span class="s1">'size. If not set, the Dataflow service will use a reasonable '</span> |
| <span class="s1">'default.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--worker_disk_type'</span><span class="p">,</span> |
| <span class="s1">'--disk_type'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'disk_type'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">'Specifies what type of persistent disk should be used.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--worker_region'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'The Compute Engine region (https://cloud.google.com/compute/docs/'</span> |
| <span class="s1">'regions-zones/regions-zones) in which worker processing should '</span> |
| <span class="s1">'occur, e.g. "us-west1". Mutually exclusive with worker_zone. If '</span> |
| <span class="s1">'neither worker_region nor worker_zone is specified, default to '</span> |
| <span class="s1">'same value as --region.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--worker_zone'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'The Compute Engine zone (https://cloud.google.com/compute/docs/'</span> |
| <span class="s1">'regions-zones/regions-zones) in which worker processing should '</span> |
| <span class="s1">'occur, e.g. "us-west1-a". Mutually exclusive with worker_region. '</span> |
| <span class="s1">'If neither worker_region nor worker_zone is specified, the '</span> |
| <span class="s1">'Dataflow service will choose a zone in --region based on '</span> |
| <span class="s1">'available capacity.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--zone'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'GCE availability zone for launching workers. Default is up to the '</span> |
| <span class="s1">'Dataflow service. This flag is deprecated, and will be replaced '</span> |
| <span class="s1">'by worker_zone.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--network'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'GCE network for launching workers. Default is up to the Dataflow '</span> |
| <span class="s1">'service.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--subnetwork'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'GCE subnetwork for launching workers. Default is up to the '</span> |
| <span class="s1">'Dataflow service. Expected format is '</span> |
| <span class="s1">'regions/REGION/subnetworks/SUBNETWORK or the fully qualified '</span> |
| <span class="s1">'subnetwork name. For more information, see '</span> |
| <span class="s1">'https://cloud.google.com/compute/docs/vpc/'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--worker_harness_container_image'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Docker registry location of container image to use for the '</span> |
| <span class="s1">'worker harness. If not set, an appropriate approved Google Cloud '</span> |
| <span class="s1">'Dataflow image will be used based on the version of the '</span> |
| <span class="s1">'SDK. Note: This flag is deprecated and only supports '</span> |
| <span class="s1">'approved Google Cloud Dataflow container images. To provide a '</span> |
| <span class="s1">'custom container image, use sdk_container_image instead.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--sdk_container_image'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Docker registry location of container image to use for the '</span> |
| <span class="s1">'worker harness. If not set, an appropriate approved Google Cloud '</span> |
| <span class="s1">'Dataflow image will be used based on the version of the '</span> |
| <span class="s1">'SDK. If set for a non-portable pipeline, only official '</span> |
| <span class="s1">'Google Cloud Dataflow container images may be used here.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--sdk_harness_container_image_overrides'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Overrides for SDK harness container images. Could be for the '</span> |
| <span class="s1">'local SDK or for a remote SDK that pipeline has to support due '</span> |
| <span class="s1">'to a cross-language transform. Each entry consist of two values '</span> |
| <span class="s1">'separated by a comma where first value gives a regex to '</span> |
| <span class="s1">'identify the container image to override and the second value '</span> |
| <span class="s1">'gives the replacement container image.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--default_sdk_harness_log_level'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Controls the default log level of all loggers without a log level '</span> |
| <span class="s1">'override. Values can be either a labeled level or a number '</span> |
| <span class="s1">'(See https://docs.python.org/3/library/logging.html#levels). '</span> |
| <span class="s1">'Default log level is INFO.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--sdk_harness_log_level_overrides'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="n">_DictUnionAction</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Controls the log levels for specifically named loggers. The '</span> |
| <span class="s1">'expected format is a json string: </span><span class="se">\'</span><span class="s1">{"module":"log_level",...}</span><span class="se">\'</span><span class="s1">. '</span> |
| <span class="s1">'For example, by specifying the value </span><span class="se">\'</span><span class="s1">{"a.b.c":"DEBUG"}</span><span class="se">\'</span><span class="s1">, '</span> |
| <span class="s1">'the logger underneath the module "a.b.c" will be configured to '</span> |
| <span class="s1">'output logs at the DEBUG level. Similarly, by specifying the '</span> |
| <span class="s1">'value </span><span class="se">\'</span><span class="s1">{"a.b.c":"WARNING"}</span><span class="se">\'</span><span class="s1"> all loggers underneath the "a.b.c" '</span> |
| <span class="s1">'module will be configured to output logs at the WARNING level. '</span> |
| <span class="s1">'Also, note that when multiple overrides are specified, the exact '</span> |
| <span class="s1">'name followed by the closest parent takes precedence.'</span><span class="p">))</span> |
| <!-- The next two add_argument calls are a paired switch: use_public_ips (store_true) and no_use_public_ips (store_false) write the same dest, use_public_ips, and both declare default=None. --><span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--use_public_ips'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Whether to assign public IP addresses to the worker VMs.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--no_use_public_ips'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'use_public_ips'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_false'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Whether to assign only private IP addresses to the worker VMs.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--min_cpu_platform'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'min_cpu_platform'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'GCE minimum CPU platform. Default is determined by GCP.'</span><span class="p">)</span> |
| |
| <!-- Nested viewcode block for WorkerOptions.validate: it extends the error list with validate_sdk_container_image_options, then checks is_service_runner() before validate_num_workers and validate_worker_region_zone, and returns the list. NOTE(review): indentation is not preserved in this file, so whether validate_worker_region_zone sits inside the is_service_runner() branch cannot be confirmed from this listing. The two closing divs on the last line end this block and the class block. --><div class="viewcode-block" id="WorkerOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.WorkerOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span> |
| <span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_sdk_container_image_options</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> |
| |
| <span class="k">if</span> <span class="n">validator</span><span class="o">.</span><span class="n">is_service_runner</span><span class="p">():</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_num_workers</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_worker_region_zone</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">errors</span></div></div> |
| |
| |
| <div class="viewcode-block" id="DebugOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions">[docs]</a><span class="k">class</span> <span class="nc">DebugOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--dataflow_job_file'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Debug file to write the workflow specification.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--experiment'</span><span class="p">,</span> |
| <span class="s1">'--experiments'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'experiments'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Runners may provide a number of experimental features that can be '</span> |
| <span class="s1">'enabled with this flag. Please sync with the owners of the runner '</span> |
| <span class="s1">'before enabling any experiments.'</span><span class="p">))</span> |
| |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--number_of_worker_harness_threads'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Number of threads per worker to use on the runner. If left '</span> |
| <span class="s1">'unspecified, the runner will compute an appropriate number of '</span> |
| <span class="s1">'threads to use. Currently only enabled for DataflowRunner when '</span> |
| <span class="s1">'experiment </span><span class="se">\'</span><span class="s1">use_runner_v2</span><span class="se">\'</span><span class="s1"> is enabled.'</span><span class="p">))</span> |
| |
| <div class="viewcode-block" id="DebugOptions.add_experiment"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions.add_experiment">[docs]</a> <span class="k">def</span> <span class="nf">add_experiment</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">experiment</span><span class="p">):</span> |
| <span class="c1"># pylint: disable=access-member-before-definition</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="n">experiment</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">experiment</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="DebugOptions.lookup_experiment"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions.lookup_experiment">[docs]</a> <span class="k">def</span> <span class="nf">lookup_experiment</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">default</span> |
| <span class="k">elif</span> <span class="n">key</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">True</span> |
| <span class="k">for</span> <span class="n">experiment</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">experiment</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">key</span> <span class="o">+</span> <span class="s1">'='</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">experiment</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'='</span><span class="p">,</span> <span class="mi">1</span><span class="p">)[</span><span class="mi">1</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">default</span></div> |
| |
| <div class="viewcode-block" id="DebugOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span> |
| <span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span> |
| <span class="n">validator</span><span class="o">.</span><span class="n">validate_repeatable_argument_passed_as_list</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="s1">'experiments'</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">errors</span></div></div> |
| |
| |
| <div class="viewcode-block" id="ProfilingOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.ProfilingOptions">[docs]</a><span class="k">class</span> <span class="nc">ProfilingOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--profile_cpu'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Enable work item CPU profiling.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--profile_memory'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Enable work item heap profiling.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--profile_location'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'path for saving profiler data.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--profile_sample_rate'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">float</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mf">1.0</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'A number between 0 and 1 indicating the ratio '</span> |
| <span class="s1">'of bundles that should be profiled.'</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="SetupOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.SetupOptions">[docs]</a><span class="k">class</span> <span class="nc">SetupOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="c1"># Options for installing dependencies in the worker.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--requirements_file'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Path to a requirements file containing package dependencies. '</span> |
| <span class="s1">'Typically it is produced by a pip freeze command. More details: '</span> |
| <span class="s1">'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '</span> |
| <span class="s1">'If used, all the packages specified will be downloaded, '</span> |
| <span class="s1">'cached (use --requirements_cache to change default location), '</span> |
| <span class="s1">'and then staged so that they can be automatically installed in '</span> |
| <span class="s1">'workers during startup. The cache is refreshed as needed '</span> |
| <span class="s1">'avoiding extra downloads for existing packages. Typically the '</span> |
| <span class="s1">'file is named requirements.txt.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--requirements_cache'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Path to a folder to cache the packages specified in '</span> |
| <span class="s1">'the requirements file using the --requirements_file option. '</span> |
| <span class="s1">'If you want to skip populating requirements cache, please '</span> |
| <span class="s1">'specify --requirements_cache="skip".'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--requirements_cache_only_sources'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Enable this flag to populate requirements cache only '</span> |
| <span class="s1">'with Source distributions(sdists) of the dependencies '</span> |
| <span class="s1">'mentioned in the --requirements_file. '</span> |
| <span class="s1">'Note: (BEAM-4032): This flag may significantly slow down '</span> |
| <span class="s1">'the pipeline submission. It is added to preserve the requirements'</span> |
| <span class="s1">' cache behavior prior to 2.37.0 and will likely be removed in '</span> |
| <span class="s1">'future releases.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--setup_file'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Path to a setup Python file containing package dependencies. If '</span> |
| <span class="s1">'specified, the file</span><span class="se">\'</span><span class="s1">s containing folder is assumed to have the '</span> |
| <span class="s1">'structure required for a setuptools setup package. The file must '</span> |
| <span class="s1">'be named setup.py. More details: '</span> |
| <span class="s1">'https://pythonhosted.org/an_example_pypi_project/setuptools.html '</span> |
| <span class="s1">'During job submission a source distribution will be built and '</span> |
| <span class="s1">'the worker will install the resulting package before running any '</span> |
| <span class="s1">'custom code.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--beam_plugin'</span><span class="p">,</span> |
| <span class="s1">'--beam_plugins'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'beam_plugins'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Bootstrap the python process before executing any code by '</span> |
| <span class="s1">'importing all the plugins used in the pipeline. Please pass a '</span> |
| <span class="s1">'comma separated list of import paths to be included. This is '</span> |
| <span class="s1">'currently an experimental flag and provides no stability. '</span> |
| <span class="s1">'Multiple --beam_plugin options can be specified if more than '</span> |
| <span class="s1">'one plugin is needed.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--pickle_library'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">'default'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Chooses which pickle library to use. Options are dill, '</span> |
| <span class="s1">'cloudpickle or default.'</span><span class="p">),</span> |
| <span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">'cloudpickle'</span><span class="p">,</span> <span class="s1">'default'</span><span class="p">,</span> <span class="s1">'dill'</span><span class="p">])</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--save_main_session'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Save the main session state so that pickled functions and classes '</span> |
| <span class="s1">'defined in __main__ (e.g. interactive session) can be unpickled. '</span> |
| <span class="s1">'Some workflows do not need the session state if for instance all '</span> |
| <span class="s1">'their functions/classes are defined in proper modules '</span> |
| <span class="s1">'(not __main__) and the modules are importable in the worker. '</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--sdk_location'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">'default'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Override the default location from where the Beam SDK is '</span> |
| <span class="s1">'downloaded. It can be a URL, a GCS path, or a local path to an '</span> |
| <span class="s1">'SDK tarball. Workflow submissions will download or copy an SDK '</span> |
| <span class="s1">'tarball from here. If set to the string "default", a standard '</span> |
| <span class="s1">'SDK location is used. If empty, no SDK is copied.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--extra_package'</span><span class="p">,</span> |
| <span class="s1">'--extra_packages'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'extra_packages'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Local path to a Python package file. The file is expected to be '</span> |
| <span class="s1">'(1) a package tarball (".tar"), (2) a compressed package tarball '</span> |
| <span class="s1">'(".tar.gz"), (3) a Wheel file (".whl") or (4) a compressed '</span> |
| <span class="s1">'package zip file (".zip") which can be installed using the '</span> |
| <span class="s1">'"pip install" command of the standard pip package. Multiple '</span> |
| <span class="s1">'--extra_package options can be specified if more than one '</span> |
| <span class="s1">'package is needed. During job submission, the files will be '</span> |
| <span class="s1">'staged in the staging area (--staging_location option) and the '</span> |
| <span class="s1">'workers will install them in same order they were specified on '</span> |
| <span class="s1">'the command line.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--prebuild_sdk_container_engine'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Prebuild sdk worker container image before job submission. If '</span> |
| <span class="s1">'enabled, SDK invokes the boot sequence in SDK worker '</span> |
| <span class="s1">'containers to install all pipeline dependencies in the '</span> |
| <span class="s1">'container, and uses the prebuilt image in the pipeline '</span> |
| <span class="s1">'environment. This may speed up pipeline execution. To enable, '</span> |
| <span class="s1">'select the Docker build engine: local_docker using '</span> |
| <span class="s1">'locally-installed Docker or cloud_build for using Google Cloud '</span> |
| <span class="s1">'Build (requires a GCP project with Cloud Build API enabled). You '</span> |
| <span class="s1">'can also subclass SdkContainerImageBuilder and use that to build '</span> |
| <span class="s1">'in other environments.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--prebuild_sdk_container_base_image'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">'Deprecated. Use --sdk_container_image instead.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--cloud_build_machine_type'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'If specified, use the machine type explicitly when prebuilding '</span> |
| <span class="s1">'SDK container image on Google Cloud Build.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--docker_registry_push_url'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Docker registry url to use for tagging and pushing the prebuilt '</span> |
| <span class="s1">'sdk worker container image.'</span><span class="p">))</span> |
| |
| <div class="viewcode-block" id="SetupOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.SetupOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span> |
| <span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_container_prebuilding_options</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">errors</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">PortableOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Portable options are common options expected to be understood by most of</span> |
| <span class="sd"> the portable runners. Should generally be kept in sync with</span> |
| <span class="sd"> PortablePipelineOptions.java.</span> |
| <span class="sd"> """</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--job_endpoint'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Job service endpoint to use. Should be in the form of host '</span> |
| <span class="s1">'and port, e.g. localhost:8099.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--artifact_endpoint'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Artifact staging endpoint to use. Should be in the form of host '</span> |
| <span class="s1">'and port, e.g. localhost:8098. If none is specified, the '</span> |
| <span class="s1">'artifact endpoint sent from the job server is used.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--job_server_timeout'</span><span class="p">,</span> |
| <span class="s1">'--job-server-timeout'</span><span class="p">,</span> <span class="c1"># For backwards compatibility.</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">60</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Job service request timeout in seconds. The timeout '</span> |
| <span class="s1">'determines the max time the driver program will wait to '</span> |
| <span class="s1">'get a response from the job server. NOTE: the timeout does not '</span> |
| <span class="s1">'apply to the actual pipeline run time. The driver program can '</span> |
| <span class="s1">'still wait for job completion indefinitely.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--environment_type'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Set the default environment type for running '</span> |
| <span class="s1">'user code. DOCKER (default) runs user code in a container. '</span> |
| <span class="s1">'PROCESS runs user code in processes that are automatically '</span> |
| <span class="s1">'started on each worker node. LOOPBACK runs user code on the '</span> |
| <span class="s1">'same process that originally submitted the job.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--environment_config'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Set environment configuration for running the user code.</span><span class="se">\n</span><span class="s1"> For '</span> |
| <span class="s1">'DOCKER: Url for the docker image.</span><span class="se">\n</span><span class="s1"> For PROCESS: json of the '</span> |
| <span class="s1">'form {"os": "&lt;OS&gt;", "arch": "&lt;ARCHITECTURE&gt;", "command": '</span> |
| <span class="s1">'"&lt;process to execute&gt;", "env":{"&lt;Environment variables 1&gt;": '</span> |
| <span class="s1">'"&lt;ENV_VAL&gt;"} }. All fields in the json are optional except '</span> |
| <span class="s1">'command.</span><span class="se">\n\n</span><span class="s1">Prefer using --environment_options instead.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--environment_option'</span><span class="p">,</span> |
| <span class="s1">'--environment_options'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'environment_options'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Environment configuration for running the user code. '</span> |
| <span class="s1">'Recognized options depend on --environment_type.</span><span class="se">\n</span><span class="s1"> '</span> |
| <span class="s1">'For DOCKER: docker_container_image (optional)</span><span class="se">\n</span><span class="s1"> '</span> |
| <span class="s1">'For PROCESS: process_command (required), process_variables '</span> |
| <span class="s1">'(optional, comma-separated)</span><span class="se">\n</span><span class="s1"> '</span> |
| <span class="s1">'For EXTERNAL: external_service_address (required)'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--sdk_worker_parallelism'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Sets the number of sdk worker processes that will run on each '</span> |
| <span class="s1">'worker node. Default is 1. If 0, a value will be chosen by the '</span> |
| <span class="s1">'runner.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--environment_cache_millis'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Duration in milliseconds for environment cache within a job. '</span> |
| <span class="s1">'0 means no caching.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--output_executable_path'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Create an executable jar at this path rather than running '</span> |
| <span class="s1">'the pipeline.'</span><span class="p">))</span> |
| |
| <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">validator</span><span class="o">.</span><span class="n">validate_environment_options</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">add_environment_option</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">option</span><span class="p">):</span> |
| <span class="c1"># pylint: disable=access-member-before-definition</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="n">option</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">option</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">lookup_environment_option</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">default</span> |
| <span class="k">elif</span> <span class="n">key</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">True</span> |
| <span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">option</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">key</span> <span class="o">+</span> <span class="s1">'='</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">option</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'='</span><span class="p">,</span> <span class="mi">1</span><span class="p">)[</span><span class="mi">1</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">default</span> |
| |
| |
| <span class="k">class</span> <span class="nc">JobServerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Options for starting a Beam job server. Roughly corresponds to</span> |
| <span class="sd"> JobServerDriver.ServerConfiguration in Java.</span> |
| <span class="sd"> """</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--artifacts_dir'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The location to store staged artifact files. '</span> |
| <span class="s1">'Any Beam-supported file system is allowed. '</span> |
| <span class="s1">'If unset, the local temp dir will be used.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--job_port'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Port to use for the job service. 0 to use a '</span> |
| <span class="s1">'dynamic port.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--artifact_port'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Port to use for artifact staging. 0 to use a '</span> |
| <span class="s1">'dynamic port.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--expansion_port'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Port to use for artifact staging. 0 to use a '</span> |
| <span class="s1">'dynamic port.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--job_server_java_launcher'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">'java'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The Java Application Launcher executable file to use for '</span> |
| <span class="s1">'starting a Java job server. If unset, `java` from the '</span> |
| <span class="s1">'environment</span><span class="se">\'</span><span class="s1">s $PATH is used.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--job_server_jvm_properties'</span><span class="p">,</span> |
| <span class="s1">'--job_server_jvm_property'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'job_server_jvm_properties'</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'append'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="p">[],</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'JVM properties to pass to a Java job server.'</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span> <span class="nc">FlinkRunnerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| |
| <span class="c1"># These should stay in sync with gradle.properties.</span> |
| <span class="n">PUBLISHED_FLINK_VERSIONS</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'1.12'</span><span class="p">,</span> <span class="s1">'1.13'</span><span class="p">,</span> <span class="s1">'1.14'</span><span class="p">,</span> <span class="s1">'1.15'</span><span class="p">,</span> <span class="s1">'1.16'</span><span class="p">]</span> |
| |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--flink_master'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">'[auto]'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Flink master address (http://host:port)'</span> |
| <span class="s1">' Use "[local]" to start a local cluster'</span> |
| <span class="s1">' for the execution. Use "[auto]" if you'</span> |
| <span class="s1">' plan to either execute locally or let the'</span> |
| <span class="s1">' Flink job server infer the cluster address.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--flink_version'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">PUBLISHED_FLINK_VERSIONS</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">],</span> |
| <span class="n">choices</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">PUBLISHED_FLINK_VERSIONS</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Flink version to use.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--flink_job_server_jar'</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="s1">'Path or URL to a flink jobserver jar.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--flink_submit_uber_jar'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Create and upload an uberjar to the flink master'</span> |
| <span class="s1">' directly, rather than starting up a job server.'</span> |
| <span class="s1">' Only applies when flink_master is set to a'</span> |
| <span class="s1">' cluster address. Requires Python 3.6+.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--parallelism'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=-</span><span class="mi">1</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The degree of parallelism to be used when distributing'</span> |
| <span class="s1">' operations onto workers. If the parallelism is not set, the'</span> |
| <span class="s1">' configured Flink default is used, or 1 if none can be found.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--max_parallelism'</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=-</span><span class="mi">1</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The pipeline wide maximum degree of parallelism to be used. The'</span> |
| <span class="s1">' maximum parallelism specifies the upper limit for dynamic scaling'</span> |
| <span class="s1">' and the number of key groups used for partitioned state.'</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span> <span class="nc">SparkRunnerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--spark_master_url'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">'local[4]'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Spark master URL (spark://HOST:PORT). '</span> |
| <span class="s1">'Use "local" (single-threaded) or "local[*]" '</span> |
| <span class="s1">'(multi-threaded) to start a local cluster for '</span> |
| <span class="s1">'the execution.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--spark_job_server_jar'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Path or URL to a Beam Spark job server jar. '</span> |
| <span class="s1">'Overrides --spark_version.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--spark_submit_uber_jar'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Create and upload an uber jar to the Spark REST'</span> |
| <span class="s1">' endpoint, rather than starting up a job server.'</span> |
| <span class="s1">' Requires Python 3.6+.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--spark_rest_url'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'URL for the Spark REST endpoint. '</span> |
| <span class="s1">'Only required when using spark_submit_uber_jar. '</span> |
| <span class="s1">'For example, http://hostname:6066'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--spark_version'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="s1">'3'</span><span class="p">,</span> |
| <span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">'3'</span><span class="p">],</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Spark major version to use.'</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="TestOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TestOptions">[docs]</a><span class="k">class</span> <span class="nc">TestOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="c1"># Options for e2e test pipeline.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--on_success_matcher'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Verify state/output of e2e test pipeline. This is pickled '</span> |
| <span class="s1">'version of the matcher which should extends '</span> |
| <span class="s1">'hamcrest.core.base_matcher.BaseMatcher.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--dry_run'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Used in unit testing runners without submitting the '</span> |
| <span class="s1">'actual job.'</span><span class="p">))</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--wait_until_finish_duration'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The time to wait (in milliseconds) for test pipeline to finish. '</span> |
| <span class="s1">'If it is set to None, it will wait indefinitely until the job '</span> |
| <span class="s1">'is finished.'</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="TestOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TestOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span> |
| <span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TestOptions</span><span class="p">)</span><span class="o">.</span><span class="n">on_success_matcher</span><span class="p">:</span> |
| <span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_test_matcher</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'on_success_matcher'</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">errors</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">TestDataflowOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="c1"># This option is passed to Dataflow Runner's Pub/Sub client. The camelCase</span> |
| <span class="c1"># style in 'dest' matches the runner's.</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--pubsub_root_url'</span><span class="p">,</span> |
| <span class="n">dest</span><span class="o">=</span><span class="s1">'pubsubRootUrl'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Root URL for use with the Google Cloud Pub/Sub API.'</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| |
| <span class="c1"># TODO(silviuc): Add --files_to_stage option.</span> |
| <span class="c1"># This could potentially replace the --requirements_file and --setup_file.</span> |
| |
| <span class="c1"># TODO(silviuc): Non-standard options. Keep them? If yes, add help too!</span> |
| <span class="c1"># Remote execution must check that this option is not None.</span> |
| |
| |
| <span class="k">class</span> <span class="nc">OptionsContext</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Set default pipeline options for pipelines created in this block.</span> |
| |
| <span class="sd"> This is particularly useful for pipelines implicitly created with the</span> |
| |
| <span class="sd"> [python list] | PTransform</span> |
| |
| <span class="sd"> construct.</span> |
| |
| <span class="sd"> Can also be used as a decorator.</span> |
| <span class="sd"> """</span> |
| <span class="n">overrides</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: List[Dict[str, Any]]</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">options</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span> |
| |
| <span class="k">def</span> <span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">overrides</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">exn_info</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">overrides</span><span class="o">.</span><span class="n">pop</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">wrapper</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="k">with</span> <span class="bp">self</span><span class="p">:</span> |
| <span class="n">f</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">wrapper</span> |
| |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">augment_options</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">override</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="n">overrides</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">override</span><span class="o">.</span><span class="n">items</span><span class="p">():</span> |
| <span class="nb">setattr</span><span class="p">(</span><span class="n">options</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">options</span> |
| |
| |
| <div class="viewcode-block" id="S3Options"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.S3Options">[docs]</a><span class="k">class</span> <span class="nc">S3Options</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span> |
| <span class="nd">@classmethod</span> |
| <span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span> |
| <span class="c1"># These options are passed to the S3 IO Client</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--s3_access_key_id'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The secret key to use when creating the s3 client.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--s3_secret_access_key'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The secret key to use when creating the s3 client.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--s3_session_token'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The session token to use when creating the s3 client.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--s3_endpoint_url'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The complete URL to use for the constructed s3 client.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--s3_region_name'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The name of the region associated with the s3 client.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--s3_api_version'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'The API version to use with the s3 client.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--s3_verify'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s1">'Whether or not to verify SSL certificates with the s3 client.'</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span> |
| <span class="s1">'--s3_disable_ssl'</span><span class="p">,</span> |
| <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">action</span><span class="o">=</span><span class="s1">'store_true'</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="p">(</span> |
| <span class="s1">'Whether or not to use SSL with the s3 client. '</span> |
| <span class="s1">'By default, SSL is used.'</span><span class="p">))</span></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| |
| |
| <hr/> |
| |
| <!-- Copyright notice emitted by the theme. --> |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| <!-- Build credits; all external links use HTTPS. --> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script> |
| // Enable the Sphinx RTD theme navigation once the DOM is ready. |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |