<!-- blob: c75c2945397eb286f55ab7282936d2f9d4e5c431 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <!-- Document metadata. -->
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.options.pipeline_options &mdash; Apache Beam 2.47.0 documentation</title>

  <!-- Scripts, kept in their original order; only the MathJax loader is async. -->
  <script src="../../../_static/js/modernizr.min.js"></script>
  <script id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
  <script src="../../../_static/jquery.js"></script>
  <script src="../../../_static/underscore.js"></script>
  <script src="../../../_static/doctools.js"></script>
  <script src="../../../_static/language_data.js"></script>
  <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
  <script src="../../../_static/js/theme.js"></script>

  <!-- Stylesheets: theme first, then syntax-highlighting colours. -->
  <link rel="stylesheet" href="../../../_static/css/theme.css">
  <link rel="stylesheet" href="../../../_static/pygments.css">

  <!-- Related documents. -->
  <link rel="index" title="Index" href="../../../genindex.html">
  <link rel="search" title="Search" href="../../../search.html">
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
  <div class="wy-side-scroll">
    <!-- Sidebar header: home link, documentation version and the search form. -->
    <div class="wy-side-nav-search">
      <a href="../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div class="version">
        2.47.0
      </div>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
          <!--
            The placeholder is only a visual hint, so aria-label supplies the
            accessible name for this field.
            NOTE(review): type="text" is kept as-is because the theme
            stylesheet may select on it; confirm before switching to
            type="search".
          -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <!-- Table of contents: packages first, then top-level modules. -->
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
      </ul>
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- Menu icon followed by the project home link. NOTE(review): the data-toggle hook is presumably bound by the theme script; confirm there. -->
  <i class="fa fa-bars" data-toggle="wy-nav-top"></i>
  <a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
  <!-- Breadcrumb trail: Docs, then Module code, then the current module (plain text, not a link). -->
  <ul class="wy-breadcrumbs">
    <li><a href="../../../index.html">Docs</a> &raquo;</li>
    <li><a href="../../index.html">Module code</a> &raquo;</li>
    <li>apache_beam.options.pipeline_options</li>
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.options.pipeline_options</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Pipeline options obtained from command line parsing.&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span> <span class="nn">argparse</span>
<span class="kn">import</span> <span class="nn">json</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Callable</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">List</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Optional</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Type</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">TypeVar</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="kn">import</span> <span class="n">RuntimeValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="kn">import</span> <span class="n">StaticValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="kn">import</span> <span class="n">ValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="kn">import</span> <span class="n">HasDisplayData</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;PipelineOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;StandardOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;TypeOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;DirectOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;GoogleCloudOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;AzureOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;HadoopFileSystemOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;WorkerOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;DebugOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;ProfilingOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;SetupOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;TestOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;S3Options&#39;</span>
<span class="p">]</span>
<span class="n">PipelineOptionsT</span> <span class="o">=</span> <span class="n">TypeVar</span><span class="p">(</span><span class="s1">&#39;PipelineOptionsT&#39;</span><span class="p">,</span> <span class="n">bound</span><span class="o">=</span><span class="s1">&#39;PipelineOptions&#39;</span><span class="p">)</span>
<span class="n">_LOGGER</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
<span class="c1"># Map defined with option names to flag names for boolean options</span>
<span class="c1"># that have a destination(dest) in parser.add_argument() different</span>
<span class="c1"># from the flag name and whose default value is `None`.</span>
<span class="n">_FLAG_THAT_SETS_FALSE_VALUE</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;use_public_ips&#39;</span><span class="p">:</span> <span class="s1">&#39;no_use_public_ips&#39;</span><span class="p">}</span>
<span class="k">def</span> <span class="nf">_static_value_provider_of</span><span class="p">(</span><span class="n">value_type</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;&quot;Helper function to plug a ValueProvider into argparse.</span>
<span class="sd"> Args:</span>
<span class="sd"> value_type: the type of the value. Since the type param of argparse&#39;s</span>
<span class="sd"> add_argument will always be ValueProvider, we need to</span>
<span class="sd"> preserve the type of the actual value.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A partially constructed StaticValueProvider in the form of a function.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">_f</span><span class="p">(</span><span class="n">value</span><span class="p">):</span>
<span class="n">_f</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="n">value_type</span><span class="o">.</span><span class="vm">__name__</span>
<span class="k">return</span> <span class="n">StaticValueProvider</span><span class="p">(</span><span class="n">value_type</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_f</span>
<span class="k">class</span> <span class="nc">_BeamArgumentParser</span><span class="p">(</span><span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;An ArgumentParser that supports ValueProvider options.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> class TemplateUserOptions(PipelineOptions):</span>
<span class="sd"> @classmethod</span>
<span class="sd"> def _add_argparse_args(cls, parser):</span>
<span class="sd"> parser.add_value_provider_argument(&#39;--vp_arg1&#39;, default=&#39;start&#39;)</span>
<span class="sd"> parser.add_value_provider_argument(&#39;--vp_arg2&#39;)</span>
<span class="sd"> parser.add_argument(&#39;--non_vp_arg&#39;)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">add_value_provider_argument</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;ValueProvider arguments can be either of type keyword or positional.</span>
<span class="sd"> At runtime, even positional arguments will need to be supplied in the</span>
<span class="sd"> key/value form.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Extract the option name from positional argument [&#39;pos_arg&#39;]</span>
<span class="k">assert</span> <span class="n">args</span> <span class="o">!=</span> <span class="p">()</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> <span class="o">&gt;=</span> <span class="mi">1</span>
<span class="k">if</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="s1">&#39;-&#39;</span><span class="p">:</span>
<span class="n">option_name</span> <span class="o">=</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;nargs&#39;</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> <span class="c1"># make them optionally templated</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;nargs&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="s1">&#39;?&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]</span>
<span class="n">option_name</span> <span class="o">=</span> <span class="p">[</span><span class="n">i</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;--&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="n">args</span> <span class="k">if</span> <span class="n">i</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;--&#39;</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># reassign the type to make room for using</span>
<span class="c1"># StaticValueProvider as the type for add_argument</span>
<span class="n">value_type</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;type&#39;</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">str</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">_static_value_provider_of</span><span class="p">(</span><span class="n">value_type</span><span class="p">)</span>
<span class="c1"># reassign default to default_value to make room for using</span>
<span class="c1"># RuntimeValueProvider as the default for add_argument</span>
<span class="n">default_value</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;default&#39;</span><span class="p">)</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;default&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">RuntimeValueProvider</span><span class="p">(</span>
<span class="n">option_name</span><span class="o">=</span><span class="n">option_name</span><span class="p">,</span>
<span class="n">value_type</span><span class="o">=</span><span class="n">value_type</span><span class="p">,</span>
<span class="n">default_value</span><span class="o">=</span><span class="n">default_value</span><span class="p">)</span>
<span class="c1"># have add_argument do most of the work</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="c1"># The argparse package by default tries to autocomplete option names. This</span>
<span class="c1"># results in an &quot;ambiguous option&quot; error from argparse when an unknown option</span>
<span class="c1"># matching multiple known ones are used. This suppresses that behavior.</span>
<span class="k">def</span> <span class="nf">error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">message</span><span class="p">):</span>
<span class="k">if</span> <span class="n">message</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;ambiguous option: &#39;</span><span class="p">):</span>
<span class="k">return</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">_DictUnionAction</span><span class="p">(</span><span class="n">argparse</span><span class="o">.</span><span class="n">Action</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> argparse Action take union of json loads values. If a key is specified in more</span>
<span class="sd"> than one of the values, the last value takes precedence.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">parser</span><span class="p">,</span> <span class="n">namespace</span><span class="p">,</span> <span class="n">values</span><span class="p">,</span> <span class="n">option_string</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dest</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dest</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dest</span><span class="p">,</span> <span class="p">{})</span>
<span class="nb">getattr</span><span class="p">(</span><span class="n">namespace</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dest</span><span class="p">)</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">values</span><span class="p">)</span>
<div class="viewcode-block" id="PipelineOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions">[docs]</a><span class="k">class</span> <span class="nc">PipelineOptions</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;This class and subclasses are used as containers for command line options.</span>
<span class="sd"> These classes are wrappers over the standard argparse Python module</span>
<span class="sd"> (see https://docs.python.org/3/library/argparse.html). To define one option</span>
<span class="sd"> or a group of options, create a subclass from PipelineOptions.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> class XyzOptions(PipelineOptions):</span>
<span class="sd"> @classmethod</span>
<span class="sd"> def _add_argparse_args(cls, parser):</span>
<span class="sd"> parser.add_argument(&#39;--abc&#39;, default=&#39;start&#39;)</span>
<span class="sd"> parser.add_argument(&#39;--xyz&#39;, default=&#39;end&#39;)</span>
<span class="sd"> The arguments for the add_argument() method are exactly the ones</span>
<span class="sd"> described in the argparse public documentation.</span>
<span class="sd"> Pipeline objects require an options object during initialization.</span>
<span class="sd"> This is obtained simply by initializing an options class as defined above.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> p = Pipeline(options=XyzOptions())</span>
<span class="sd"> if p.options.xyz == &#39;end&#39;:</span>
<span class="sd"> raise ValueError(&#39;Option xyz has an invalid value.&#39;)</span>
<span class="sd"> Instances of PipelineOptions or any of its subclass have access to values</span>
<span class="sd"> defined by other PipelineOption subclasses (see get_all_options()), and</span>
<span class="sd"> can be converted to an instance of another PipelineOptions subclass</span>
<span class="sd"> (see view_as()). All views share the underlying data structure that stores</span>
<span class="sd"> option key-value pairs.</span>
<span class="sd"> By default the options classes will use command line arguments to initialize</span>
<span class="sd"> the options.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">flags</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="c1"># type: (Optional[List[str]], **Any) -&gt; None</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Initialize an options class.</span>
<span class="sd"> The initializer will traverse all subclasses, add all their argparse</span>
<span class="sd"> arguments and then parse the command line specified by flags or by default</span>
<span class="sd"> the one obtained from sys.argv.</span>
<span class="sd"> The subclasses of PipelineOptions do not need to redefine __init__.</span>
<span class="sd"> Args:</span>
<span class="sd"> flags: An iterable of command line arguments to be used. If not specified</span>
<span class="sd"> then sys.argv will be used as input for parsing arguments.</span>
<span class="sd"> **kwargs: Add overrides for arguments passed in flags. For overrides</span>
<span class="sd"> of arguments, please pass the `option names` instead of</span>
<span class="sd"> flag names.</span>
<span class="sd"> Option names: These are defined as dest in the</span>
<span class="sd"> parser.add_argument() for each flag. Passing flags</span>
<span class="sd"> like {no_use_public_ips: True}, for which the dest is</span>
<span class="sd"> defined to a different flag name in the parser,</span>
<span class="sd"> would be discarded. Instead, pass the dest of</span>
<span class="sd"> the flag (dest of no_use_public_ips is use_public_ips).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Initializing logging configuration in case the user did not set it up.</span>
<span class="n">logging</span><span class="o">.</span><span class="n">basicConfig</span><span class="p">()</span>
<span class="c1"># self._flags stores a list of not yet parsed arguments, typically,</span>
<span class="c1"># command-line flags. This list is shared across different views.</span>
<span class="c1"># See: view_as().</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_flags</span> <span class="o">=</span> <span class="n">flags</span>
<span class="c1"># Build parser that will parse options recognized by the [sub]class of</span>
<span class="c1"># PipelineOptions whose object is being instantiated.</span>
<span class="n">parser</span> <span class="o">=</span> <span class="n">_BeamArgumentParser</span><span class="p">()</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">mro</span><span class="p">():</span>
<span class="k">if</span> <span class="bp">cls</span> <span class="o">==</span> <span class="n">PipelineOptions</span><span class="p">:</span>
<span class="k">break</span>
<span class="k">elif</span> <span class="s1">&#39;_add_argparse_args&#39;</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">:</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span> <span class="c1"># type: ignore</span>
<span class="c1"># The _visible_options attribute will contain options that were recognized</span>
<span class="c1"># by the parser.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="n">flags</span><span class="p">)</span>
<span class="c1"># self._all_options is initialized with overrides to flag values,</span>
<span class="c1"># provided in kwargs, and will store key-value pairs for options recognized</span>
<span class="c1"># by current PipelineOptions [sub]class and its views that may be created.</span>
<span class="c1"># See: view_as().</span>
<span class="c1"># This dictionary is shared across different views, and is lazily updated</span>
<span class="c1"># as each new views are created.</span>
<span class="c1"># Users access this dictionary store via __getattr__ / __setattr__ methods.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="c1"># Initialize values of keys defined by this class.</span>
<span class="k">for</span> <span class="n">option_name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="c1"># Note that options specified in kwargs will not be overwritten.</span>
<span class="k">if</span> <span class="n">option_name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">option_name</span><span class="p">]</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">option_name</span><span class="p">)</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># type: (_BeamArgumentParser) -&gt; None</span>
<span class="c1"># Override this in subclasses to provide options.</span>
<span class="k">pass</span>
<div class="viewcode-block" id="PipelineOptions.from_dictionary"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.from_dictionary">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">from_dictionary</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a PipelineOptions from a dictionary of arguments.</span>
<span class="sd"> Args:</span>
<span class="sd"> options: Dictionary of argument value pairs.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A PipelineOptions object representing the given arguments.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">flags</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">options</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="c1"># Note: If a boolean flag is True in the dictionary,</span>
<span class="c1"># implicitly the method assumes the boolean flag is</span>
<span class="c1"># specified as a command line argument. If the</span>
<span class="c1"># boolean flag is False, this method simply discards them.</span>
<span class="c1"># Eg: {no_auth: True} is similar to python your_file.py --no_auth</span>
<span class="c1"># {no_auth: False} is similar to python your_file.py.</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">bool</span><span class="p">):</span>
<span class="k">if</span> <span class="n">v</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">k</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">_FLAG_THAT_SETS_FALSE_VALUE</span><span class="p">:</span>
<span class="c1"># Capture overriding flags, which have a different dest</span>
<span class="c1"># from the flag name defined in the parser.add_argument</span>
<span class="c1"># Eg: no_use_public_ips, which has the dest=use_public_ips</span>
<span class="c1"># different from flag name</span>
<span class="n">flag_that_disables_the_option</span> <span class="o">=</span> <span class="p">(</span><span class="n">_FLAG_THAT_SETS_FALSE_VALUE</span><span class="p">[</span><span class="n">k</span><span class="p">])</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">flag_that_disables_the_option</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="n">v</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">i</span><span class="p">))</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">v</span><span class="p">)))</span>
<span class="k">elif</span> <span class="n">v</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># Don&#39;t process None type args here, they will be treated</span>
<span class="c1"># as strings when parsed by BeamArgumentParser.</span>
<span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">&#39;Not setting flag with value None: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">k</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">cls</span><span class="p">(</span><span class="n">flags</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineOptions.get_all_options"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.get_all_options">[docs]</a> <span class="k">def</span> <span class="nf">get_all_options</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">drop_default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">add_extra_args_fn</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable[[_BeamArgumentParser], None]]</span>
<span class="n">retain_unknown_options</span><span class="o">=</span><span class="kc">False</span>
<span class="p">):</span>
<span class="c1"># type: (...) -&gt; Dict[str, Any]</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a dictionary of all defined arguments.</span>
<span class="sd"> Collects all defined arguments (arguments that are defined in</span>
<span class="sd"> any subclass of PipelineOptions) into a dictionary.</span>
<span class="sd"> Args:</span>
<span class="sd"> drop_default: If set to true, options that are equal to their default</span>
<span class="sd"> values are not returned as part of the result dictionary.</span>
<span class="sd"> add_extra_args_fn: Callback to populate additional arguments, can be used</span>
<span class="sd"> by runner to supply otherwise unknown args.</span>
<span class="sd"> retain_unknown_options: If set to true, options not recognized by any</span>
<span class="sd"> known pipeline options class will still be included in the result. If</span>
<span class="sd"> set to false, they will be discarded.</span>
<span class="sd"> Returns:</span>
<span class="sd"> Dictionary of all args and values.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># TODO(https://github.com/apache/beam/issues/18197): PipelineOption</span>
<span class="c1"># sub-classes in the main session might be repeated. Pick last unique</span>
<span class="c1"># instance of each subclass to avoid conflicts.</span>
<span class="n">subset</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">parser</span> <span class="o">=</span> <span class="n">_BeamArgumentParser</span><span class="p">()</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">PipelineOptions</span><span class="o">.</span><span class="n">__subclasses__</span><span class="p">():</span>
<span class="n">subset</span><span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="bp">cls</span><span class="p">)]</span> <span class="o">=</span> <span class="bp">cls</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">subset</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span> <span class="c1"># pylint: disable=protected-access</span>
<span class="k">if</span> <span class="n">add_extra_args_fn</span><span class="p">:</span>
<span class="n">add_extra_args_fn</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span>
<span class="n">known_args</span><span class="p">,</span> <span class="n">unknown_args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span>
<span class="k">if</span> <span class="n">retain_unknown_options</span><span class="p">:</span>
<span class="n">i</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">while</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="nb">len</span><span class="p">(</span><span class="n">unknown_args</span><span class="p">):</span>
<span class="c1"># Treat all unary flags as booleans, and all binary argument values as</span>
<span class="c1"># strings.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;-&#39;</span><span class="p">):</span>
<span class="n">i</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">continue</span>
<span class="k">if</span> <span class="n">i</span> <span class="o">+</span> <span class="mi">1</span> <span class="o">&gt;=</span> <span class="nb">len</span><span class="p">(</span><span class="n">unknown_args</span><span class="p">)</span> <span class="ow">or</span> <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span> <span class="o">+</span> <span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;-&#39;</span><span class="p">):</span>
<span class="n">split</span> <span class="o">=</span> <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;=&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">split</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">],</span> <span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="n">split</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">)</span>
<span class="n">i</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">elif</span> <span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;--&#39;</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">],</span> <span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">)</span>
<span class="n">i</span> <span class="o">+=</span> <span class="mi">2</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># skip all binary flags used with &#39;-&#39; and not &#39;--&#39;.</span>
<span class="c1"># ex: using -f instead of --f (or --flexrs_goal) will prevent</span>
<span class="c1"># argument validation before job submission and can be incorrectly</span>
<span class="c1"># submitted to job.</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s2">&quot;Discarding flag </span><span class="si">%s</span><span class="s2">, single dash flags are not allowed.&quot;</span><span class="p">,</span>
<span class="n">unknown_args</span><span class="p">[</span><span class="n">i</span><span class="p">])</span>
<span class="n">i</span> <span class="o">+=</span> <span class="mi">2</span>
<span class="k">continue</span>
<span class="n">parsed_args</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">if</span> <span class="n">unknown_args</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;Discarding unparseable args: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">unknown_args</span><span class="p">)</span>
<span class="n">parsed_args</span> <span class="o">=</span> <span class="n">known_args</span>
<span class="n">result</span> <span class="o">=</span> <span class="nb">vars</span><span class="p">(</span><span class="n">parsed_args</span><span class="p">)</span>
<span class="n">overrides</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="c1"># Apply the overrides if any</span>
<span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">result</span><span class="p">):</span>
<span class="n">overrides</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">if</span> <span class="n">k</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span>
<span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">k</span><span class="p">]</span>
<span class="k">if</span> <span class="p">(</span><span class="n">drop_default</span> <span class="ow">and</span> <span class="n">parser</span><span class="o">.</span><span class="n">get_default</span><span class="p">(</span><span class="n">k</span><span class="p">)</span> <span class="o">==</span> <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="ow">and</span>
<span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">parser</span><span class="o">.</span><span class="n">get_default</span><span class="p">(</span><span class="n">k</span><span class="p">),</span> <span class="n">ValueProvider</span><span class="p">)):</span>
<span class="k">del</span> <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span>
<span class="k">if</span> <span class="n">overrides</span><span class="p">:</span>
<span class="k">if</span> <span class="n">retain_unknown_options</span><span class="p">:</span>
<span class="n">result</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">overrides</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;Discarding invalid overrides: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">overrides</span><span class="p">)</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="PipelineOptions.display_data"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_all_options</span><span class="p">(</span><span class="n">drop_default</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">retain_unknown_options</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineOptions.view_as"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.view_as">[docs]</a> <span class="k">def</span> <span class="nf">view_as</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="bp">cls</span><span class="p">):</span>
<span class="c1"># type: (Type[PipelineOptionsT]) -&gt; PipelineOptionsT</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Returns a view of current object as provided PipelineOption subclass.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> options = PipelineOptions([&#39;--runner&#39;, &#39;Direct&#39;, &#39;--streaming&#39;])</span>
<span class="sd"> standard_options = options.view_as(StandardOptions)</span>
<span class="sd"> if standard_options.streaming:</span>
<span class="sd"> # ... start a streaming job ...</span>
<span class="sd"> Note that options objects may have multiple views, and modifications</span>
<span class="sd"> of values in any view-object will apply to current object and other</span>
<span class="sd"> view-objects.</span>
<span class="sd"> Args:</span>
<span class="sd"> cls: PipelineOptions class or any of its subclasses.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An instance of cls that is initialized using options contained in current</span>
<span class="sd"> object.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">view</span> <span class="o">=</span> <span class="bp">cls</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span>
<span class="k">for</span> <span class="n">option_name</span> <span class="ow">in</span> <span class="n">view</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="c1"># Initialize values of keys defined by a cls.</span>
<span class="c1">#</span>
<span class="c1"># Note that we do initialization only once per key to make sure that</span>
<span class="c1"># values in _all_options dict are not re-created with each new view.</span>
<span class="c1"># This is important to make sure that values of multi-options keys are</span>
<span class="c1"># backed by the same list across multiple views, and that any overrides of</span>
<span class="c1"># pipeline options already stored in _all_options are preserved.</span>
<span class="k">if</span> <span class="n">option_name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">option_name</span><span class="p">]</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span>
<span class="n">view</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">option_name</span><span class="p">)</span>
<span class="c1"># Note that views will still store _all_options of the source object.</span>
<span class="n">view</span><span class="o">.</span><span class="n">_all_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span>
<span class="k">return</span> <span class="n">view</span></div>
<span class="k">def</span> <span class="nf">_visible_option_list</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; List[str]</span>
<span class="k">return</span> <span class="nb">sorted</span><span class="p">(</span>
<span class="n">option</span> <span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="nb">dir</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">)</span> <span class="k">if</span> <span class="n">option</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="s1">&#39;_&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__dir__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; List[str]</span>
<span class="k">return</span> <span class="nb">sorted</span><span class="p">(</span>
<span class="nb">dir</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> <span class="o">+</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">)</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">())</span>
<span class="k">def</span> <span class="fm">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">):</span>
<span class="c1"># Special methods which may be accessed before the object is</span>
<span class="c1"># fully constructed (e.g. in unpickling).</span>
<span class="k">if</span> <span class="n">name</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">==</span> <span class="n">name</span><span class="p">[</span><span class="o">-</span><span class="mi">2</span><span class="p">:]</span> <span class="o">==</span> <span class="s1">&#39;__&#39;</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">name</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span>
<span class="s2">&quot;&#39;</span><span class="si">%s</span><span class="s2">&#39; object has no attribute &#39;</span><span class="si">%s</span><span class="s2">&#39;&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">def</span> <span class="fm">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;_flags&#39;</span><span class="p">,</span> <span class="s1">&#39;_all_options&#39;</span><span class="p">,</span> <span class="s1">&#39;_visible_options&#39;</span><span class="p">):</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__setattr__</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span>
<span class="s2">&quot;&#39;</span><span class="si">%s</span><span class="s2">&#39; object has no attribute &#39;</span><span class="si">%s</span><span class="s2">&#39;&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">def</span> <span class="fm">__str__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="p">(</span>
<span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span>
<span class="s1">&#39;, &#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span>
<span class="s1">&#39;</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">option</span><span class="p">,</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">option</span><span class="p">))</span>
<span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">()))</span></div>
<div class="viewcode-block" id="StandardOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.StandardOptions">[docs]</a><span class="k">class</span> <span class="nc">StandardOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="n">DEFAULT_RUNNER</span> <span class="o">=</span> <span class="s1">&#39;DirectRunner&#39;</span>
<span class="n">ALL_KNOWN_RUNNERS</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;apache_beam.runners.dataflow.dataflow_runner.DataflowRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.direct.direct_runner.DirectRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.direct.direct_runner.SwitchingDirectRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.interactive.interactive_runner.InteractiveRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.portability.flink_runner.FlinkRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.portability.portable_runner.PortableRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.portability.spark_runner.SparkRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.test.TestDirectRunner&#39;</span><span class="p">,</span>
<span class="s1">&#39;apache_beam.runners.test.TestDataflowRunner&#39;</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">KNOWN_RUNNER_NAMES</span> <span class="o">=</span> <span class="p">[</span><span class="n">path</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;.&#39;</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span> <span class="k">for</span> <span class="n">path</span> <span class="ow">in</span> <span class="n">ALL_KNOWN_RUNNERS</span><span class="p">]</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--runner&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Pipeline runner used to execute the workflow. Valid values are &#39;</span>
<span class="s1">&#39;one of </span><span class="si">%s</span><span class="s1">, or the fully qualified name of a PipelineRunner &#39;</span>
<span class="s1">&#39;subclass. If unspecified, defaults to </span><span class="si">%s</span><span class="s1">.&#39;</span> <span class="o">%</span>
<span class="p">(</span><span class="s1">&#39;, &#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">KNOWN_RUNNER_NAMES</span><span class="p">),</span> <span class="bp">cls</span><span class="o">.</span><span class="n">DEFAULT_RUNNER</span><span class="p">)))</span>
<span class="c1"># Whether to enable streaming mode.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--streaming&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to enable streaming mode.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--resource_hint&#39;</span><span class="p">,</span>
<span class="s1">&#39;--resource_hints&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;resource_hints&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="p">[],</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Resource hint to set in the pipeline execution environment.&#39;</span>
<span class="s1">&#39;Hints specified via this option override hints specified &#39;</span>
<span class="s1">&#39;at transform level. Interpretation of hints is defined by &#39;</span>
<span class="s1">&#39;Beam runners.&#39;</span><span class="p">))</span></div>
<span class="k">class</span> <span class="nc">CrossLanguageOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--beam_services&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="p">{},</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;For convienience, Beam provides the ability to automatically &#39;</span>
<span class="s1">&#39;download and start various services (such as expansion services) &#39;</span>
<span class="s1">&#39;used at pipeline construction and execution. These services are &#39;</span>
<span class="s1">&#39;identified by gradle target. This option provides the ability to &#39;</span>
<span class="s1">&#39;use pre-started services or non-default pre-existing artifacts to &#39;</span>
<span class="s1">&#39;start the given service. &#39;</span>
<span class="s1">&#39;Should be a json mapping of gradle build targets to pre-built &#39;</span>
<span class="s1">&#39;artifacts (e.g. jar files) expansion endpoints (e.g. host:port).&#39;</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">additional_option_ptransform_fn</span><span class="p">():</span>
<span class="n">beam</span><span class="o">.</span><span class="n">transforms</span><span class="o">.</span><span class="n">ptransform</span><span class="o">.</span><span class="n">ptransform_fn_typehints_enabled</span> <span class="o">=</span> <span class="kc">True</span>
<span class="c1"># Optional type checks that aren&#39;t enabled by default.</span>
<span class="n">additional_type_checks</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;ptransform_fn&#39;</span><span class="p">:</span> <span class="n">additional_option_ptransform_fn</span><span class="p">,</span>
<span class="p">}</span> <span class="c1"># type: Dict[str, Callable[[], None]]</span>
<span class="k">def</span> <span class="nf">enable_all_additional_type_checks</span><span class="p">():</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Same as passing --type_check_additional=all.&quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">additional_type_checks</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="n">f</span><span class="p">()</span>
<div class="viewcode-block" id="TypeOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TypeOptions">[docs]</a><span class="k">class</span> <span class="nc">TypeOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># TODO(laolu): Add a type inferencing option here once implemented.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--type_check_strictness&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;DEFAULT_TO_ANY&#39;</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;ALL_REQUIRED&#39;</span><span class="p">,</span> <span class="s1">&#39;DEFAULT_TO_ANY&#39;</span><span class="p">],</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The level of exhaustive manual type-hint &#39;</span>
<span class="s1">&#39;annotation required&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--type_check_additional&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Comma separated list of additional type checking features to &#39;</span>
<span class="s1">&#39;enable. Options: all, ptransform_fn. For details see: &#39;</span>
<span class="s1">&#39;https://beam.apache.org/documentation/sdks/python-type-safety/&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--no_pipeline_type_check&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;pipeline_type_check&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Disable type checking at pipeline construction &#39;</span>
<span class="s1">&#39;time&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--runtime_type_check&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable type checking at pipeline execution &#39;</span>
<span class="s1">&#39;time. NOTE: only supported with the &#39;</span>
<span class="s1">&#39;DirectRunner&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--performance_runtime_type_check&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable faster type checking via sampling at pipeline execution &#39;</span>
<span class="s1">&#39;time. NOTE: only supported with portable runners &#39;</span>
<span class="s1">&#39;(including the DirectRunner)&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--allow_non_deterministic_key_coders&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Use non-deterministic coders (such as pickling) for key-grouping &#39;</span>
<span class="s1">&#39;operations such as GroupByKey. This is unsafe, as runners may group &#39;</span>
<span class="s1">&#39;keys based on their encoded bytes, but is available for backwards &#39;</span>
<span class="s1">&#39;compatibility. See BEAM-11719.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--allow_unsafe_triggers&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Allow the use of unsafe triggers. Unsafe triggers have the &#39;</span>
<span class="s1">&#39;potential to cause data loss due to finishing and/or never having &#39;</span>
<span class="s1">&#39;their condition met. Some operations, such as GroupByKey, disallow &#39;</span>
<span class="s1">&#39;this. This exists for cases where such loss is acceptable and for &#39;</span>
<span class="s1">&#39;backwards compatibility. See BEAM-9487.&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="TypeOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TypeOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">beam</span><span class="o">.</span><span class="n">version</span><span class="o">.</span><span class="n">__version__</span> <span class="o">&gt;=</span> <span class="s1">&#39;3&#39;</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="s1">&#39;Update --type_check_additional default to include all &#39;</span>
<span class="s1">&#39;available additional checks at Beam 3.0 release time.&#39;</span><span class="p">)</span>
<span class="n">keys</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">type_check_additional</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;,&#39;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">keys</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">key</span><span class="p">:</span>
<span class="k">continue</span>
<span class="k">elif</span> <span class="n">key</span> <span class="o">==</span> <span class="s1">&#39;all&#39;</span><span class="p">:</span>
<span class="n">enable_all_additional_type_checks</span><span class="p">()</span>
<span class="k">elif</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">additional_type_checks</span><span class="p">:</span>
<span class="n">additional_type_checks</span><span class="p">[</span><span class="n">key</span><span class="p">]()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;Unrecognized --type_check_additional feature: </span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">key</span><span class="p">)</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<div class="viewcode-block" id="DirectOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DirectOptions">[docs]</a><span class="k">class</span> <span class="nc">DirectOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;DirectRunner-specific execution options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--no_direct_runner_use_stacked_bundle&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;direct_runner_use_stacked_bundle&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;DirectRunner uses stacked WindowedValues within a Bundle for &#39;</span>
<span class="s1">&#39;memory optimization. Set --no_direct_runner_use_stacked_bundle to &#39;</span>
<span class="s1">&#39;avoid it.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--direct_runner_bundle_repeat&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;replay every bundle this many extra times, for profiling &#39;</span>
<span class="s1">&#39;and debugging&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--direct_num_workers&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;number of parallel running workers.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--direct_running_mode&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;in_memory&#39;</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;in_memory&#39;</span><span class="p">,</span> <span class="s1">&#39;multi_threading&#39;</span><span class="p">,</span> <span class="s1">&#39;multi_processing&#39;</span><span class="p">],</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Workers running environment.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--direct_embed_docker_python&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;direct_embed_docker_python&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;DirectRunner uses the embedded Python environment when &#39;</span>
<span class="s1">&#39;the default Python docker environment is specified.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--direct_test_splits&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="p">{},</span>
<span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Split test configuration of the json form &#39;</span>
<span class="s1">&#39;{&quot;step_name&quot;: {&quot;timings&quot;: [...], &quot;fractions&quot;: [...]}, ...} &#39;</span>
<span class="s1">&#39;where step_name is the name of a step controlling the stage to which &#39;</span>
<span class="s1">&#39;splits will be sent, timings is a list of floating-point times &#39;</span>
<span class="s1">&#39;(in seconds) at which the split requests will be sent, and &#39;</span>
<span class="s1">&#39;fractions is a corresponding list of floating points to use in the &#39;</span>
<span class="s1">&#39;split requests themselves.&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="GoogleCloudOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions">[docs]</a><span class="k">class</span> <span class="nc">GoogleCloudOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Google Cloud Dataflow service execution options.&quot;&quot;&quot;</span>
<span class="n">BIGQUERY_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;bigquery.googleapis.com&#39;</span>
<span class="n">COMPUTE_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;compute.googleapis.com&#39;</span>
<span class="n">STORAGE_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;storage.googleapis.com&#39;</span>
<span class="n">DATAFLOW_ENDPOINT</span> <span class="o">=</span> <span class="s1">&#39;https://dataflow.googleapis.com&#39;</span>
<span class="n">OAUTH_SCOPES</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;https://www.googleapis.com/auth/bigquery&#39;</span><span class="p">,</span>
<span class="s1">&#39;https://www.googleapis.com/auth/cloud-platform&#39;</span><span class="p">,</span>
<span class="s1">&#39;https://www.googleapis.com/auth/devstorage.full_control&#39;</span><span class="p">,</span>
<span class="s1">&#39;https://www.googleapis.com/auth/userinfo.email&#39;</span><span class="p">,</span>
<span class="s1">&#39;https://www.googleapis.com/auth/datastore&#39;</span><span class="p">,</span>
<span class="s1">&#39;https://www.googleapis.com/auth/spanner.admin&#39;</span><span class="p">,</span>
<span class="s1">&#39;https://www.googleapis.com/auth/spanner.data&#39;</span>
<span class="p">]</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dataflow_endpoint&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">DATAFLOW_ENDPOINT</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;The URL for the Dataflow API. If not set, the default public URL &#39;</span>
<span class="s1">&#39;will be used.&#39;</span><span class="p">))</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--project&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Name of the Cloud project owning the Dataflow &#39;</span>
<span class="s1">&#39;job.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--job_name&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="s1">&#39;Name of the Cloud Dataflow job.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--staging_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCS path for staging code packages needed by &#39;</span>
<span class="s1">&#39;workers.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="c1"># If staging_location is not set, it defaults to temp_location.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--temp_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCS path for saving temporary workflow jobs.&#39;</span><span class="p">)</span>
<span class="c1"># The Google Compute Engine region for creating Dataflow jobs. See</span>
<span class="c1"># https://cloud.google.com/compute/docs/regions-zones/regions-zones for a</span>
<span class="c1"># list of valid options.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--region&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The Google Compute Engine region for creating &#39;</span>
<span class="s1">&#39;Dataflow job.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--service_account_email&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Identity to run virtual machines as.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--no_auth&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;no_auth&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Skips authorizing credentials with Google Cloud.&#39;</span><span class="p">)</span>
<span class="c1"># Option to run templated pipelines</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--template_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Save job to specified local or GCS location.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--label&#39;</span><span class="p">,</span>
<span class="s1">&#39;--labels&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;labels&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Labels to be applied to this Dataflow job. &#39;</span>
<span class="s1">&#39;Labels are key value pairs separated by = &#39;</span>
<span class="s1">&#39;(e.g. --label key=value) or &#39;</span>
<span class="s1">&#39;(--labels=</span><span class="se">\&#39;</span><span class="s1">{ &quot;key&quot;: &quot;value&quot;, &quot;mass&quot;: &quot;1_3kg&quot;, &quot;count&quot;: &quot;3&quot; }</span><span class="se">\&#39;</span><span class="s1">).&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--update&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Update an existing streaming Cloud Dataflow job. &#39;</span>
<span class="s1">&#39;Experimental. &#39;</span>
<span class="s1">&#39;See https://cloud.google.com/dataflow/docs/guides/&#39;</span>
<span class="s1">&#39;updating-a-pipeline&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--transform_name_mapping&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The transform mapping that maps the named &#39;</span>
<span class="s1">&#39;transforms in your prior pipeline code to names &#39;</span>
<span class="s1">&#39;in your replacement pipeline code. &#39;</span>
<span class="s1">&#39;Experimental. &#39;</span>
<span class="s1">&#39;See https://cloud.google.com/dataflow/docs/guides/&#39;</span>
<span class="s1">&#39;updating-a-pipeline&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--enable_streaming_engine&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable Windmill Service for this Dataflow job. &#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dataflow_kms_key&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Set a Google Cloud KMS key name to be used in &#39;</span>
<span class="s1">&#39;Dataflow state operations (GBK, Streaming).&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--create_from_snapshot&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The snapshot from which the job should be created.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--flexrs_goal&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;COST_OPTIMIZED&#39;</span><span class="p">,</span> <span class="s1">&#39;SPEED_OPTIMIZED&#39;</span><span class="p">],</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Set the Flexible Resource Scheduling mode&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dataflow_service_option&#39;</span><span class="p">,</span>
<span class="s1">&#39;--dataflow_service_options&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;dataflow_service_options&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Options to configure the Dataflow service. These &#39;</span>
<span class="s1">&#39;options decouple service side feature availability &#39;</span>
<span class="s1">&#39;from the Apache Beam release cycle. &#39;</span>
<span class="s1">&#39;Note: If set programmatically, must be set as a &#39;</span>
<span class="s1">&#39;list of strings&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--enable_hot_key_logging&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;When true, will enable the direct logging of any detected hot &#39;</span>
<span class="s1">&#39;keys into Cloud Logging. Warning: this will log the literal key as an &#39;</span>
<span class="s1">&#39;unobfuscated string.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--enable_artifact_caching&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;When true, artifacts will be cached across job submissions in &#39;</span>
<span class="s1">&#39;the GCS staging bucket&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--impersonate_service_account&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;All API requests will be made as the given service account or &#39;</span>
<span class="s1">&#39;target service account in an impersonation delegation chain &#39;</span>
<span class="s1">&#39;instead of the currently selected account. You can specify &#39;</span>
<span class="s1">&#39;either a single service account as the impersonator, or a &#39;</span>
<span class="s1">&#39;comma-separated list of service accounts to create an &#39;</span>
<span class="s1">&#39;impersonation delegation chain.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--gcp_oauth_scope&#39;</span><span class="p">,</span>
<span class="s1">&#39;--gcp_oauth_scopes&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;gcp_oauth_scopes&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">OAUTH_SCOPES</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Controls the OAuth scopes that will be requested when creating &#39;</span>
<span class="s1">&#39;GCP credentials. Note: If set programmatically, must be set as a &#39;</span>
<span class="s1">&#39;list of strings&#39;</span><span class="p">))</span>
<!-- Highlighted listing of GoogleCloudOptions._create_default_gcs_bucket.
     The method tries to import gcsio from apache_beam.io.gcp; on ImportError it
     logs the warning 'Unable to create default GCS bucket.' and returns None.
     Otherwise it calls gcsio.get_or_create_default_gcs_bucket(self) and returns
     the string 'gs://' followed by bucket.id when the result is truthy, else None. -->
<span class="k">def</span> <span class="nf">_create_default_gcs_bucket</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp</span> <span class="kn">import</span> <span class="n">gcsio</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="n">_LOGGER</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">&#39;Unable to create default GCS bucket.&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="n">bucket</span> <span class="o">=</span> <span class="n">gcsio</span><span class="o">.</span><span class="n">get_or_create_default_gcs_bucket</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">if</span> <span class="n">bucket</span><span class="p">:</span>
<span class="k">return</span> <span class="s1">&#39;gs://</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">bucket</span><span class="o">.</span><span class="n">id</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<!-- Highlighted listing of GoogleCloudOptions.validate(self, validator).
     It accumulates error entries in a list and returns that list.
     Steps shown: validator.validate_cloud_options(self); a GCS path check of
     'temp_location' that, on failure, falls back to _create_default_gcs_bucket()
     and either keeps the original errors (fallback is None) or stores the
     fallback on self via setattr; a GCS path check of 'staging_location';
     a mutual-exclusion error when both dataflow_job_file (DebugOptions view)
     and template_location are set; and a list-type check of
     dataflow_service_options when it is truthy.
     NOTE(review): indentation is not visible in this listing, so which of
     these statements sit under the is_service_runner() branch cannot be
     determined here; confirm against the Python source.
     NOTE(review): in the staging_location condition Python's `is` binds
     tighter than `or`, so it reads as
     (staging_location is truthy) or (temp_location is None).
     The two closing div tags at the end close this method's viewcode-block and
     an enclosing block opened earlier in the page. -->
<div class="viewcode-block" id="GoogleCloudOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">validator</span><span class="o">.</span><span class="n">is_service_runner</span><span class="p">():</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_cloud_options</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="c1"># Validating temp_location, or adding a default if there are issues</span>
<span class="n">temp_location_errors</span> <span class="o">=</span> <span class="n">validator</span><span class="o">.</span><span class="n">validate_gcs_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;temp_location&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">temp_location_errors</span><span class="p">:</span>
<span class="n">default_bucket</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_create_default_gcs_bucket</span><span class="p">()</span>
<span class="k">if</span> <span class="n">default_bucket</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">temp_location_errors</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="nb">setattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;temp_location&#39;</span><span class="p">,</span> <span class="n">default_bucket</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;staging_location&#39;</span><span class="p">,</span>
<span class="kc">None</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;temp_location&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_gcs_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;staging_location&#39;</span><span class="p">))</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">DebugOptions</span><span class="p">)</span><span class="o">.</span><span class="n">dataflow_job_file</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">template_location</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="s1">&#39;--dataflow_job_file and --template_location &#39;</span>
<span class="s1">&#39;are mutually exclusive.&#39;</span><span class="p">)</span>
<span class="c1"># Validate that dataflow_service_options is a list</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_service_options</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span>
<span class="n">validator</span><span class="o">.</span><span class="n">validate_repeatable_argument_passed_as_list</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;dataflow_service_options&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<!-- Highlighted listing of class AzureOptions(PipelineOptions).
     _add_argparse_args registers three options, each with default None:
     azure_connection_string, blob_service_endpoint and
     azure_managed_identity_client_id.
     validate(self, validator) returns a list that contains one
     mutual-exclusion message when both azure_connection_string and
     blob_service_endpoint are truthy, and is empty otherwise.
     The outer div carries the anchor id "AzureOptions"; the nested div carries
     "AzureOptions.validate". Both are closed on the last line. -->
<div class="viewcode-block" id="AzureOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.AzureOptions">[docs]</a><span class="k">class</span> <span class="nc">AzureOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Azure Blob Storage options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--azure_connection_string&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Connection string of the Azure Blob Storage Account.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--blob_service_endpoint&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;URL of the Azure Blob Storage Account.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--azure_managed_identity_client_id&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Client ID of a user-assigned managed identity.&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="AzureOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.AzureOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">azure_connection_string</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">blob_service_endpoint</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="s1">&#39;--azure_connection_string and &#39;</span>
<span class="s1">&#39;--blob_service_endpoint are mutually exclusive.&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<!-- Highlighted listing of class HadoopFileSystemOptions(PipelineOptions).
     _add_argparse_args registers hdfs_host, hdfs_port and hdfs_user (default
     None, no type conversion declared) plus the store_true flag
     hdfs_full_urls (default False).
     validate(self, validator) returns the list produced by
     validator.validate_optional_argument_positive for the port option.
     The attribute name passed to that check is 'hdfs_port', matching the only
     port option this class registers; the listing previously showed 'port',
     which names no option declared here.
     NOTE(review): validate_optional_argument_positive is defined outside this
     page; presumably it looks the attribute up by name on the options object.
     Confirm, and apply the same rename in the Python source this page mirrors. -->
<div class="viewcode-block" id="HadoopFileSystemOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.HadoopFileSystemOptions">[docs]</a><span class="k">class</span> <span class="nc">HadoopFileSystemOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;``HadoopFileSystem`` connection options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_host&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Hostname or address of the HDFS namenode.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_port&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Port of the HDFS namenode.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_user&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;HDFS username to use.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_full_urls&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;If set, URLs will be parsed as &quot;hdfs://server/path/...&quot;, instead &#39;</span>
<span class="s1">&#39;of &quot;hdfs://path/...&quot;. The &quot;server&quot; part will be unused (use &#39;</span>
<span class="s1">&#39;--hdfs_host and --hdfs_port).&#39;</span><span class="p">))</span>
<div class="viewcode-block" id="HadoopFileSystemOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.HadoopFileSystemOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_optional_argument_positive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;hdfs_port&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<span class="c1"># Command line options controlling the worker pool configuration.</span>
<span class="c1"># TODO(silviuc): Update description when autoscaling options are in.</span>
<!-- Highlighted listing of class WorkerOptions(PipelineOptions).
     _add_argparse_args registers the worker-pool options:
       num_workers, max_num_workers, disk_size_gb (type int, default None);
       autoscaling_algorithm (type str, choices NONE / THROUGHPUT_BASED,
       default None, which the inline comment says means unset);
       worker_machine_type / machine_type (both stored in dest machine_type);
       worker_disk_type / disk_type (both stored in dest disk_type);
       worker_region, worker_zone, zone, network, subnetwork,
       worker_harness_container_image, sdk_container_image,
       default_sdk_harness_log_level (default None);
       sdk_harness_container_image_overrides (action append, default None);
       sdk_harness_log_level_overrides (parsed with json.loads and combined
       by _DictUnionAction, default None);
       use_public_ips and no_use_public_ips (store_true / store_false, both
       writing dest use_public_ips, default None);
       min_cpu_platform (type str, no explicit default).
     validate(self, validator) returns a list built from
     validator.validate_sdk_container_image_options(self),
     validator.validate_num_workers(self) and
     validator.validate_worker_region_zone(self).
     NOTE(review): indentation is not visible in this listing, so which calls
     sit under the is_service_runner() branch cannot be determined here.
     The help text for sdk_harness_container_image_overrides reads
     "Each entry consists of"; the listing previously showed "consist".
     Apply the same wording fix in the Python source this page mirrors. -->
<div class="viewcode-block" id="WorkerOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.WorkerOptions">[docs]</a><span class="k">class</span> <span class="nc">WorkerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Worker pool configuration options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--num_workers&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Number of workers to use when executing the Dataflow job. If not &#39;</span>
<span class="s1">&#39;set, the Dataflow service will use a reasonable default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--max_num_workers&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Maximum number of workers to use when executing the Dataflow job.&#39;</span>
<span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--autoscaling_algorithm&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;NONE&#39;</span><span class="p">,</span> <span class="s1">&#39;THROUGHPUT_BASED&#39;</span><span class="p">],</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># Meaning unset, distinct from &#39;NONE&#39; meaning don&#39;t scale</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;If and how to autoscale the workerpool.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_machine_type&#39;</span><span class="p">,</span>
<span class="s1">&#39;--machine_type&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;machine_type&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Machine type to create Dataflow worker VMs as. See &#39;</span>
<span class="s1">&#39;https://cloud.google.com/compute/docs/machine-types &#39;</span>
<span class="s1">&#39;for a list of valid options. If not set, &#39;</span>
<span class="s1">&#39;the Dataflow service will choose a reasonable &#39;</span>
<span class="s1">&#39;default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--disk_size_gb&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Remote worker disk size, in gigabytes, or 0 to use the default &#39;</span>
<span class="s1">&#39;size. If not set, the Dataflow service will use a reasonable &#39;</span>
<span class="s1">&#39;default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_disk_type&#39;</span><span class="p">,</span>
<span class="s1">&#39;--disk_type&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;disk_type&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Specifies what type of persistent disk should be used.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_region&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;The Compute Engine region (https://cloud.google.com/compute/docs/&#39;</span>
<span class="s1">&#39;regions-zones/regions-zones) in which worker processing should &#39;</span>
<span class="s1">&#39;occur, e.g. &quot;us-west1&quot;. Mutually exclusive with worker_zone. If &#39;</span>
<span class="s1">&#39;neither worker_region nor worker_zone is specified, default to &#39;</span>
<span class="s1">&#39;same value as --region.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_zone&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;The Compute Engine zone (https://cloud.google.com/compute/docs/&#39;</span>
<span class="s1">&#39;regions-zones/regions-zones) in which worker processing should &#39;</span>
<span class="s1">&#39;occur, e.g. &quot;us-west1-a&quot;. Mutually exclusive with worker_region. &#39;</span>
<span class="s1">&#39;If neither worker_region nor worker_zone is specified, the &#39;</span>
<span class="s1">&#39;Dataflow service will choose a zone in --region based on &#39;</span>
<span class="s1">&#39;available capacity.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--zone&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE availability zone for launching workers. Default is up to the &#39;</span>
<span class="s1">&#39;Dataflow service. This flag is deprecated, and will be replaced &#39;</span>
<span class="s1">&#39;by worker_zone.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--network&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE network for launching workers. Default is up to the Dataflow &#39;</span>
<span class="s1">&#39;service.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--subnetwork&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE subnetwork for launching workers. Default is up to the &#39;</span>
<span class="s1">&#39;Dataflow service. Expected format is &#39;</span>
<span class="s1">&#39;regions/REGION/subnetworks/SUBNETWORK or the fully qualified &#39;</span>
<span class="s1">&#39;subnetwork name. For more information, see &#39;</span>
<span class="s1">&#39;https://cloud.google.com/compute/docs/vpc/&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_harness_container_image&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Docker registry location of container image to use for the &#39;</span>
<span class="s1">&#39;worker harness. If not set, an appropriate approved Google Cloud &#39;</span>
<span class="s1">&#39;Dataflow image will be used based on the version of the &#39;</span>
<span class="s1">&#39;SDK. Note: This flag is deprecated and only supports &#39;</span>
<span class="s1">&#39;approved Google Cloud Dataflow container images. To provide a &#39;</span>
<span class="s1">&#39;custom container image, use sdk_container_image instead.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--sdk_container_image&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Docker registry location of container image to use for the &#39;</span>
<span class="s1">&#39;worker harness. If not set, an appropriate approved Google Cloud &#39;</span>
<span class="s1">&#39;Dataflow image will be used based on the version of the &#39;</span>
<span class="s1">&#39;SDK. If set for a non-portable pipeline, only official &#39;</span>
<span class="s1">&#39;Google Cloud Dataflow container images may be used here.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--sdk_harness_container_image_overrides&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Overrides for SDK harness container images. Could be for the &#39;</span>
<span class="s1">&#39;local SDK or for a remote SDK that pipeline has to support due &#39;</span>
<span class="s1">&#39;to a cross-language transform. Each entry consists of two values &#39;</span>
<span class="s1">&#39;separated by a comma where first value gives a regex to &#39;</span>
<span class="s1">&#39;identify the container image to override and the second value &#39;</span>
<span class="s1">&#39;gives the replacement container image.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--default_sdk_harness_log_level&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Controls the default log level of all loggers without a log level &#39;</span>
<span class="s1">&#39;override. Values can be either a labeled level or a number &#39;</span>
<span class="s1">&#39;(See https://docs.python.org/3/library/logging.html#levels). &#39;</span>
<span class="s1">&#39;Default log level is INFO.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--sdk_harness_log_level_overrides&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="n">_DictUnionAction</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Controls the log levels for specifically named loggers. The &#39;</span>
<span class="s1">&#39;expected format is a json string: </span><span class="se">\&#39;</span><span class="s1">{&quot;module&quot;:&quot;log_level&quot;,...}</span><span class="se">\&#39;</span><span class="s1">. &#39;</span>
<span class="s1">&#39;For example, by specifying the value </span><span class="se">\&#39;</span><span class="s1">{&quot;a.b.c&quot;:&quot;DEBUG&quot;}</span><span class="se">\&#39;</span><span class="s1">, &#39;</span>
<span class="s1">&#39;the logger underneath the module &quot;a.b.c&quot; will be configured to &#39;</span>
<span class="s1">&#39;output logs at the DEBUG level. Similarly, by specifying the &#39;</span>
<span class="s1">&#39;value </span><span class="se">\&#39;</span><span class="s1">{&quot;a.b.c&quot;:&quot;WARNING&quot;}</span><span class="se">\&#39;</span><span class="s1"> all loggers underneath the &quot;a.b.c&quot; &#39;</span>
<span class="s1">&#39;module will be configured to output logs at the WARNING level. &#39;</span>
<span class="s1">&#39;Also, note that when multiple overrides are specified, the exact &#39;</span>
<span class="s1">&#39;name followed by the closest parent takes precedence.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--use_public_ips&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to assign public IP addresses to the worker VMs.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--no_use_public_ips&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;use_public_ips&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to assign only private IP addresses to the worker VMs.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--min_cpu_platform&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;min_cpu_platform&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCE minimum CPU platform. Default is determined by GCP.&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="WorkerOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.WorkerOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_sdk_container_image_options</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="k">if</span> <span class="n">validator</span><span class="o">.</span><span class="n">is_service_runner</span><span class="p">():</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_num_workers</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_worker_region_zone</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<div class="viewcode-block" id="DebugOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions">[docs]</a><span class="k">class</span> <span class="nc">DebugOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dataflow_job_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Debug file to write the workflow specification.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--experiment&#39;</span><span class="p">,</span>
<span class="s1">&#39;--experiments&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;experiments&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Runners may provide a number of experimental features that can be &#39;</span>
<span class="s1">&#39;enabled with this flag. Please sync with the owners of the runner &#39;</span>
<span class="s1">&#39;before enabling any experiments.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--number_of_worker_harness_threads&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Number of threads per worker to use on the runner. If left &#39;</span>
<span class="s1">&#39;unspecified, the runner will compute an appropriate number of &#39;</span>
<span class="s1">&#39;threads to use. Currently only enabled for DataflowRunner when &#39;</span>
<span class="s1">&#39;experiment </span><span class="se">\&#39;</span><span class="s1">use_runner_v2</span><span class="se">\&#39;</span><span class="s1"> is enabled.&#39;</span><span class="p">))</span>
<div class="viewcode-block" id="DebugOptions.add_experiment"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions.add_experiment">[docs]</a> <span class="k">def</span> <span class="nf">add_experiment</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">experiment</span><span class="p">):</span>
<span class="c1"># pylint: disable=access-member-before-definition</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">experiments</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">experiment</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">experiment</span><span class="p">)</span></div>
<div class="viewcode-block" id="DebugOptions.lookup_experiment"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions.lookup_experiment">[docs]</a> <span class="k">def</span> <span class="nf">lookup_experiment</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="k">return</span> <span class="n">default</span>
<span class="k">elif</span> <span class="n">key</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">for</span> <span class="n">experiment</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="k">if</span> <span class="n">experiment</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">key</span> <span class="o">+</span> <span class="s1">&#39;=&#39;</span><span class="p">):</span>
<span class="k">return</span> <span class="n">experiment</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;=&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">)[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">return</span> <span class="n">default</span></div>
<div class="viewcode-block" id="DebugOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span>
<span class="n">validator</span><span class="o">.</span><span class="n">validate_repeatable_argument_passed_as_list</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;experiments&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<div class="viewcode-block" id="ProfilingOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.ProfilingOptions">[docs]</a><span class="k">class</span> <span class="nc">ProfilingOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--profile_cpu&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable work item CPU profiling.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--profile_memory&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable work item heap profiling.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--profile_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Path for saving profiler data.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--profile_sample_rate&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">float</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mf">1.0</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;A number between 0 and 1 indicating the ratio &#39;</span>
<span class="s1">&#39;of bundles that should be profiled.&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="SetupOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.SetupOptions">[docs]</a><span class="k">class</span> <span class="nc">SetupOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># Options for installing dependencies in the worker.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--requirements_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Path to a requirements file containing package dependencies. &#39;</span>
<span class="s1">&#39;Typically it is produced by a pip freeze command. More details: &#39;</span>
<span class="s1">&#39;https://pip.pypa.io/en/latest/reference/pip_freeze.html. &#39;</span>
<span class="s1">&#39;If used, all the packages specified will be downloaded, &#39;</span>
<span class="s1">&#39;cached (use --requirements_cache to change default location), &#39;</span>
<span class="s1">&#39;and then staged so that they can be automatically installed in &#39;</span>
<span class="s1">&#39;workers during startup. The cache is refreshed as needed &#39;</span>
<span class="s1">&#39;avoiding extra downloads for existing packages. Typically the &#39;</span>
<span class="s1">&#39;file is named requirements.txt.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--requirements_cache&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Path to a folder to cache the packages specified in &#39;</span>
<span class="s1">&#39;the requirements file using the --requirements_file option. &#39;</span>
<span class="s1">&#39;If you want to skip populating requirements cache, please &#39;</span>
<span class="s1">&#39;specify --requirements_cache=&quot;skip&quot;.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--requirements_cache_only_sources&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Enable this flag to populate requirements cache only &#39;</span>
<span class="s1">&#39;with Source distributions(sdists) of the dependencies &#39;</span>
<span class="s1">&#39;mentioned in the --requirements_file. &#39;</span>
<span class="s1">&#39;Note: (BEAM-4032): This flag may significantly slow down &#39;</span>
<span class="s1">&#39;the pipeline submission. It is added to preserve the requirements&#39;</span>
<span class="s1">&#39; cache behavior prior to 2.37.0 and will likely be removed in &#39;</span>
<span class="s1">&#39;future releases.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--setup_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Path to a setup Python file containing package dependencies. If &#39;</span>
<span class="s1">&#39;specified, the file</span><span class="se">\&#39;</span><span class="s1">s containing folder is assumed to have the &#39;</span>
<span class="s1">&#39;structure required for a setuptools setup package. The file must &#39;</span>
<span class="s1">&#39;be named setup.py. More details: &#39;</span>
<span class="s1">&#39;https://pythonhosted.org/an_example_pypi_project/setuptools.html &#39;</span>
<span class="s1">&#39;During job submission a source distribution will be built and &#39;</span>
<span class="s1">&#39;the worker will install the resulting package before running any &#39;</span>
<span class="s1">&#39;custom code.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--beam_plugin&#39;</span><span class="p">,</span>
<span class="s1">&#39;--beam_plugins&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;beam_plugins&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Bootstrap the python process before executing any code by &#39;</span>
<span class="s1">&#39;importing all the plugins used in the pipeline. Please pass a &#39;</span>
<span class="s1">&#39;comma-separated list of import paths to be included. This is &#39;</span>
<span class="s1">&#39;currently an experimental flag and provides no stability. &#39;</span>
<span class="s1">&#39;Multiple --beam_plugin options can be specified if more than &#39;</span>
<span class="s1">&#39;one plugin is needed.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--pickle_library&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;default&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Chooses which pickle library to use. Options are dill, &#39;</span>
<span class="s1">&#39;cloudpickle or default.&#39;</span><span class="p">),</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;cloudpickle&#39;</span><span class="p">,</span> <span class="s1">&#39;default&#39;</span><span class="p">,</span> <span class="s1">&#39;dill&#39;</span><span class="p">])</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--save_main_session&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Save the main session state so that pickled functions and classes &#39;</span>
<span class="s1">&#39;defined in __main__ (e.g. interactive session) can be unpickled. &#39;</span>
<span class="s1">&#39;Some workflows do not need the session state if for instance all &#39;</span>
<span class="s1">&#39;their functions/classes are defined in proper modules &#39;</span>
<span class="s1">&#39;(not __main__) and the modules are importable in the worker. &#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--sdk_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;default&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Override the default location from where the Beam SDK is &#39;</span>
<span class="s1">&#39;downloaded. It can be a URL, a GCS path, or a local path to an &#39;</span>
<span class="s1">&#39;SDK tarball. Workflow submissions will download or copy an SDK &#39;</span>
<span class="s1">&#39;tarball from here. If set to the string &quot;default&quot;, a standard &#39;</span>
<span class="s1">&#39;SDK location is used. If empty, no SDK is copied.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--extra_package&#39;</span><span class="p">,</span>
<span class="s1">&#39;--extra_packages&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;extra_packages&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Local path to a Python package file. The file is expected to be &#39;</span>
<span class="s1">&#39;(1) a package tarball (&quot;.tar&quot;), (2) a compressed package tarball &#39;</span>
<span class="s1">&#39;(&quot;.tar.gz&quot;), (3) a Wheel file (&quot;.whl&quot;) or (4) a compressed &#39;</span>
<span class="s1">&#39;package zip file (&quot;.zip&quot;) which can be installed using the &#39;</span>
<span class="s1">&#39;&quot;pip install&quot; command of the standard pip package. Multiple &#39;</span>
<span class="s1">&#39;--extra_package options can be specified if more than one &#39;</span>
<span class="s1">&#39;package is needed. During job submission, the files will be &#39;</span>
<span class="s1">&#39;staged in the staging area (--staging_location option) and the &#39;</span>
<span class="s1">&#39;workers will install them in same order they were specified on &#39;</span>
<span class="s1">&#39;the command line.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--prebuild_sdk_container_engine&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Prebuild sdk worker container image before job submission. If &#39;</span>
<span class="s1">&#39;enabled, SDK invokes the boot sequence in SDK worker &#39;</span>
<span class="s1">&#39;containers to install all pipeline dependencies in the &#39;</span>
<span class="s1">&#39;container, and uses the prebuilt image in the pipeline &#39;</span>
<span class="s1">&#39;environment. This may speed up pipeline execution. To enable, &#39;</span>
<span class="s1">&#39;select the Docker build engine: local_docker using &#39;</span>
<span class="s1">&#39;locally-installed Docker or cloud_build for using Google Cloud &#39;</span>
<span class="s1">&#39;Build (requires a GCP project with Cloud Build API enabled). You &#39;</span>
<span class="s1">&#39;can also subclass SdkContainerImageBuilder and use that to build &#39;</span>
<span class="s1">&#39;in other environments.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--prebuild_sdk_container_base_image&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Deprecated. Use --sdk_container_image instead.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--cloud_build_machine_type&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;If specified, use the machine type explicitly when prebuilding &#39;</span>
<span class="s1">&#39;SDK container image on Google Cloud Build.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--docker_registry_push_url&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Docker registry url to use for tagging and pushing the prebuilt &#39;</span>
<span class="s1">&#39;sdk worker container image.&#39;</span><span class="p">))</span>
<div class="viewcode-block" id="SetupOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.SetupOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_container_prebuilding_options</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<span class="k">class</span> <span class="nc">PortableOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Portable options are common options expected to be understood by most of</span>
<span class="sd"> the portable runners. Should generally be kept in sync with</span>
<span class="sd"> PortablePipelineOptions.java.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--job_endpoint&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Job service endpoint to use. Should be in the form of host &#39;</span>
<span class="s1">&#39;and port, e.g. localhost:8099.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--artifact_endpoint&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Artifact staging endpoint to use. Should be in the form of host &#39;</span>
<span class="s1">&#39;and port, e.g. localhost:8098. If none is specified, the &#39;</span>
<span class="s1">&#39;artifact endpoint sent from the job server is used.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--job_server_timeout&#39;</span><span class="p">,</span>
<span class="s1">&#39;--job-server-timeout&#39;</span><span class="p">,</span> <span class="c1"># For backwards compatibility.</span>
<span class="n">default</span><span class="o">=</span><span class="mi">60</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Job service request timeout in seconds. The timeout &#39;</span>
<span class="s1">&#39;determines the max time the driver program will wait to &#39;</span>
<span class="s1">&#39;get a response from the job server. NOTE: the timeout does not &#39;</span>
<span class="s1">&#39;apply to the actual pipeline run time. The driver program can &#39;</span>
<span class="s1">&#39;still wait for job completion indefinitely.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_type&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Set the default environment type for running &#39;</span>
<span class="s1">&#39;user code. DOCKER (default) runs user code in a container. &#39;</span>
<span class="s1">&#39;PROCESS runs user code in processes that are automatically &#39;</span>
<span class="s1">&#39;started on each worker node. LOOPBACK runs user code on the &#39;</span>
<span class="s1">&#39;same process that originally submitted the job.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_config&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Set environment configuration for running the user code.</span><span class="se">\n</span><span class="s1"> For &#39;</span>
<span class="s1">&#39;DOCKER: Url for the docker image.</span><span class="se">\n</span><span class="s1"> For PROCESS: json of the &#39;</span>
<span class="s1">&#39;form {&quot;os&quot;: &quot;&lt;OS&gt;&quot;, &quot;arch&quot;: &quot;&lt;ARCHITECTURE&gt;&quot;, &quot;command&quot;: &#39;</span>
<span class="s1">&#39;&quot;&lt;process to execute&gt;&quot;, &quot;env&quot;:{&quot;&lt;Environment variables 1&gt;&quot;: &#39;</span>
<span class="s1">&#39;&quot;&lt;ENV_VAL&gt;&quot;} }. All fields in the json are optional except &#39;</span>
<span class="s1">&#39;command.</span><span class="se">\n\n</span><span class="s1">Prefer using --environment_options instead.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_option&#39;</span><span class="p">,</span>
<span class="s1">&#39;--environment_options&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;environment_options&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Environment configuration for running the user code. &#39;</span>
<span class="s1">&#39;Recognized options depend on --environment_type.</span><span class="se">\n</span><span class="s1"> &#39;</span>
<span class="s1">&#39;For DOCKER: docker_container_image (optional)</span><span class="se">\n</span><span class="s1"> &#39;</span>
<span class="s1">&#39;For PROCESS: process_command (required), process_variables &#39;</span>
<span class="s1">&#39;(optional, comma-separated)</span><span class="se">\n</span><span class="s1"> &#39;</span>
<span class="s1">&#39;For EXTERNAL: external_service_address (required)&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--sdk_worker_parallelism&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Sets the number of sdk worker processes that will run on each &#39;</span>
<span class="s1">&#39;worker node. Default is 1. If 0, a value will be chosen by the &#39;</span>
<span class="s1">&#39;runner.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_cache_millis&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Duration in milliseconds for environment cache within a job. &#39;</span>
<span class="s1">&#39;0 means no caching.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--output_executable_path&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Create an executable jar at this path rather than running &#39;</span>
<span class="s1">&#39;the pipeline.&#39;</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="k">return</span> <span class="n">validator</span><span class="o">.</span><span class="n">validate_environment_options</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">add_environment_option</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">option</span><span class="p">):</span>
<span class="c1"># pylint: disable=access-member-before-definition</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">option</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">option</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">lookup_environment_option</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="p">:</span>
<span class="k">return</span> <span class="n">default</span>
<span class="k">elif</span> <span class="n">key</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">environment_options</span><span class="p">:</span>
<span class="k">if</span> <span class="n">option</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">key</span> <span class="o">+</span> <span class="s1">&#39;=&#39;</span><span class="p">):</span>
<span class="k">return</span> <span class="n">option</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;=&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">)[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">return</span> <span class="n">default</span>
<span class="k">class</span> <span class="nc">JobServerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Options for starting a Beam job server. Roughly corresponds to</span>
<span class="sd"> JobServerDriver.ServerConfiguration in Java.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--artifacts_dir&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The location to store staged artifact files. &#39;</span>
<span class="s1">&#39;Any Beam-supported file system is allowed. &#39;</span>
<span class="s1">&#39;If unset, the local temp dir will be used.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--job_port&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Port to use for the job service. 0 to use a &#39;</span>
<span class="s1">&#39;dynamic port.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--artifact_port&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Port to use for artifact staging. 0 to use a &#39;</span>
<span class="s1">&#39;dynamic port.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--expansion_port&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Port to use for the expansion service. 0 to use a &#39;</span>
<span class="s1">&#39;dynamic port.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--job_server_java_launcher&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;java&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The Java Application Launcher executable file to use for &#39;</span>
<span class="s1">&#39;starting a Java job server. If unset, `java` from the &#39;</span>
<span class="s1">&#39;environment</span><span class="se">\&#39;</span><span class="s1">s $PATH is used.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--job_server_jvm_properties&#39;</span><span class="p">,</span>
<span class="s1">&#39;--job_server_jvm_property&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;job_server_jvm_properties&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="p">[],</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;JVM properties to pass to a Java job server.&#39;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">FlinkRunnerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="c1"># These should stay in sync with gradle.properties.</span>
<span class="n">PUBLISHED_FLINK_VERSIONS</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;1.12&#39;</span><span class="p">,</span> <span class="s1">&#39;1.13&#39;</span><span class="p">,</span> <span class="s1">&#39;1.14&#39;</span><span class="p">,</span> <span class="s1">&#39;1.15&#39;</span><span class="p">,</span> <span class="s1">&#39;1.16&#39;</span><span class="p">]</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--flink_master&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;[auto]&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Flink master address (http://host:port)&#39;</span>
<span class="s1">&#39; Use &quot;[local]&quot; to start a local cluster&#39;</span>
<span class="s1">&#39; for the execution. Use &quot;[auto]&quot; if you&#39;</span>
<span class="s1">&#39; plan to either execute locally or let the&#39;</span>
<span class="s1">&#39; Flink job server infer the cluster address.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--flink_version&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">PUBLISHED_FLINK_VERSIONS</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">],</span>
<span class="n">choices</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">PUBLISHED_FLINK_VERSIONS</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Flink version to use.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--flink_job_server_jar&#39;</span><span class="p">,</span> <span class="n">help</span><span class="o">=</span><span class="s1">&#39;Path or URL to a flink jobserver jar.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--flink_submit_uber_jar&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Create and upload an uberjar to the flink master&#39;</span>
<span class="s1">&#39; directly, rather than starting up a job server.&#39;</span>
<span class="s1">&#39; Only applies when flink_master is set to a&#39;</span>
<span class="s1">&#39; cluster address. Requires Python 3.6+.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--parallelism&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=-</span><span class="mi">1</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The degree of parallelism to be used when distributing&#39;</span>
<span class="s1">&#39; operations onto workers. If the parallelism is not set, the&#39;</span>
<span class="s1">&#39; configured Flink default is used, or 1 if none can be found.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--max_parallelism&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=-</span><span class="mi">1</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The pipeline wide maximum degree of parallelism to be used. The&#39;</span>
<span class="s1">&#39; maximum parallelism specifies the upper limit for dynamic scaling&#39;</span>
<span class="s1">&#39; and the number of key groups used for partitioned state.&#39;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">SparkRunnerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--spark_master_url&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;local[4]&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Spark master URL (spark://HOST:PORT). &#39;</span>
<span class="s1">&#39;Use &quot;local&quot; (single-threaded) or &quot;local[*]&quot; &#39;</span>
<span class="s1">&#39;(multi-threaded) to start a local cluster for &#39;</span>
<span class="s1">&#39;the execution.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--spark_job_server_jar&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Path or URL to a Beam Spark job server jar. &#39;</span>
<span class="s1">&#39;Overrides --spark_version.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--spark_submit_uber_jar&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Create and upload an uber jar to the Spark REST&#39;</span>
<span class="s1">&#39; endpoint, rather than starting up a job server.&#39;</span>
<span class="s1">&#39; Requires Python 3.6+.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--spark_rest_url&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;URL for the Spark REST endpoint. &#39;</span>
<span class="s1">&#39;Only required when using spark_submit_uber_jar. &#39;</span>
<span class="s1">&#39;For example, http://hostname:6066&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--spark_version&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;3&#39;</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;3&#39;</span><span class="p">],</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Spark major version to use.&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="TestOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TestOptions">[docs]</a><span class="k">class</span> <span class="nc">TestOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># Options for e2e test pipeline.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--on_success_matcher&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Verify state/output of e2e test pipeline. This is pickled &#39;</span>
<span class="s1">&#39;version of the matcher which should extends &#39;</span>
<span class="s1">&#39;hamcrest.core.base_matcher.BaseMatcher.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dry_run&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Used in unit testing runners without submitting the &#39;</span>
<span class="s1">&#39;actual job.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--wait_until_finish_duration&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The time to wait (in milliseconds) for test pipeline to finish. &#39;</span>
<span class="s1">&#39;If it is set to None, it will wait indefinitely until the job &#39;</span>
<span class="s1">&#39;is finished.&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="TestOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TestOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TestOptions</span><span class="p">)</span><span class="o">.</span><span class="n">on_success_matcher</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_test_matcher</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;on_success_matcher&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<span class="k">class</span> <span class="nc">TestDataflowOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># This option is passed to Dataflow Runner&#39;s Pub/Sub client. The camelCase</span>
<span class="c1"># style in &#39;dest&#39; matches the runner&#39;s.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--pubsub_root_url&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;pubsubRootUrl&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Root URL for use with the Google Cloud Pub/Sub API.&#39;</span><span class="p">,</span>
<span class="p">)</span>
<span class="c1"># TODO(silviuc): Add --files_to_stage option.</span>
<span class="c1"># This could potentially replace the --requirements_file and --setup_file.</span>
<span class="c1"># TODO(silviuc): Non-standard options. Keep them? If yes, add help too!</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="k">class</span> <span class="nc">OptionsContext</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Set default pipeline options for pipelines created in this block.</span>
<span class="sd"> This is particularly useful for pipelines implicitly created with the</span>
<span class="sd"> [python list] | PTransform</span>
<span class="sd"> construct.</span>
<span class="sd"> Can also be used as a decorator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">overrides</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: List[Dict[str, Any]]</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">options</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span>
<span class="k">def</span> <span class="fm">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">overrides</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">exn_info</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">overrides</span><span class="o">.</span><span class="n">pop</span><span class="p">()</span>
<span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">wrapper</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">with</span> <span class="bp">self</span><span class="p">:</span>
<span class="n">f</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">wrapper</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">augment_options</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span>
<span class="k">for</span> <span class="n">override</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="n">overrides</span><span class="p">:</span>
<span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">override</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">options</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">return</span> <span class="n">options</span>
<div class="viewcode-block" id="S3Options"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.S3Options">[docs]</a><span class="k">class</span> <span class="nc">S3Options</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># These options are passed to the S3 IO Client</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--s3_access_key_id&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The access key ID to use when creating the s3 client.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--s3_secret_access_key&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The secret key to use when creating the s3 client.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--s3_session_token&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The session token to use when creating the s3 client.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--s3_endpoint_url&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The complete URL to use for the constructed s3 client.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--s3_region_name&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The name of the region associated with the s3 client.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--s3_api_version&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The API version to use with the s3 client.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--s3_verify&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether or not to verify SSL certificates with the s3 client.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--s3_disable_ssl&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;Whether or not to use SSL with the s3 client. &#39;</span>
<span class="s1">&#39;By default, SSL is used.&#39;</span><span class="p">))</span></div>
</pre></div>
</div>
</div>
<!-- Page footer: copyright notice followed by the Sphinx / Read the Docs build attribution. -->
<footer>
<hr>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<!--
  Theme bootstrap: the function handed to jQuery() is run once the DOM is
  ready, and it calls SphinxRtdTheme.Navigation.enable to switch on the
  theme's navigation behaviour.
  NOTE(review): the `true` argument presumably turns on sticky navigation in
  sphinx_rtd_theme; confirm against _static/js/theme.js.
-->
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>