blob: 71f881544c657cee17e046a374d22cd1205508e7 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.options.pipeline_options — Apache Beam documentation</title>
  <!-- Theme stylesheet; rel="stylesheet" already implies text/css. -->
  <link rel="stylesheet" href="../../../_static/css/theme.css">
  <!-- Document relations emitted by Sphinx for this "Module code" page. -->
  <link rel="index" title="Index" href="../../../genindex.html">
  <link rel="search" title="Search" href="../../../search.html">
  <link rel="top" title="Apache Beam documentation" href="../../../index.html">
  <link rel="up" title="Module code" href="../../index.html">
  <!-- Feature-detection script shipped in the theme's static assets. -->
  <script src="../../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side" aria-label="Sidebar">
  <div class="wy-side-scroll">
    <div class="wy-side-nav-search">
      <a href="../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
          <!-- A placeholder is not an accessible name (it disappears once the user
               types and is not reliably announced), so aria-label supplies a
               persistent one without changing the visual design. type is kept as
               "text" in case theme CSS selects on it. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <!-- The enclosing <nav> is labelled "Sidebar" so that assistive technology can
         tell it apart from this nested "main navigation" region. -->
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <!-- Package pages. -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.internal.html">apache_beam.internal package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.tools.html">apache_beam.tools package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
      </ul>
      <!-- Top-level module pages. -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.version.html">apache_beam.version module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- Bars icon carrying a data-toggle hook; presumably bound by the theme's
       JavaScript to show/hide the side navigation. A <nav> already has the
       navigation role, so no explicit role attribute is needed. -->
  <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
  <a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
  <!-- Breadcrumb trail: Docs » Module code » current module (not a link). -->
  <ul class="wy-breadcrumbs">
    <li><a href="../../../index.html">Docs</a> »</li>
    <li><a href="../../index.html">Module code</a> »</li>
    <li>apache_beam.options.pipeline_options</li>
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.options.pipeline_options</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Pipeline options obtained from command line parsing.&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">import</span> <span class="nn">argparse</span>
<span class="kn">import</span> <span class="nn">json</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">list</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">object</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">RuntimeValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">StaticValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">ValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="k">import</span> <span class="n">HasDisplayData</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;PipelineOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;StandardOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;TypeOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;DirectOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;GoogleCloudOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;HadoopFileSystemOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;WorkerOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;DebugOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;ProfilingOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;SetupOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;TestOptions&#39;</span><span class="p">,</span>
<span class="p">]</span>
<span class="k">def</span> <span class="nf">_static_value_provider_of</span><span class="p">(</span><span class="n">value_type</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;&quot;Helper function to plug a ValueProvider into argparse.</span>
<span class="sd"> Args:</span>
<span class="sd"> value_type: the type of the value. Since the type param of argparse&#39;s</span>
<span class="sd"> add_argument will always be ValueProvider, we need to</span>
<span class="sd"> preserve the type of the actual value.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A partially constructed StaticValueProvider in the form of a function.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">_f</span><span class="p">(</span><span class="n">value</span><span class="p">):</span>
<span class="n">_f</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="n">value_type</span><span class="o">.</span><span class="vm">__name__</span>
<span class="k">return</span> <span class="n">StaticValueProvider</span><span class="p">(</span><span class="n">value_type</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_f</span>
<span class="k">class</span> <span class="nc">_BeamArgumentParser</span><span class="p">(</span><span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;An ArgumentParser that supports ValueProvider options.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> class TemplateUserOptions(PipelineOptions):</span>
<span class="sd"> @classmethod</span>
<span class="sd"> def _add_argparse_args(cls, parser):</span>
<span class="sd"> parser.add_value_provider_argument(&#39;--vp_arg1&#39;, default=&#39;start&#39;)</span>
<span class="sd"> parser.add_value_provider_argument(&#39;--vp_arg2&#39;)</span>
<span class="sd"> parser.add_argument(&#39;--non_vp_arg&#39;)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">add_value_provider_argument</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;ValueProvider arguments can be either of type keyword or positional.</span>
<span class="sd"> At runtime, even positional arguments will need to be supplied in the</span>
<span class="sd"> key/value form.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Extract the option name from positional argument [&#39;pos_arg&#39;]</span>
<span class="k">assert</span> <span class="n">args</span> <span class="o">!=</span> <span class="p">()</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> <span class="o">&gt;=</span> <span class="mi">1</span>
<span class="k">if</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="s1">&#39;-&#39;</span><span class="p">:</span>
<span class="n">option_name</span> <span class="o">=</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;nargs&#39;</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> <span class="c1"># make them optionally templated</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;nargs&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="s1">&#39;?&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]</span>
<span class="n">option_name</span> <span class="o">=</span> <span class="p">[</span><span class="n">i</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;--&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="n">args</span> <span class="k">if</span> <span class="n">i</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;--&#39;</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># reassign the type to make room for using</span>
<span class="c1"># StaticValueProvider as the type for add_argument</span>
<span class="n">value_type</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;type&#39;</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">str</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">_static_value_provider_of</span><span class="p">(</span><span class="n">value_type</span><span class="p">)</span>
<span class="c1"># reassign default to default_value to make room for using</span>
<span class="c1"># RuntimeValueProvider as the default for add_argument</span>
<span class="n">default_value</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;default&#39;</span><span class="p">)</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;default&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">RuntimeValueProvider</span><span class="p">(</span>
<span class="n">option_name</span><span class="o">=</span><span class="n">option_name</span><span class="p">,</span>
<span class="n">value_type</span><span class="o">=</span><span class="n">value_type</span><span class="p">,</span>
<span class="n">default_value</span><span class="o">=</span><span class="n">default_value</span>
<span class="p">)</span>
<span class="c1"># have add_argument do most of the work</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="c1"># The argparse package by default tries to autocomplete option names. This</span>
<span class="c1"># results in an &quot;ambiguous option&quot; error from argparse when an unknown option</span>
<span class="c1"># matching multiple known ones are used. This suppresses that behavior.</span>
<span class="k">def</span> <span class="nf">error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">message</span><span class="p">):</span>
<span class="k">if</span> <span class="n">message</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;ambiguous option: &#39;</span><span class="p">):</span>
<span class="k">return</span>
<span class="nb">super</span><span class="p">(</span><span class="n">_BeamArgumentParser</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
<div class="viewcode-block" id="PipelineOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions">[docs]</a><span class="k">class</span> <span class="nc">PipelineOptions</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;This class and subclasses are used as containers for command line options.</span>
<span class="sd"> These classes are wrappers over the standard argparse Python module</span>
<span class="sd"> (see https://docs.python.org/3/library/argparse.html). To define one option</span>
<span class="sd"> or a group of options, create a subclass from PipelineOptions.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> class XyzOptions(PipelineOptions):</span>
<span class="sd"> @classmethod</span>
<span class="sd"> def _add_argparse_args(cls, parser):</span>
<span class="sd"> parser.add_argument(&#39;--abc&#39;, default=&#39;start&#39;)</span>
<span class="sd"> parser.add_argument(&#39;--xyz&#39;, default=&#39;end&#39;)</span>
<span class="sd"> The arguments for the add_argument() method are exactly the ones</span>
<span class="sd"> described in the argparse public documentation.</span>
<span class="sd"> Pipeline objects require an options object during initialization.</span>
<span class="sd"> This is obtained simply by initializing an options class as defined above.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> p = Pipeline(options=XyzOptions())</span>
<span class="sd"> if p.options.xyz == &#39;end&#39;:</span>
<span class="sd"> raise ValueError(&#39;Option xyz has an invalid value.&#39;)</span>
<span class="sd"> Instances of PipelineOptions or any of its subclass have access to values</span>
<span class="sd"> defined by other PipelineOption subclasses (see get_all_options()), and</span>
<span class="sd"> can be converted to an instance of another PipelineOptions subclass</span>
<span class="sd"> (see view_as()). All views share the underlying data structure that stores</span>
<span class="sd"> option key-value pairs.</span>
<span class="sd"> By default the options classes will use command line arguments to initialize</span>
<span class="sd"> the options.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">flags</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initialize an options class.</span>
<span class="sd"> The initializer will traverse all subclasses, add all their argparse</span>
<span class="sd"> arguments and then parse the command line specified by flags or by default</span>
<span class="sd"> the one obtained from sys.argv.</span>
<span class="sd"> The subclasses of PipelineOptions do not need to redefine __init__.</span>
<span class="sd"> Args:</span>
<span class="sd"> flags: An iterable of command line arguments to be used. If not specified</span>
<span class="sd"> then sys.argv will be used as input for parsing arguments.</span>
<span class="sd"> **kwargs: Add overrides for arguments passed in flags.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># self._flags stores a list of not yet parsed arguments, typically,</span>
<span class="c1"># command-line flags. This list is shared across different views.</span>
<span class="c1"># See: view_as().</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_flags</span> <span class="o">=</span> <span class="n">flags</span>
<span class="c1"># Build parser that will parse options recognized by the [sub]class of</span>
<span class="c1"># PipelineOptions whose object is being instantiated.</span>
<span class="n">parser</span> <span class="o">=</span> <span class="n">_BeamArgumentParser</span><span class="p">()</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">mro</span><span class="p">():</span>
<span class="k">if</span> <span class="bp">cls</span> <span class="o">==</span> <span class="n">PipelineOptions</span><span class="p">:</span>
<span class="k">break</span>
<span class="k">elif</span> <span class="s1">&#39;_add_argparse_args&#39;</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">:</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span>
<span class="c1"># The _visible_options attribute will contain options that were recognized</span>
<span class="c1"># by the parser.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="n">flags</span><span class="p">)</span>
<span class="c1"># self._all_options is initialized with overrides to flag values,</span>
<span class="c1"># provided in kwargs, and will store key-value pairs for options recognized</span>
<span class="c1"># by current PipelineOptions [sub]class and its views that may be created.</span>
<span class="c1"># See: view_as().</span>
<span class="c1"># This dictionary is shared across different views, and is lazily updated</span>
<span class="c1"># as each new views are created.</span>
<span class="c1"># Users access this dictionary store via __getattr__ / __setattr__ methods.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="c1"># Initialize values of keys defined by this class.</span>
<span class="k">for</span> <span class="n">option_name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="c1"># Note that options specified in kwargs will not be overwritten.</span>
<span class="k">if</span> <span class="n">option_name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">option_name</span><span class="p">]</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span>
<span class="n">option_name</span><span class="p">)</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># Override this in subclasses to provide options.</span>
<span class="k">pass</span>
<div class="viewcode-block" id="PipelineOptions.from_dictionary"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.from_dictionary">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">from_dictionary</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a PipelineOptions from a dictionary of arguments.</span>
<span class="sd"> Args:</span>
<span class="sd"> options: Dictionary of argument value pairs.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A PipelineOptions object representing the given arguments.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">flags</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">options</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">bool</span><span class="p">):</span>
<span class="k">if</span> <span class="n">v</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">k</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="n">v</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">i</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">cls</span><span class="p">(</span><span class="n">flags</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineOptions.get_all_options"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.get_all_options">[docs]</a> <span class="k">def</span> <span class="nf">get_all_options</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">drop_default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">add_extra_args_fn</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a dictionary of all defined arguments.</span>
<span class="sd"> Returns a dictionary of all defined arguments (arguments that are defined in</span>
<span class="sd"> any subclass of PipelineOptions) into a dictionary.</span>
<span class="sd"> Args:</span>
<span class="sd"> drop_default: If set to true, options that are equal to their default</span>
<span class="sd"> values, are not returned as part of the result dictionary.</span>
<span class="sd"> add_extra_args_fn: Callback to populate additional arguments, can be used</span>
<span class="sd"> by runner to supply otherwise unknown args.</span>
<span class="sd"> Returns:</span>
<span class="sd"> Dictionary of all args and values.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># TODO(BEAM-1319): PipelineOption sub-classes in the main session might be</span>
<span class="c1"># repeated. Pick last unique instance of each subclass to avoid conflicts.</span>
<span class="n">subset</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">parser</span> <span class="o">=</span> <span class="n">_BeamArgumentParser</span><span class="p">()</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">PipelineOptions</span><span class="o">.</span><span class="n">__subclasses__</span><span class="p">():</span>
<span class="n">subset</span><span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="bp">cls</span><span class="p">)]</span> <span class="o">=</span> <span class="bp">cls</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">subset</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span> <span class="c1"># pylint: disable=protected-access</span>
<span class="k">if</span> <span class="n">add_extra_args_fn</span><span class="p">:</span>
<span class="n">add_extra_args_fn</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span>
<span class="n">known_args</span><span class="p">,</span> <span class="n">unknown_args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span>
<span class="k">if</span> <span class="n">unknown_args</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;Discarding unparseable args: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">unknown_args</span><span class="p">)</span>
<span class="n">result</span> <span class="o">=</span> <span class="nb">vars</span><span class="p">(</span><span class="n">known_args</span><span class="p">)</span>
<span class="c1"># Apply the overrides if any</span>
<span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">result</span><span class="p">):</span>
<span class="k">if</span> <span class="n">k</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span>
<span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">k</span><span class="p">]</span>
<span class="k">if</span> <span class="p">(</span><span class="n">drop_default</span> <span class="ow">and</span>
<span class="n">parser</span><span class="o">.</span><span class="n">get_default</span><span class="p">(</span><span class="n">k</span><span class="p">)</span> <span class="o">==</span> <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="ow">and</span>
<span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">parser</span><span class="o">.</span><span class="n">get_default</span><span class="p">(</span><span class="n">k</span><span class="p">),</span> <span class="n">ValueProvider</span><span class="p">)):</span>
<span class="k">del</span> <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="PipelineOptions.display_data"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_all_options</span><span class="p">(</span><span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineOptions.view_as"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.view_as">[docs]</a> <span class="k">def</span> <span class="nf">view_as</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="bp">cls</span><span class="p">):</span><!-- view_as builds cls(self._flags), seeds self._all_options once for each visible option name not already present, then points the view at this object's _all_options dict so every view shares the same values. NOTE(review): this page appears to be Sphinx viewcode output; the docstring and comment typo fixes here must also be made in pipeline_options.py or the next docs build will revert them. -->
<span class="sd">&quot;&quot;&quot;Returns a view of current object as provided PipelineOption subclass.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> options = PipelineOptions([&#39;--runner&#39;, &#39;Direct&#39;, &#39;--streaming&#39;])</span>
<span class="sd"> standard_options = options.view_as(StandardOptions)</span>
<span class="sd"> if standard_options.streaming:</span>
<span class="sd"> # ... start a streaming job ...</span>
<span class="sd"> Note that options objects may have multiple views, and modifications</span>
<span class="sd"> of values in any view-object will apply to current object and other</span>
<span class="sd"> view-objects.</span>
<span class="sd"> Args:</span>
<span class="sd"> cls: PipelineOptions class or any of its subclasses.</span>
<span class="sd"> Returns:</span>
<span class="sd"> An instance of cls that is initialized using options contained in current</span>
<span class="sd"> object.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">view</span> <span class="o">=</span> <span class="bp">cls</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span>
<span class="k">for</span> <span class="n">option_name</span> <span class="ow">in</span> <span class="n">view</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="c1"># Initialize values of keys defined by a cls.</span>
<span class="c1">#</span>
<span class="c1"># Note that we do initialization only once per key to make sure that</span>
<span class="c1"># values in _all_options dict are not re-created with each new view.</span>
<span class="c1"># This is important to make sure that values of multi-options keys are</span>
<span class="c1"># backed by the same list across multiple views, and that any overrides of</span>
<span class="c1"># pipeline options already stored in _all_options are preserved.</span>
<span class="k">if</span> <span class="n">option_name</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">option_name</span><span class="p">]</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">view</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span>
<span class="n">option_name</span><span class="p">)</span>
<span class="c1"># Note that views will still store _all_options of the source object.</span>
<span class="n">view</span><span class="o">.</span><span class="n">_all_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span>
<span class="k">return</span> <span class="n">view</span></div>
<span class="k">def</span> <span class="nf">_visible_option_list</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- _visible_option_list: sorted names from dir(self._visible_options) whose first character is not an underscore. -->
<span class="k">return</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">option</span>
<span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="nb">dir</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">)</span> <span class="k">if</span> <span class="n">option</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="s1">&#39;_&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__dir__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- __dir__: sorted concatenation of dir(type(self)), the instance __dict__ keys and the visible option names; duplicates are not removed. -->
<span class="k">return</span> <span class="nb">sorted</span><span class="p">(</span><span class="nb">dir</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> <span class="o">+</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">)</span> <span class="o">+</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">):</span><!-- __getattr__ runs only when normal attribute lookup fails. Dunder names are delegated to object.__getattribute__; visible option names are read from the shared _all_options dict; any other name raises AttributeError. -->
<span class="c1"># Special methods which may be accessed before the object is</span>
<span class="c1"># fully constructed (e.g. in unpickling).</span>
<span class="k">if</span> <span class="n">name</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">==</span> <span class="n">name</span><span class="p">[</span><span class="o">-</span><span class="mi">2</span><span class="p">:]</span> <span class="o">==</span> <span class="s1">&#39;__&#39;</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">name</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span><span class="s2">&quot;&#39;</span><span class="si">%s</span><span class="s2">&#39; object has no attribute &#39;</span><span class="si">%s</span><span class="s2">&#39;&quot;</span> <span class="o">%</span>
<span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span><!-- __setattr__: _flags, _all_options and _visible_options are stored as ordinary attributes via the parent __setattr__; visible option names are written into the shared _all_options dict, so every view of the same options sees the change; any other name raises AttributeError. -->
<span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;_flags&#39;</span><span class="p">,</span> <span class="s1">&#39;_all_options&#39;</span><span class="p">,</span> <span class="s1">&#39;_visible_options&#39;</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__setattr__</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span><span class="s2">&quot;&#39;</span><span class="si">%s</span><span class="s2">&#39; object has no attribute &#39;</span><span class="si">%s</span><span class="s2">&#39;&quot;</span> <span class="o">%</span>
<span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">__str__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- __str__: renders TypeName(name=value, ...) over the sorted visible option names, reading each value through getattr. The closing div at the end of this method ends the PipelineOptions class block. -->
<span class="k">return</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span>
<span class="s1">&#39;, &#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">option</span><span class="p">,</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">option</span><span class="p">))</span>
<span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">()))</span></div>
<div class="viewcode-block" id="StandardOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.StandardOptions">[docs]</a><span class="k">class</span> <span class="nc">StandardOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- StandardOptions registers the runner flag and the store_true streaming flag. The runner flag has no parser default here; DEFAULT_RUNNER is declared on the class but is not referenced in this block (presumably consumed by runner-selection code elsewhere; verify). -->
<span class="n">DEFAULT_RUNNER</span> <span class="o">=</span> <span class="s1">&#39;DirectRunner&#39;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--runner&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Pipeline runner used to execute the workflow. Valid values are &#39;</span>
<span class="s1">&#39;DirectRunner, DataflowRunner.&#39;</span><span class="p">))</span>
<span class="c1"># Whether to enable streaming mode.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--streaming&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to enable streaming mode.&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="TypeOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TypeOptions">[docs]</a><span class="k">class</span> <span class="nc">TypeOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- TypeOptions registers three type-checking flags. type_check_strictness is a choice defaulting to DEFAULT_TO_ANY. pipeline_type_check uses store_false via its no_ prefixed flag, so argparse defaults it to True. runtime_type_check is store_true with default False. -->
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># TODO(laolu): Add a type inferencing option here once implemented.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--type_check_strictness&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;DEFAULT_TO_ANY&#39;</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;ALL_REQUIRED&#39;</span><span class="p">,</span> <span class="s1">&#39;DEFAULT_TO_ANY&#39;</span><span class="p">],</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The level of exhaustive manual type-hint &#39;</span>
<span class="s1">&#39;annotation required&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--no_pipeline_type_check&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;pipeline_type_check&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Disable type checking at pipeline construction &#39;</span>
<span class="s1">&#39;time&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--runtime_type_check&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable type checking at pipeline execution &#39;</span>
<span class="s1">&#39;time. NOTE: only supported with the &#39;</span>
<span class="s1">&#39;DirectRunner&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DirectOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DirectOptions">[docs]</a><span class="k">class</span> <span class="nc">DirectOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- DirectOptions registers the DirectRunner flags: stacked-bundle toggle (store_false), bundle repeat count (int, default 0) and worker count (int, default 1). NOTE(review): the bundle-repeat help text previously concatenated to "profilingand debugging"; a space was added. This page appears to be Sphinx viewcode output, so the same fix must be made in pipeline_options.py or the next docs build will revert it. -->
<span class="sd">&quot;&quot;&quot;DirectRunner-specific execution options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--no_direct_runner_use_stacked_bundle&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;direct_runner_use_stacked_bundle&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;DirectRunner uses stacked WindowedValues within a Bundle for &#39;</span>
<span class="s1">&#39;memory optimization. Set --no_direct_runner_use_stacked_bundle to &#39;</span>
<span class="s1">&#39;avoid it.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--direct_runner_bundle_repeat&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;replay every bundle this many extra times, for profiling &#39;</span>
<span class="s1">&#39;and debugging&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--direct_num_workers&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;number of parallel running workers.&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="GoogleCloudOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions">[docs]</a><span class="k">class</span> <span class="nc">GoogleCloudOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- GoogleCloudOptions registers the Dataflow service flags. Its validate method returns a list of error strings and, when region is None, assigns us-central1. NOTE(review): the transform_name_mapping help text previously concatenated to "code.Experimental."; a space was added. This page appears to be Sphinx viewcode output, so the same fix must be made in pipeline_options.py or the next docs build will revert it. -->
<span class="sd">&quot;&quot;&quot;Google Cloud Dataflow service execution options.&quot;&quot;&quot;</span>
<span class="n">BIGQUERY_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;bigquery.googleapis.com&#39;</span>
<span class="n">COMPUTE_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;compute.googleapis.com&#39;</span>
<span class="n">STORAGE_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;storage.googleapis.com&#39;</span>
<span class="n">DATAFLOW_ENDPOINT</span> <span class="o">=</span> <span class="s1">&#39;https://dataflow.googleapis.com&#39;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dataflow_endpoint&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">DATAFLOW_ENDPOINT</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;The URL for the Dataflow API. If not set, the default public URL &#39;</span>
<span class="s1">&#39;will be used.&#39;</span><span class="p">))</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--project&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Name of the Cloud project owning the Dataflow &#39;</span>
<span class="s1">&#39;job.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--job_name&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Name of the Cloud Dataflow job.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--staging_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCS path for staging code packages needed by &#39;</span>
<span class="s1">&#39;workers.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="c1"># If staging_location is not set, it defaults to temp_location.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--temp_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCS path for saving temporary workflow jobs.&#39;</span><span class="p">)</span>
<span class="c1"># The Google Compute Engine region for creating Dataflow jobs. See</span>
<span class="c1"># https://cloud.google.com/compute/docs/regions-zones/regions-zones for a</span>
<span class="c1"># list of valid options. Currently defaults to us-central1, but future</span>
<span class="c1"># releases of Beam will require the user to set the region explicitly.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--region&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The Google Compute Engine region for creating &#39;</span>
<span class="s1">&#39;Dataflow job.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--service_account_email&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Identity to run virtual machines as.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--no_auth&#39;</span><span class="p">,</span> <span class="n">dest</span><span class="o">=</span><span class="s1">&#39;no_auth&#39;</span><span class="p">,</span> <span class="nb">type</span><span class="o">=</span><span class="nb">bool</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span><!-- NOTE(review): with type=bool, argparse calls bool() on the raw string, so any non-empty value (including the text False) yields True. action store_true would avoid this, but it changes how the flag must be passed; confirm with callers before changing. -->
<span class="c1"># Option to run templated pipelines</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--template_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Save job to specified local or GCS location.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--label&#39;</span><span class="p">,</span> <span class="s1">&#39;--labels&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;labels&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Labels to be applied to this Dataflow job. &#39;</span>
<span class="s1">&#39;Labels are key value pairs separated by = &#39;</span>
<span class="s1">&#39;(e.g. --label key=value).&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--update&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Update an existing streaming Cloud Dataflow job. &#39;</span>
<span class="s1">&#39;Experimental. &#39;</span>
<span class="s1">&#39;See https://cloud.google.com/dataflow/docs/guides/&#39;</span>
<span class="s1">&#39;updating-a-pipeline&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--transform_name_mapping&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The transform mapping that maps the named &#39;</span>
<span class="s1">&#39;transforms in your prior pipeline code to names &#39;</span>
<span class="s1">&#39;in your replacement pipeline code. &#39;</span>
<span class="s1">&#39;Experimental. &#39;</span>
<span class="s1">&#39;See https://cloud.google.com/dataflow/docs/guides/&#39;</span>
<span class="s1">&#39;updating-a-pipeline&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--enable_streaming_engine&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable Windmill Service for this Dataflow job. &#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--dataflow_kms_key&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Set a Google Cloud KMS key name to be used in &#39;</span>
<span class="s1">&#39;Dataflow state operations (GBK, Streaming).&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--flexrs_goal&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;COST_OPTIMIZED&#39;</span><span class="p">,</span> <span class="s1">&#39;SPEED_OPTIMIZED&#39;</span><span class="p">],</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Set the Flexible Resource Scheduling mode&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="GoogleCloudOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">validator</span><span class="o">.</span><span class="n">is_service_runner</span><span class="p">():</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_cloud_options</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_gcs_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;temp_location&#39;</span><span class="p">))</span>
<span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;staging_location&#39;</span><span class="p">,</span>
<span class="kc">None</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;temp_location&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_gcs_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;staging_location&#39;</span><span class="p">))</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">DebugOptions</span><span class="p">)</span><span class="o">.</span><span class="n">dataflow_job_file</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">template_location</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--dataflow_job_file and --template_location &#39;</span>
<span class="s1">&#39;are mutually exclusive.&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">region</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">region</span> <span class="o">=</span> <span class="s1">&#39;us-central1&#39;</span>
<span class="n">runner</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">StandardOptions</span><span class="p">)</span><span class="o">.</span><span class="n">runner</span>
<span class="k">if</span> <span class="n">runner</span> <span class="o">==</span> <span class="s1">&#39;DataflowRunner&#39;</span> <span class="ow">or</span> <span class="n">runner</span> <span class="o">==</span> <span class="s1">&#39;TestDataflowRunner&#39;</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;--region not set; will default to us-central1. Future releases of &#39;</span>
<span class="s1">&#39;Beam will require the user to set the region explicitly. &#39;</span>
<span class="s1">&#39;https://cloud.google.com/compute/docs/regions-zones/regions-zones&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<div class="viewcode-block" id="HadoopFileSystemOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.HadoopFileSystemOptions">[docs]</a><span class="k">class</span> <span class="nc">HadoopFileSystemOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- HadoopFileSystemOptions: declares the hdfs_host, hdfs_port and hdfs_user options (each defaulting to None) plus a validate() hook. This page is generated by Sphinx viewcode; apply fixes in apache_beam/options/pipeline_options.py and regenerate instead of hand-editing. -->
<span class="sd">&quot;&quot;&quot;``HadoopFileSystem`` connection options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_host&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Hostname or address of the HDFS namenode.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_port&#39;</span><span class="p">,</span><!-- NOTE(review): no type= is passed, so argparse keeps a command-line value as a str; type=int would match a port number. -->
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Port of the HDFS namenode.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_user&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;HDFS username to use.&#39;</span><span class="p">))</span>
<div class="viewcode-block" id="HadoopFileSystemOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.HadoopFileSystemOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_optional_argument_positive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;port&#39;</span><span class="p">))</span><!-- NOTE(review): checks the attribute named 'port', but the option above is stored under dest 'hdfs_port'; presumably this should be 'hdfs_port'. What happens for the missing name depends on validate_optional_argument_positive, which is not shown here. -->
<span class="k">return</span> <span class="n">errors</span></div></div>
<span class="c1"># Command line options controlling the worker pool configuration.</span>
<span class="c1"># TODO(silviuc): Update description when autoscaling options are in.</span>
<div class="viewcode-block" id="WorkerOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.WorkerOptions">[docs]</a><span class="k">class</span> <span class="nc">WorkerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- WorkerOptions: worker pool settings (worker counts, autoscaling, machine and disk type, zone, network, subnetwork, container image, public IPs, CPU platform, worker jar). validate() range-checks num_workers for service runners only. Generated page; fix in pipeline_options.py and regenerate. -->
<span class="sd">&quot;&quot;&quot;Worker pool configuration options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--num_workers&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Number of workers to use when executing the Dataflow job. If not &#39;</span>
<span class="s1">&#39;set, the Dataflow service will use a reasonable default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--max_num_workers&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Maximum number of workers to use when executing the Dataflow job.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--autoscaling_algorithm&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;NONE&#39;</span><span class="p">,</span> <span class="s1">&#39;THROUGHPUT_BASED&#39;</span><span class="p">],</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># Meaning unset, distinct from &#39;NONE&#39; meaning don&#39;t scale</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;If and how to autoscale the workerpool.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_machine_type&#39;</span><span class="p">,</span> <span class="s1">&#39;--machine_type&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;machine_type&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Machine type to create Dataflow worker VMs as. See &#39;</span>
<span class="s1">&#39;https://cloud.google.com/compute/docs/machine-types &#39;</span>
<span class="s1">&#39;for a list of valid options. If not set, &#39;</span>
<span class="s1">&#39;the Dataflow service will choose a reasonable &#39;</span>
<span class="s1">&#39;default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--disk_size_gb&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Remote worker disk size, in gigabytes, or 0 to use the default size. &#39;</span>
<span class="s1">&#39;If not set, the Dataflow service will use a reasonable default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_disk_type&#39;</span><span class="p">,</span> <span class="s1">&#39;--disk_type&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;disk_type&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Specifies what type of persistent disk should be used.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--zone&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE availability zone for launching workers. Default is up to the &#39;</span>
<span class="s1">&#39;Dataflow service.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--network&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE network for launching workers. Default is up to the Dataflow &#39;</span>
<span class="s1">&#39;service.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--subnetwork&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE subnetwork for launching workers. Default is up to the &#39;</span>
<span class="s1">&#39;Dataflow service. Expected format is &#39;</span>
<span class="s1">&#39;regions/REGION/subnetworks/SUBNETWORK or the fully qualified &#39;</span>
<span class="s1">&#39;subnetwork name. For more information, see &#39;</span>
<span class="s1">&#39;https://cloud.google.com/compute/docs/vpc/&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_harness_container_image&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Docker registry location of container image to use for the &#39;</span>
<span class="s1">&#39;worker harness. Default is the container for the version of the &#39;</span>
<span class="s1">&#39;SDK. Note: currently, only approved Google Cloud Dataflow &#39;</span>
<span class="s1">&#39;container images may be used here.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--use_public_ips&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to assign public IP addresses to the worker VMs.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--no_use_public_ips&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;use_public_ips&#39;</span><span class="p">,</span><!-- Shares dest with the use_public_ips flag above; both default to None, so the value stays None (unset) unless one of the two flags is given. -->
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to assign only private IP addresses to the worker VMs.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--min_cpu_platform&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;min_cpu_platform&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCE minimum CPU platform. Default is determined by GCP.&#39;</span>
<span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dataflow_worker_jar&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;dataflow_worker_jar&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Dataflow worker jar file. If specified, the jar file is staged &#39;</span>
<span class="s1">&#39;in GCS, then gets loaded by workers. End users usually &#39;</span>
<span class="s1">&#39;should not use this feature.&#39;</span>
<span class="p">)</span>
<div class="viewcode-block" id="WorkerOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.WorkerOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">validator</span><span class="o">.</span><span class="n">is_service_runner</span><span class="p">():</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span>
<span class="n">validator</span><span class="o">.</span><span class="n">validate_optional_argument_positive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;num_workers&#39;</span><span class="p">))</span><!-- NOTE(review): only num_workers is checked here; max_num_workers and disk_size_gb get no range check in this method. -->
<span class="k">return</span> <span class="n">errors</span></div></div>
<div class="viewcode-block" id="DebugOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions">[docs]</a><span class="k">class</span> <span class="nc">DebugOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- DebugOptions: the dataflow_job_file debug flag and a repeatable experiments list, plus helpers to add and look up experiments. The class has no docstring in the source; consider adding one in pipeline_options.py. -->
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--dataflow_job_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Debug file to write the workflow specification.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--experiment&#39;</span><span class="p">,</span> <span class="s1">&#39;--experiments&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;experiments&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span><!-- Each occurrence of the flag appends one entry; with default None the attribute stays None when the flag is never given, which add_experiment and lookup_experiment below both handle. -->
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Runners may provide a number of experimental features that can be &#39;</span>
<span class="s1">&#39;enabled with this flag. Please sync with the owners of the runner &#39;</span>
<span class="s1">&#39;before enabling any experiments.&#39;</span><span class="p">))</span>
<div class="viewcode-block" id="DebugOptions.add_experiment"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions.add_experiment">[docs]</a> <span class="k">def</span> <span class="nf">add_experiment</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">experiment</span><span class="p">):</span><!-- Adds experiment to the list at most once, creating the list first if it is None. -->
<span class="c1"># pylint: disable=access-member-before-definition</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">experiments</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">experiment</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">experiment</span><span class="p">)</span></div>
<div class="viewcode-block" id="DebugOptions.lookup_experiment"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions.lookup_experiment">[docs]</a> <span class="k">def</span> <span class="nf">lookup_experiment</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span><!-- Returns default if no experiments are set; True if key is present verbatim; otherwise the text after the first '=' of the first entry starting with key=; else default. -->
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="k">return</span> <span class="n">default</span>
<span class="k">elif</span> <span class="n">key</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">for</span> <span class="n">experiment</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">experiments</span><span class="p">:</span>
<span class="k">if</span> <span class="n">experiment</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">key</span> <span class="o">+</span> <span class="s1">&#39;=&#39;</span><span class="p">):</span>
<span class="k">return</span> <span class="n">experiment</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;=&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">)[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">return</span> <span class="n">default</span></div></div>
<div class="viewcode-block" id="ProfilingOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.ProfilingOptions">[docs]</a><span class="k">class</span> <span class="nc">ProfilingOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- ProfilingOptions: flags enabling work item CPU and heap profiling, where to save profiler data, and what fraction of bundles to profile. No validate() is defined in this class. -->
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--profile_cpu&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable work item CPU profiling.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--profile_memory&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable work item heap profiling.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--profile_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;path for saving profiler data.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--profile_sample_rate&#39;</span><span class="p">,</span><!-- NOTE(review): the help text promises a number between 0 and 1, but nothing in this class enforces that range. -->
<span class="nb">type</span><span class="o">=</span><span class="nb">float</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="mf">1.0</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;A number between 0 and 1 indicating the ratio &#39;</span>
<span class="s1">&#39;of bundles that should be profiled.&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="SetupOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.SetupOptions">[docs]</a><span class="k">class</span> <span class="nc">SetupOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span><!-- SetupOptions: how dependencies (requirements file and cache, setup.py, plugins, SDK tarball, extra packages) are staged for workers, and whether to save the main session. Generated page; fix in apache_beam/options/pipeline_options.py and regenerate. -->
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># Options for installing dependencies in the worker.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--requirements_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Path to a requirements file containing package dependencies. &#39;</span>
<span class="s1">&#39;Typically it is produced by a pip freeze command. More details: &#39;</span>
<span class="s1">&#39;https://pip.pypa.io/en/latest/reference/pip_freeze.html. &#39;</span>
<span class="s1">&#39;If used, all the packages specified will be downloaded, &#39;</span>
<span class="s1">&#39;cached (use --requirements_cache to change default location), &#39;</span>
<span class="s1">&#39;and then staged so that they can be automatically installed in &#39;</span>
<span class="s1">&#39;workers during startup. The cache is refreshed as needed &#39;</span>
<span class="s1">&#39;avoiding extra downloads for existing packages. Typically the &#39;</span>
<span class="s1">&#39;file is named requirements.txt.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--requirements_cache&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Path to a folder to cache the packages specified in &#39;</span>
<span class="s1">&#39;the requirements file using the --requirements_file option.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--setup_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Path to a setup Python file containing package dependencies. If &#39;</span>
<span class="s1">&#39;specified, the file</span><span class="se">\&#39;</span><span class="s1">s containing folder is assumed to have the &#39;</span>
<span class="s1">&#39;structure required for a setuptools setup package. The file must be &#39;</span>
<span class="s1">&#39;named setup.py. More details: &#39;</span>
<span class="s1">&#39;https://pythonhosted.org/an_example_pypi_project/setuptools.html &#39;</span>
<span class="s1">&#39;During job submission a source distribution will be built and the &#39;</span>
<span class="s1">&#39;worker will install the resulting package before running any custom &#39;</span>
<span class="s1">&#39;code.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--beam_plugin&#39;</span><span class="p">,</span> <span class="s1">&#39;--beam_plugin&#39;</span><span class="p">,</span><!-- NOTE(review): the same option string beam_plugin is listed twice, so the second is redundant; presumably a plural alias (beam_plugins) was intended, as with extra_package/extra_packages below. -->
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;beam_plugins&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Bootstrap the python process before executing any code by importing &#39;</span>
<span class="s1">&#39;all the plugins used in the pipeline. Please pass a comma separated&#39;</span><!-- NOTE(review): this literal has no trailing space, so the concatenated help text reads "separatedlist". Also, with action='append' each value is stored as passed; whether comma-separated entries are split later is not visible here. -->
<span class="s1">&#39;list of import paths to be included. This is currently an &#39;</span>
<span class="s1">&#39;experimental flag and provides no stability. Multiple &#39;</span>
<span class="s1">&#39;--beam_plugin options can be specified if more than one plugin &#39;</span>
<span class="s1">&#39;is needed.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--save_main_session&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Save the main session state so that pickled functions and classes &#39;</span>
<span class="s1">&#39;defined in __main__ (e.g. interactive session) can be unpickled. &#39;</span>
<span class="s1">&#39;Some workflows do not need the session state if for instance all &#39;</span>
<span class="s1">&#39;their functions/classes are defined in proper modules (not __main__)&#39;</span>
<span class="s1">&#39; and the modules are importable in the worker. &#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--sdk_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;default&#39;</span><span class="p">,</span><!-- The default is the sentinel string 'default' (see the help text), not None. -->
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Override the default location from where the Beam SDK is downloaded. &#39;</span>
<span class="s1">&#39;It can be a URL, a GCS path, or a local path to an SDK tarball. &#39;</span>
<span class="s1">&#39;Workflow submissions will download or copy an SDK tarball from here. &#39;</span>
<span class="s1">&#39;If set to the string &quot;default&quot;, a standard SDK location is used. If &#39;</span>
<span class="s1">&#39;empty, no SDK is copied.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--extra_package&#39;</span><span class="p">,</span> <span class="s1">&#39;--extra_packages&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;extra_packages&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Local path to a Python package file. The file is expected to be (1) &#39;</span>
<span class="s1">&#39;a package tarball (&quot;.tar&quot;), (2) a compressed package tarball &#39;</span>
<span class="s1">&#39;(&quot;.tar.gz&quot;), (3) a Wheel file (&quot;.whl&quot;) or (4) a compressed package &#39;</span>
<span class="s1">&#39;zip file (&quot;.zip&quot;) which can be installed using the &quot;pip install&quot; &#39;</span>
<span class="s1">&#39;command of the standard pip package. Multiple --extra_package &#39;</span>
<span class="s1">&#39;options can be specified if more than one package is needed. During &#39;</span>
<span class="s1">&#39;job submission, the files will be staged in the staging area &#39;</span>
<span class="s1">&#39;(--staging_location option) and the workers will install them in &#39;</span><!-- NOTE(review): together with the next literal this reads "install them in same order"; the article "the" is missing. -->
<span class="s1">&#39;same order they were specified on the command line.&#39;</span><span class="p">))</span></div>
<!-- Rendered (Sphinx viewcode) source of PortableOptions, the option group its docstring describes as common to most portable runners. This page is generated: change apache_beam/options/pipeline_options.py and rebuild the docs instead of editing this HTML. --><span class="k">class</span> <span class="nc">PortableOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Portable options are common options expected to be understood by most of</span>
<span class="sd"> the portable runners.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--job_endpoint&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Job service endpoint to use. Should be in the form &#39;</span>
<span class="s1">&#39;of address and port, e.g. localhost:3000&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_type&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Set the default environment type for running &#39;</span>
<span class="s1">&#39;user code. Possible options are DOCKER and PROCESS.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_config&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Set environment configuration for running the user code.</span><span class="se">\n</span><span class="s1"> For &#39;</span>
<span class="s1">&#39;DOCKER: Url for the docker image.</span><span class="se">\n</span><span class="s1"> For PROCESS: json of the &#39;</span>
<span class="s1">&#39;form {&quot;os&quot;: &quot;&lt;OS&gt;&quot;, &quot;arch&quot;: &quot;&lt;ARCHITECTURE&gt;&quot;, &quot;command&quot;: &#39;</span>
<span class="s1">&#39;&quot;&lt;process to execute&gt;&quot;, &quot;env&quot;:{&quot;&lt;Environment variables 1&gt;&quot;: &#39;</span>
<span class="s1">&#39;&quot;&lt;ENV_VAL&gt;&quot;} }. All fields in the json are optional except &#39;</span>
<span class="s1">&#39;command.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--sdk_worker_parallelism&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span><!-- NOTE(review): no type=int is given, so argparse stores a value passed on the command line as a str while the default is the int 0; consider adding type=int in the module source. -->
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Sets the number of sdk worker processes that will run on each &#39;</span>
<span class="s1">&#39;worker node. Default is 0. If 0, it will be automatically set &#39;</span>
<span class="s1">&#39;by the runner by looking at different parameters (e.g. number &#39;</span>
<span class="s1">&#39;of CPU cores on the worker machine or configuration).&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_cache_millis&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span><!-- NOTE(review): same as above: without type=int a command-line value arrives as a str, not an int of milliseconds. -->
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Duration in milliseconds for environment cache within a job. &#39;</span>
<span class="s1">&#39;0 means no caching.&#39;</span><span class="p">))</span>
<!-- Rendered (Sphinx viewcode) source of TestOptions: options for end-to-end test pipelines plus a validate hook. Generated page: fix issues in the module source, then rebuild. --><div class="viewcode-block" id="TestOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TestOptions">[docs]</a><span class="k">class</span> <span class="nc">TestOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># Options for e2e test pipeline.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--on_success_matcher&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Verify state/output of e2e test pipeline. This is pickled &#39;</span><!-- NOTE(review): the help says this value is a pickled matcher; wherever it is unpickled, it must only come from a trusted source. -->
<span class="s1">&#39;version of the matcher which should extends &#39;</span><!-- NOTE(review): help text typo, "should extends" should read "should extend"; fix in the module source. -->
<span class="s1">&#39;hamcrest.core.base_matcher.BaseMatcher.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dry_run&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span><!-- NOTE(review): no action=store_true, so the dry_run flag needs an explicit value on the command line and that value is stored as a str; a value such as "False" is truthy if the option is tested with a plain if. Confirm intended usage upstream. -->
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Used in unit testing runners without submitting the &#39;</span>
<span class="s1">&#39;actual job.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--wait_until_finish_duration&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The time to wait (in milliseconds) for test pipeline to finish. &#39;</span>
<span class="s1">&#39;If it is set to None, it will wait indefinitely until the job &#39;</span>
<span class="s1">&#39;is finished.&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="TestOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TestOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span><!-- validate returns the list built from validator.validate_test_matcher results; the matcher is only checked when on_success_matcher is set. -->
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TestOptions</span><span class="p">)</span><span class="o">.</span><span class="n">on_success_matcher</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_test_matcher</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;on_success_matcher&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<!-- Rendered (Sphinx viewcode) source of TestDataflowOptions: test-only options for the Dataflow runner, currently just the Pub/Sub root URL override. Generated page. --><span class="k">class</span> <span class="nc">TestDataflowOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># This option is passed to Dataflow Runner&#39;s Pub/Sub client. The camelCase</span>
<span class="c1"># style in &#39;dest&#39; matches the runner&#39;s.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--pubsub_root_url&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;pubsubRootUrl&#39;</span><span class="p">,</span><!-- Because of this dest, the parsed value is stored under the attribute name pubsubRootUrl, not pubsub_root_url. -->
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Root URL for use with the Google Cloud Pub/Sub API.&#39;</span><span class="p">,)</span>
<span class="c1"># TODO(silviuc): Add --files_to_stage option.</span>
<span class="c1"># This could potentially replace the --requirements_file and --setup_file.</span>
<span class="c1"># TODO(silviuc): Non-standard options. Keep them? If yes, add help too!</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<!-- Rendered (Sphinx viewcode) source of OptionsContext: a context manager and decorator that pushes keyword option overrides onto a stack, which augment_options then applies to an options object. Generated page: fix issues in the module source. --><span class="k">class</span> <span class="nc">OptionsContext</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Set default pipeline options for pipelines created in this block.</span>
<span class="sd"> This is particularly useful for pipelines implicitly created with the</span>
<span class="sd"> [python list] | PTransform</span>
<span class="sd"> construct.</span>
<span class="sd"> Can also be used as a decorator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">overrides</span> <span class="o">=</span> <span class="p">[]</span><!-- NOTE(review): class attribute, so every OptionsContext instance appends to and pops from this one shared list. -->
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">options</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span>
<span class="k">def</span> <span class="nf">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><!-- NOTE(review): returns None, so "with OptionsContext(...) as x" binds x to None. -->
<span class="bp">self</span><span class="o">.</span><span class="n">overrides</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">exn_info</span><span class="p">):</span><!-- Returns None, so exceptions raised inside the with block propagate. -->
<span class="bp">self</span><span class="o">.</span><span class="n">overrides</span><span class="o">.</span><span class="n">pop</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span><!-- NOTE(review): the extra *args/**kwargs parameters of __call__ are never used. -->
<span class="k">def</span> <span class="nf">wrapper</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">with</span> <span class="bp">self</span><span class="p">:</span>
<span class="n">f</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span><!-- NOTE(review): the wrapped function's return value is discarded, so a decorated function always returns None; the module source should return the result (and could apply functools.wraps). -->
<span class="k">return</span> <span class="n">wrapper</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">augment_options</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span>
<span class="k">for</span> <span class="n">override</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="n">overrides</span><span class="p">:</span><!-- Overrides are applied in push order, so the innermost (most recently entered) context wins on conflicting names. -->
<span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">override</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">options</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">return</span> <span class="n">options</span>
</pre></div>
</div>
<div class="articleComments">
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright .
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // Sphinx page configuration, presumably read by doctools.js loaded below.
  // Kept as a global var declared before those scripts run.
  var DOCUMENTATION_OPTIONS = {
    URL_ROOT: '../../../',
    VERSION: '',
    COLLAPSE_INDEX: false,
    FILE_SUFFIX: '.html',
    HAS_SOURCE: true,
    SOURCELINK_SUFFIX: '.txt'
  };
</script>
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/js/theme.js"></script>
<script>
  // Once the DOM is ready, turn on the theme's sticky navigation.
  jQuery(function () {
    SphinxRtdTheme.StickyNav.enable();
  });
</script>
</body>
</html>