blob: 4d32008a6a3e4b0b6e67eb63d85b02ec7f0ba7b8 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>apache_beam.options.pipeline_options &mdash; Apache Beam documentation</title>
  <!-- Theme stylesheet. -->
  <link rel="stylesheet" href="../../../_static/css/theme.css">
  <!-- Document relations: index, search, site root and the parent "Module code" page. -->
  <link rel="index" href="../../../genindex.html" title="Index">
  <link rel="search" href="../../../search.html" title="Search">
  <link rel="top" href="../../../index.html" title="Apache Beam documentation">
  <link rel="up" href="../../index.html" title="Module code">
  <!-- Loaded synchronously (no defer/async), so it runs before the body is parsed. -->
  <script src="../../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
  <div class="wy-side-scroll">
    <!-- Sidebar header: link to the documentation root plus the search form. -->
    <div class="wy-side-nav-search">
      <a href="../../../index.html" class="icon icon-home"> Apache Beam
      </a>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
          <!-- A placeholder is not an accessible name; aria-label gives assistive technology one. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs">
          <input type="hidden" name="check_keywords" value="yes">
          <input type="hidden" name="area" value="default">
        </form>
      </div>
    </div>
    <!-- Table of contents: package pages first, then top-level module pages. -->
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.internal.html">apache_beam.internal package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.tools.html">apache_beam.tools package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
      </ul>
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.version.html">apache_beam.version module</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<!-- Top bar: an icon element carrying the "wy-nav-top" data-toggle hook, and a link to the documentation root. -->
<nav class="wy-nav-top" aria-label="top navigation" role="navigation">
  <i class="fa fa-bars" data-toggle="wy-nav-top"></i>
  <a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div aria-label="breadcrumbs navigation" role="navigation">
  <!-- Breadcrumb trail: Docs, then Module code, then the current module (plain text, not a link). -->
  <ul class="wy-breadcrumbs">
    <li><a href="../../../index.html">Docs</a> &raquo;</li>
    <li><a href="../../index.html">Module code</a> &raquo;</li>
    <li>apache_beam.options.pipeline_options</li>
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.options.pipeline_options</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Pipeline options obtained from command line parsing.&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">import</span> <span class="nn">argparse</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">list</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">object</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">RuntimeValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">StaticValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.options.value_provider</span> <span class="k">import</span> <span class="n">ValueProvider</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms.display</span> <span class="k">import</span> <span class="n">HasDisplayData</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;PipelineOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;StandardOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;TypeOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;DirectOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;GoogleCloudOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;HadoopFileSystemOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;WorkerOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;DebugOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;ProfilingOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;SetupOptions&#39;</span><span class="p">,</span>
<span class="s1">&#39;TestOptions&#39;</span><span class="p">,</span>
<span class="p">]</span>
<span class="k">def</span> <span class="nf">_static_value_provider_of</span><span class="p">(</span><span class="n">value_type</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;&quot;Helper function to plug a ValueProvider into argparse.</span>
<span class="sd"> Args:</span>
<span class="sd"> value_type: the type of the value. Since the type param of argparse&#39;s</span>
<span class="sd"> add_argument will always be ValueProvider, we need to</span>
<span class="sd"> preserve the type of the actual value.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A partially constructed StaticValueProvider in the form of a function.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">_f</span><span class="p">(</span><span class="n">value</span><span class="p">):</span>
<span class="n">_f</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">=</span> <span class="n">value_type</span><span class="o">.</span><span class="vm">__name__</span>
<span class="k">return</span> <span class="n">StaticValueProvider</span><span class="p">(</span><span class="n">value_type</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">return</span> <span class="n">_f</span>
<span class="k">class</span> <span class="nc">_BeamArgumentParser</span><span class="p">(</span><span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;An ArgumentParser that supports ValueProvider options.</span>
<span class="sd"> Example Usage::</span>
<span class="sd"> class TemplateUserOptions(PipelineOptions):</span>
<span class="sd"> @classmethod</span>
<span class="sd"> def _add_argparse_args(cls, parser):</span>
<span class="sd"> parser.add_value_provider_argument(&#39;--vp-arg1&#39;, default=&#39;start&#39;)</span>
<span class="sd"> parser.add_value_provider_argument(&#39;--vp-arg2&#39;)</span>
<span class="sd"> parser.add_argument(&#39;--non-vp-arg&#39;)</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">add_value_provider_argument</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;ValueProvider arguments can be either of type keyword or positional.</span>
<span class="sd"> At runtime, even positional arguments will need to be supplied in the</span>
<span class="sd"> key/value form.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Extract the option name from positional argument [&#39;pos_arg&#39;]</span>
<span class="k">assert</span> <span class="n">args</span> <span class="o">!=</span> <span class="p">()</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> <span class="o">&gt;=</span> <span class="mi">1</span>
<span class="k">if</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="s1">&#39;-&#39;</span><span class="p">:</span>
<span class="n">option_name</span> <span class="o">=</span> <span class="n">args</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">if</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;nargs&#39;</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> <span class="c1"># make them optionally templated</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;nargs&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="s1">&#39;?&#39;</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]</span>
<span class="n">option_name</span> <span class="o">=</span> <span class="p">[</span><span class="n">i</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;--&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="n">args</span> <span class="k">if</span> <span class="n">i</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;--&#39;</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span>
<span class="c1"># reassign the type to make room for using</span>
<span class="c1"># StaticValueProvider as the type for add_argument</span>
<span class="n">value_type</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;type&#39;</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">str</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">_static_value_provider_of</span><span class="p">(</span><span class="n">value_type</span><span class="p">)</span>
<span class="c1"># reassign default to default_value to make room for using</span>
<span class="c1"># RuntimeValueProvider as the default for add_argument</span>
<span class="n">default_value</span> <span class="o">=</span> <span class="n">kwargs</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;default&#39;</span><span class="p">)</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;default&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">RuntimeValueProvider</span><span class="p">(</span>
<span class="n">option_name</span><span class="o">=</span><span class="n">option_name</span><span class="p">,</span>
<span class="n">value_type</span><span class="o">=</span><span class="n">value_type</span><span class="p">,</span>
<span class="n">default_value</span><span class="o">=</span><span class="n">default_value</span>
<span class="p">)</span>
<span class="c1"># have add_argument do most of the work</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<div class="viewcode-block" id="PipelineOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions">[docs]</a><span class="k">class</span> <span class="nc">PipelineOptions</span><span class="p">(</span><span class="n">HasDisplayData</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Pipeline options class used as container for command line options.</span>
<span class="sd"> The class is essentially a wrapper over the standard argparse Python module</span>
<span class="sd"> (see https://docs.python.org/3/library/argparse.html). To define one option</span>
<span class="sd"> or a group of options you subclass from PipelineOptions::</span>
<span class="sd"> class XyzOptions(PipelineOptions):</span>
<span class="sd"> @classmethod</span>
<span class="sd"> def _add_argparse_args(cls, parser):</span>
<span class="sd"> parser.add_argument(&#39;--abc&#39;, default=&#39;start&#39;)</span>
<span class="sd"> parser.add_argument(&#39;--xyz&#39;, default=&#39;end&#39;)</span>
<span class="sd"> The arguments for the add_argument() method are exactly the ones</span>
<span class="sd"> described in the argparse public documentation.</span>
<span class="sd"> Pipeline objects require an options object during initialization.</span>
<span class="sd"> This is obtained simply by initializing an options class as defined above::</span>
<span class="sd"> p = Pipeline(options=XyzOptions())</span>
<span class="sd"> if p.options.xyz == &#39;end&#39;:</span>
<span class="sd"> raise ValueError(&#39;Option xyz has an invalid value.&#39;)</span>
<span class="sd"> By default the options classes will use command line arguments to initialize</span>
<span class="sd"> the options.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">flags</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initialize an options class.</span>
<span class="sd"> The initializer will traverse all subclasses, add all their argparse</span>
<span class="sd"> arguments and then parse the command line specified by flags or by default</span>
<span class="sd"> the one obtained from sys.argv.</span>
<span class="sd"> The subclasses are not expected to require a redefinition of __init__.</span>
<span class="sd"> Args:</span>
<span class="sd"> flags: An iterable of command line arguments to be used. If not specified</span>
<span class="sd"> then sys.argv will be used as input for parsing arguments.</span>
<span class="sd"> **kwargs: Add overrides for arguments passed in flags.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_flags</span> <span class="o">=</span> <span class="n">flags</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="n">parser</span> <span class="o">=</span> <span class="n">_BeamArgumentParser</span><span class="p">()</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">mro</span><span class="p">():</span>
<span class="k">if</span> <span class="bp">cls</span> <span class="o">==</span> <span class="n">PipelineOptions</span><span class="p">:</span>
<span class="k">break</span>
<span class="k">elif</span> <span class="s1">&#39;_add_argparse_args&#39;</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">:</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span>
<span class="c1"># The _visible_options attribute will contain only those options from the</span>
<span class="c1"># flags (i.e., command line) that can be recognized. The _all_options</span>
<span class="c1"># field contains additional overrides.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="n">flags</span><span class="p">)</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># Override this in subclasses to provide options.</span>
<span class="k">pass</span>
<div class="viewcode-block" id="PipelineOptions.from_dictionary"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.from_dictionary">[docs]</a> <span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">from_dictionary</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a PipelineOptions from a dictionary of arguments.</span>
<span class="sd"> Args:</span>
<span class="sd"> options: Dictionary of argument value pairs.</span>
<span class="sd"> Returns:</span>
<span class="sd"> A PipelineOptions object representing the given arguments.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">flags</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="n">options</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">bool</span><span class="p">):</span>
<span class="k">if</span> <span class="n">v</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="n">k</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="n">v</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">i</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">flags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">))</span>
<span class="k">return</span> <span class="bp">cls</span><span class="p">(</span><span class="n">flags</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineOptions.get_all_options"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.get_all_options">[docs]</a> <span class="k">def</span> <span class="nf">get_all_options</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">drop_default</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Returns a dictionary of all defined arguments.</span>
<span class="sd"> Returns a dictionary of all defined arguments (arguments that are defined in</span>
<span class="sd"> any subclass of PipelineOptions) into a dictionary.</span>
<span class="sd"> Args:</span>
<span class="sd"> drop_default: If set to true, options that are equal to their default</span>
<span class="sd"> values, are not returned as part of the result dictionary.</span>
<span class="sd"> Returns:</span>
<span class="sd"> Dictionary of all args and values.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># TODO(BEAM-1319): PipelineOption sub-classes in the main session might be</span>
<span class="c1"># repeated. Pick last unique instance of each subclass to avoid conflicts.</span>
<span class="n">subset</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">parser</span> <span class="o">=</span> <span class="n">_BeamArgumentParser</span><span class="p">()</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">PipelineOptions</span><span class="o">.</span><span class="n">__subclasses__</span><span class="p">():</span>
<span class="n">subset</span><span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="bp">cls</span><span class="p">)]</span> <span class="o">=</span> <span class="bp">cls</span>
<span class="k">for</span> <span class="bp">cls</span> <span class="ow">in</span> <span class="n">subset</span><span class="o">.</span><span class="n">values</span><span class="p">():</span>
<span class="bp">cls</span><span class="o">.</span><span class="n">_add_argparse_args</span><span class="p">(</span><span class="n">parser</span><span class="p">)</span> <span class="c1"># pylint: disable=protected-access</span>
<span class="n">known_args</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span>
<span class="n">result</span> <span class="o">=</span> <span class="nb">vars</span><span class="p">(</span><span class="n">known_args</span><span class="p">)</span>
<span class="c1"># Apply the overrides if any</span>
<span class="k">for</span> <span class="n">k</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">result</span><span class="p">):</span>
<span class="k">if</span> <span class="n">k</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">:</span>
<span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">k</span><span class="p">]</span>
<span class="k">if</span> <span class="p">(</span><span class="n">drop_default</span> <span class="ow">and</span>
<span class="n">parser</span><span class="o">.</span><span class="n">get_default</span><span class="p">(</span><span class="n">k</span><span class="p">)</span> <span class="o">==</span> <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span> <span class="ow">and</span>
<span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">parser</span><span class="o">.</span><span class="n">get_default</span><span class="p">(</span><span class="n">k</span><span class="p">),</span> <span class="n">ValueProvider</span><span class="p">)):</span>
<span class="k">del</span> <span class="n">result</span><span class="p">[</span><span class="n">k</span><span class="p">]</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="PipelineOptions.display_data"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.display_data">[docs]</a> <span class="k">def</span> <span class="nf">display_data</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_all_options</span><span class="p">(</span><span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="PipelineOptions.view_as"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.view_as">[docs]</a> <span class="k">def</span> <span class="nf">view_as</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="bp">cls</span><span class="p">):</span>
<span class="n">view</span> <span class="o">=</span> <span class="bp">cls</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_flags</span><span class="p">)</span>
<span class="n">view</span><span class="o">.</span><span class="n">_all_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span>
<span class="k">return</span> <span class="n">view</span></div>
<span class="k">def</span> <span class="nf">_visible_option_list</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">option</span>
<span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="nb">dir</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">)</span> <span class="k">if</span> <span class="n">option</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="s1">&#39;_&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__dir__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">sorted</span><span class="p">(</span><span class="nb">dir</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> <span class="o">+</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">)</span> <span class="o">+</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">):</span>
<span class="c1"># Special methods which may be accessed before the object is</span>
<span class="c1"># fully constructed (e.g. in unpickling).</span>
<span class="k">if</span> <span class="n">name</span><span class="p">[:</span><span class="mi">2</span><span class="p">]</span> <span class="o">==</span> <span class="n">name</span><span class="p">[</span><span class="o">-</span><span class="mi">2</span><span class="p">:]</span> <span class="o">==</span> <span class="s1">&#39;__&#39;</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_visible_options</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span><span class="s2">&quot;&#39;</span><span class="si">%s</span><span class="s2">&#39; object has no attribute &#39;</span><span class="si">%s</span><span class="s2">&#39;&quot;</span> <span class="o">%</span>
<span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;_flags&#39;</span><span class="p">,</span> <span class="s1">&#39;_all_options&#39;</span><span class="p">,</span> <span class="s1">&#39;_visible_options&#39;</span><span class="p">):</span>
<span class="nb">super</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__setattr__</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">():</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_all_options</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span><span class="s2">&quot;&#39;</span><span class="si">%s</span><span class="s2">&#39; object has no attribute &#39;</span><span class="si">%s</span><span class="s2">&#39;&quot;</span> <span class="o">%</span>
<span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">name</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">__str__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s1">&#39;</span><span class="si">%s</span><span class="s1">(</span><span class="si">%s</span><span class="s1">)&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span>
<span class="s1">&#39;, &#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">%s</span><span class="s1">=</span><span class="si">%s</span><span class="s1">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">option</span><span class="p">,</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">option</span><span class="p">))</span>
<span class="k">for</span> <span class="n">option</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_visible_option_list</span><span class="p">()))</span></div>
<div class="viewcode-block" id="StandardOptions"><!-- Highlighted source listing for StandardOptions; the [docs] link points back to the StandardOptions anchor in apache_beam.options.pipeline_options.html. --><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.StandardOptions" aria-label="Docs for StandardOptions">[docs]</a><span class="k">class</span> <span class="nc">StandardOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="n">DEFAULT_RUNNER</span> <span class="o">=</span> <span class="s1">&#39;DirectRunner&#39;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--runner&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Pipeline runner used to execute the workflow. Valid values are &#39;</span>
<span class="s1">&#39;DirectRunner, DataflowRunner.&#39;</span><span class="p">))</span>
<span class="c1"># Whether to enable streaming mode.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--streaming&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to enable streaming mode.&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="TypeOptions"><!-- Highlighted source listing for TypeOptions; the [docs] link points back to the TypeOptions anchor in apache_beam.options.pipeline_options.html. --><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TypeOptions" aria-label="Docs for TypeOptions">[docs]</a><span class="k">class</span> <span class="nc">TypeOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># TODO(laolu): Add a type inferencing option here once implemented.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--type_check_strictness&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;DEFAULT_TO_ANY&#39;</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;ALL_REQUIRED&#39;</span><span class="p">,</span> <span class="s1">&#39;DEFAULT_TO_ANY&#39;</span><span class="p">],</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The level of exhaustive manual type-hint &#39;</span>
<span class="s1">&#39;annotation required&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--no_pipeline_type_check&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;pipeline_type_check&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Disable type checking at pipeline construction &#39;</span>
<span class="s1">&#39;time&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--runtime_type_check&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable type checking at pipeline execution &#39;</span>
<span class="s1">&#39;time. NOTE: only supported with the &#39;</span>
<span class="s1">&#39;DirectRunner&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DirectOptions"><!-- Highlighted source listing for DirectOptions; the [docs] link points back to the DirectOptions anchor in apache_beam.options.pipeline_options.html. --><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DirectOptions" aria-label="Docs for DirectOptions">[docs]</a><span class="k">class</span> <span class="nc">DirectOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;DirectRunner-specific execution options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--no_direct_runner_use_stacked_bundle&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;direct_runner_use_stacked_bundle&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;DirectRunner uses stacked WindowedValues within a Bundle for &#39;</span>
<span class="s1">&#39;memory optimization. Set --no_direct_runner_use_stacked_bundle to &#39;</span>
<span class="s1">&#39;avoid it.&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="GoogleCloudOptions"><!-- Highlighted source listing for GoogleCloudOptions and its nested validate block; each [docs] link points back to the matching anchor in apache_beam.options.pipeline_options.html. --><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions" aria-label="Docs for GoogleCloudOptions">[docs]</a><span class="k">class</span> <span class="nc">GoogleCloudOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Google Cloud Dataflow service execution options.&quot;&quot;&quot;</span>
<span class="n">BIGQUERY_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;bigquery.googleapis.com&#39;</span>
<span class="n">COMPUTE_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;compute.googleapis.com&#39;</span>
<span class="n">STORAGE_API_SERVICE</span> <span class="o">=</span> <span class="s1">&#39;storage.googleapis.com&#39;</span>
<span class="n">DATAFLOW_ENDPOINT</span> <span class="o">=</span> <span class="s1">&#39;https://dataflow.googleapis.com&#39;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dataflow_endpoint&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="bp">cls</span><span class="o">.</span><span class="n">DATAFLOW_ENDPOINT</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;The URL for the Dataflow API. If not set, the default public URL &#39;</span>
<span class="s1">&#39;will be used.&#39;</span><span class="p">))</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--project&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Name of the Cloud project owning the Dataflow &#39;</span>
<span class="s1">&#39;job.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--job_name&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Name of the Cloud Dataflow job.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--staging_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCS path for staging code packages needed by &#39;</span>
<span class="s1">&#39;workers.&#39;</span><span class="p">)</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="c1"># If staging_location is not set, it defaults to temp_location.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--temp_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCS path for saving temporary workflow jobs.&#39;</span><span class="p">)</span>
<span class="c1"># The Cloud Dataflow service does not yet honor this setting. However, once</span>
<span class="c1"># service support is added then users of this SDK will be able to control</span>
<span class="c1"># the region. Default is up to the Dataflow service. See</span>
<span class="c1"># https://cloud.google.com/compute/docs/regions-zones/regions-zones for a</span>
<span class="c1"># list of valid options/</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--region&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;us-central1&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The Google Compute Engine region for creating &#39;</span>
<span class="s1">&#39;Dataflow job.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--service_account_email&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Identity to run virtual machines as.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--no_auth&#39;</span><span class="p">,</span> <span class="n">dest</span><span class="o">=</span><span class="s1">&#39;no_auth&#39;</span><span class="p">,</span> <span class="nb">type</span><span class="o">=</span><span class="nb">bool</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span><!-- NOTE(review): the listed code passes type=bool to argparse; bool() turns any non-empty string into True, so a literal "False" value would still enable no_auth. Confirm in the Python module; the listing is left unchanged so it keeps matching it. -->
<span class="c1"># Option to run templated pipelines</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--template_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Save job to specified local or GCS location.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--label&#39;</span><span class="p">,</span> <span class="s1">&#39;--labels&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;labels&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Labels that will be applied to this Dataflow job. Labels are key &#39;</span>
<span class="s1">&#39;value pairs separated by = (e.g. --label key=value).&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="GoogleCloudOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions.validate" aria-label="Docs for GoogleCloudOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">validator</span><span class="o">.</span><span class="n">is_service_runner</span><span class="p">():</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_cloud_options</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_gcs_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;temp_location&#39;</span><span class="p">))</span>
<span class="k">if</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;staging_location&#39;</span><span class="p">,</span>
<span class="kc">None</span><span class="p">)</span> <span class="ow">or</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;temp_location&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_gcs_path</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;staging_location&#39;</span><span class="p">))</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">DebugOptions</span><span class="p">)</span><span class="o">.</span><span class="n">dataflow_job_file</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">GoogleCloudOptions</span><span class="p">)</span><span class="o">.</span><span class="n">template_location</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--dataflow_job_file and --template_location &#39;</span>
<span class="s1">&#39;are mutually exclusive.&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<div class="viewcode-block" id="HadoopFileSystemOptions"><!-- Highlighted source listing for HadoopFileSystemOptions and its nested validate block; each [docs] link points back to the matching anchor in apache_beam.options.pipeline_options.html. --><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.HadoopFileSystemOptions" aria-label="Docs for HadoopFileSystemOptions">[docs]</a><span class="k">class</span> <span class="nc">HadoopFileSystemOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;``HadoopFileSystem`` connection options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_host&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Hostname or address of the HDFS namenode.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_port&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Port of the HDFS namenode.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--hdfs_user&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;HDFS username to use.&#39;</span><span class="p">))</span>
<div class="viewcode-block" id="HadoopFileSystemOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.HadoopFileSystemOptions.validate" aria-label="Docs for HadoopFileSystemOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_optional_argument_positive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;port&#39;</span><span class="p">))</span><!-- NOTE(review): the listed code validates an option named 'port', while the options registered in this listing are hdfs_host, hdfs_port and hdfs_user; confirm in the Python module whether 'hdfs_port' was intended. The listing is left unchanged so it keeps matching the module. -->
<span class="k">return</span> <span class="n">errors</span></div></div>
<span class="c1"># Command line options controlling the worker pool configuration.</span>
<span class="c1"># TODO(silviuc): Update description when autoscaling options are in.</span>
<div class="viewcode-block" id="WorkerOptions"><!-- Highlighted source listing for WorkerOptions and its nested validate block; each [docs] link points back to the matching anchor in apache_beam.options.pipeline_options.html. --><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.WorkerOptions" aria-label="Docs for WorkerOptions">[docs]</a><span class="k">class</span> <span class="nc">WorkerOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Worker pool configuration options.&quot;&quot;&quot;</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--num_workers&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Number of workers to use when executing the Dataflow job. If not &#39;</span>
<span class="s1">&#39;set, the Dataflow service will use a reasonable default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--max_num_workers&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Maximum number of workers to use when executing the Dataflow job.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--autoscaling_algorithm&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">choices</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;NONE&#39;</span><span class="p">,</span> <span class="s1">&#39;THROUGHPUT_BASED&#39;</span><span class="p">],</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># Meaning unset, distinct from &#39;NONE&#39; meaning don&#39;t scale</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;If and how to autoscale the workerpool.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_machine_type&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;machine_type&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Machine type to create Dataflow worker VMs as. See &#39;</span>
<span class="s1">&#39;https://cloud.google.com/compute/docs/machine-types &#39;</span>
<span class="s1">&#39;for a list of valid options. If not set, &#39;</span>
<span class="s1">&#39;the Dataflow service will choose a reasonable &#39;</span>
<span class="s1">&#39;default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--disk_size_gb&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Remote worker disk size, in gigabytes, or 0 to use the default size. &#39;</span>
<span class="s1">&#39;If not set, the Dataflow service will use a reasonable default.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_disk_type&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;disk_type&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Specifies what type of persistent disk should be used.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--zone&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE availability zone for launching workers. Default is up to the &#39;</span>
<span class="s1">&#39;Dataflow service.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--network&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE network for launching workers. Default is up to the Dataflow &#39;</span>
<span class="s1">&#39;service.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--subnetwork&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span>
<span class="s1">&#39;GCE subnetwork for launching workers. Default is up to the &#39;</span>
<span class="s1">&#39;Dataflow service. Expected format is &#39;</span>
<span class="s1">&#39;regions/REGION/subnetworks/SUBNETWORK or the fully qualified &#39;</span>
<span class="s1">&#39;subnetwork name. For more information, see &#39;</span>
<span class="s1">&#39;https://cloud.google.com/compute/docs/vpc/&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--worker_harness_container_image&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Docker registry location of container image to use for the &#39;</span>
<span class="s1">&#39;worker harness. Default is the container for the version of the &#39;</span>
<span class="s1">&#39;SDK. Note: currently, only approved Google Cloud Dataflow &#39;</span>
<span class="s1">&#39;container images may be used here.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--use_public_ips&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to assign public IP addresses to the worker VMs.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--no_use_public_ips&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;use_public_ips&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_false&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Whether to assign only private IP addresses to the worker VMs.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--min_cpu_platform&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;min_cpu_platform&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCE minimum CPU platform. Default is determined by GCP.&#39;</span>
<span class="p">)</span>
<div class="viewcode-block" id="WorkerOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.WorkerOptions.validate" aria-label="Docs for WorkerOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">validator</span><span class="o">.</span><span class="n">is_service_runner</span><span class="p">():</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span>
<span class="n">validator</span><span class="o">.</span><span class="n">validate_optional_argument_positive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;num_workers&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<div class="viewcode-block" id="DebugOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DebugOptions">[docs]</a><span class="k">class</span> <span class="nc">DebugOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--dataflow_job_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Debug file to write the workflow specification.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--experiment&#39;</span><span class="p">,</span> <span class="s1">&#39;--experiments&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;experiments&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Runners may provide a number of experimental features that can be &#39;</span>
<span class="s1">&#39;enabled with this flag. Please sync with the owners of the runner &#39;</span>
<span class="s1">&#39;before enabling any experiments.&#39;</span><span class="p">))</span></div>
<div class="viewcode-block" id="ProfilingOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.ProfilingOptions">[docs]</a><span class="k">class</span> <span class="nc">ProfilingOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--profile_cpu&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable work item CPU profiling.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--profile_memory&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Enable work item heap profiling.&#39;</span><span class="p">)</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--profile_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;GCS path for saving profiler data.&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="SetupOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.SetupOptions">[docs]</a><span class="k">class</span> <span class="nc">SetupOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># Options for installing dependencies in the worker.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--requirements_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Path to a requirements file containing package dependencies. &#39;</span>
<span class="s1">&#39;Typically it is produced by a pip freeze command. More details: &#39;</span>
<span class="s1">&#39;https://pip.pypa.io/en/latest/reference/pip_freeze.html. &#39;</span>
<span class="s1">&#39;If used, all the packages specified will be downloaded, &#39;</span>
<span class="s1">&#39;cached (use --requirements_cache to change default location), &#39;</span>
<span class="s1">&#39;and then staged so that they can be automatically installed in &#39;</span>
<span class="s1">&#39;workers during startup. The cache is refreshed as needed &#39;</span>
<span class="s1">&#39;avoiding extra downloads for existing packages. Typically the &#39;</span>
<span class="s1">&#39;file is named requirements.txt.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--requirements_cache&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Path to a folder to cache the packages specified in &#39;</span>
<span class="s1">&#39;the requirements file using the --requirements_file option.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--setup_file&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Path to a setup Python file containing package dependencies. If &#39;</span>
<span class="s1">&#39;specified, the file</span><span class="se">\&#39;</span><span class="s1">s containing folder is assumed to have the &#39;</span>
<span class="s1">&#39;structure required for a setuptools setup package. The file must be &#39;</span>
<span class="s1">&#39;named setup.py. More details: &#39;</span>
<span class="s1">&#39;https://pythonhosted.org/an_example_pypi_project/setuptools.html &#39;</span>
<span class="s1">&#39;During job submission a source distribution will be built and the &#39;</span>
<span class="s1">&#39;worker will install the resulting package before running any custom &#39;</span>
<span class="s1">&#39;code.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--beam_plugin&#39;</span><span class="p">,</span> <span class="s1">&#39;--beam_plugin&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;beam_plugins&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Bootstrap the python process before executing any code by importing &#39;</span>
<span class="s1">&#39;all the plugins used in the pipeline. Please pass a comma separated&#39;</span>
<span class="s1">&#39;list of import paths to be included. This is currently an &#39;</span>
<span class="s1">&#39;experimental flag and provides no stability. Multiple &#39;</span>
<span class="s1">&#39;--beam_plugin options can be specified if more than one plugin &#39;</span>
<span class="s1">&#39;is needed.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--save_main_session&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;store_true&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Save the main session state so that pickled functions and classes &#39;</span>
<span class="s1">&#39;defined in __main__ (e.g. interactive session) can be unpickled. &#39;</span>
<span class="s1">&#39;Some workflows do not need the session state if for instance all &#39;</span>
<span class="s1">&#39;their functions/classes are defined in proper modules (not __main__)&#39;</span>
<span class="s1">&#39; and the modules are importable in the worker. &#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--sdk_location&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="s1">&#39;default&#39;</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Override the default location from where the Beam SDK is downloaded. &#39;</span>
<span class="s1">&#39;It can be a URL, a GCS path, or a local path to an SDK tarball. &#39;</span>
<span class="s1">&#39;Workflow submissions will download or copy an SDK tarball from here. &#39;</span>
<span class="s1">&#39;If set to the string &quot;default&quot;, a standard SDK location is used. If &#39;</span>
<span class="s1">&#39;empty, no SDK is copied.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--extra_package&#39;</span><span class="p">,</span> <span class="s1">&#39;--extra_packages&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;extra_packages&#39;</span><span class="p">,</span>
<span class="n">action</span><span class="o">=</span><span class="s1">&#39;append&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Local path to a Python package file. The file is expected to be (1) &#39;</span>
<span class="s1">&#39;a package tarball (&quot;.tar&quot;), (2) a compressed package tarball &#39;</span>
<span class="s1">&#39;(&quot;.tar.gz&quot;), (3) a Wheel file (&quot;.whl&quot;) or (4) a compressed package &#39;</span>
<span class="s1">&#39;zip file (&quot;.zip&quot;) which can be installed using the &quot;pip install&quot; &#39;</span>
<span class="s1">&#39;command of the standard pip package. Multiple --extra_package &#39;</span>
<span class="s1">&#39;options can be specified if more than one package is needed. During &#39;</span>
<span class="s1">&#39;job submission, the files will be staged in the staging area &#39;</span>
<span class="s1">&#39;(--staging_location option) and the workers will install them in &#39;</span>
<span class="s1">&#39;same order they were specified on the command line.&#39;</span><span class="p">))</span></div>
<span class="k">class</span> <span class="nc">PortableOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--job_endpoint&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Job service endpoint to use. Should be in the form &#39;</span>
<span class="s1">&#39;of address and port, e.g. localhost:3000&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_type&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Set the default environment type for running &#39;</span>
<span class="s1">&#39;user code. Possible options are DOCKER and PROCESS.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--environment_config&#39;</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Set environment configuration for running the user code.</span><span class="se">\n</span><span class="s1"> For &#39;</span>
<span class="s1">&#39;DOCKER: Url for the docker image.</span><span class="se">\n</span><span class="s1"> For PROCESS: json of the &#39;</span>
<span class="s1">&#39;form {&quot;os&quot;: &quot;&lt;OS&gt;&quot;, &quot;arch&quot;: &quot;&lt;ARCHITECTURE&gt;&quot;, &quot;command&quot;: &#39;</span>
<span class="s1">&#39;&quot;&lt;process to execute&gt;&quot;, &quot;env&quot;:{&quot;&lt;Environment variables 1&gt;&quot;: &#39;</span>
<span class="s1">&#39;&quot;&lt;ENV_VAL&gt;&quot;} }. All fields in the json are optional except &#39;</span>
<span class="s1">&#39;command.&#39;</span><span class="p">))</span>
<span class="k">class</span> <span class="nc">FlinkOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--flink_master&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">str</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;Address of the Flink master where the pipeline &#39;</span>
<span class="s1">&#39;should be executed. Can either be of the form &#39;</span>
<span class="s1">&#39;</span><span class="se">\&#39;</span><span class="s1">host:port</span><span class="se">\&#39;</span><span class="s1"> or one of the special values &#39;</span>
<span class="s1">&#39;[local], [collection], or [auto].&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">&#39;--parallelism&#39;</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span>
<span class="p">(</span><span class="s1">&#39;The degree of parallelism to be used when &#39;</span>
<span class="s1">&#39;distributing operations onto workers.&#39;</span><span class="p">))</span>
<div class="viewcode-block" id="TestOptions"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TestOptions">[docs]</a><span class="k">class</span> <span class="nc">TestOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># Options for e2e test pipeline.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--on_success_matcher&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Verify state/output of e2e test pipeline. This is pickled &#39;</span>
<span class="s1">&#39;version of the matcher which should extends &#39;</span>
<span class="s1">&#39;hamcrest.core.base_matcher.BaseMatcher.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--dry_run&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="p">(</span><span class="s1">&#39;Used in unit testing runners without submitting the &#39;</span>
<span class="s1">&#39;actual job.&#39;</span><span class="p">))</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--wait_until_finish_duration&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="nb">type</span><span class="o">=</span><span class="nb">int</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;The time to wait (in milliseconds) for test pipeline to finish. &#39;</span>
<span class="s1">&#39;If it is set to None, it will wait indefinitely until the job &#39;</span>
<span class="s1">&#39;is finished.&#39;</span><span class="p">)</span>
<div class="viewcode-block" id="TestOptions.validate"><a class="viewcode-back" href="../../../apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.TestOptions.validate">[docs]</a> <span class="k">def</span> <span class="nf">validate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">validator</span><span class="p">):</span>
<span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">TestOptions</span><span class="p">)</span><span class="o">.</span><span class="n">on_success_matcher</span><span class="p">:</span>
<span class="n">errors</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">validator</span><span class="o">.</span><span class="n">validate_test_matcher</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;on_success_matcher&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="n">errors</span></div></div>
<span class="k">class</span> <span class="nc">TestDataflowOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">_add_argparse_args</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">parser</span><span class="p">):</span>
<span class="c1"># This option is passed to Dataflow Runner&#39;s Pub/Sub client. The camelCase</span>
<span class="c1"># style in &#39;dest&#39; matches the runner&#39;s.</span>
<span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span>
<span class="s1">&#39;--pubsub_root_url&#39;</span><span class="p">,</span>
<span class="n">dest</span><span class="o">=</span><span class="s1">&#39;pubsubRootUrl&#39;</span><span class="p">,</span>
<span class="n">default</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">help</span><span class="o">=</span><span class="s1">&#39;Root URL for use with the Google Cloud Pub/Sub API.&#39;</span><span class="p">,)</span>
<span class="c1"># TODO(silviuc): Add --files_to_stage option.</span>
<span class="c1"># This could potentially replace the --requirements_file and --setup_file.</span>
<span class="c1"># TODO(silviuc): Non-standard options. Keep them? If yes, add help too!</span>
<span class="c1"># Remote execution must check that this option is not None.</span>
<span class="k">class</span> <span class="nc">OptionsContext</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Set default pipeline options for pipelines created in this block.</span>
<span class="sd"> This is particularly useful for pipelines implicitly created with the</span>
<span class="sd"> [python list] | PTransform</span>
<span class="sd"> construct.</span>
<span class="sd"> Can also be used as a decorator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">overrides</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">options</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span>
<span class="k">def</span> <span class="nf">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">overrides</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">*</span><span class="n">exn_info</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">overrides</span><span class="o">.</span><span class="n">pop</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">f</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">wrapper</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="k">with</span> <span class="bp">self</span><span class="p">:</span>
<span class="n">f</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">return</span> <span class="n">wrapper</span>
<span class="nd">@classmethod</span>
<span class="k">def</span> <span class="nf">augment_options</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">options</span><span class="p">):</span>
<span class="k">for</span> <span class="n">override</span> <span class="ow">in</span> <span class="bp">cls</span><span class="o">.</span><span class="n">overrides</span><span class="p">:</span>
<span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">override</span><span class="o">.</span><span class="n">items</span><span class="p">():</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">options</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span>
<span class="k">return</span> <span class="n">options</span>
</pre></div>
</div>
<div class="articleComments">
</div>
</div>
<footer>
  <hr>
  <div role="contentinfo">
    <!-- NOTE(review): the copyright holder is blank in this build; presumably it is filled from the Sphinx project configuration. Confirm and regenerate. -->
    <p>
      &copy; Copyright .
    </p>
  </div>
  Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<!-- Global options object, defined before the static scripts that follow are loaded. -->
<script>
  var DOCUMENTATION_OPTIONS = {
    URL_ROOT: '../../../',
    VERSION: '',
    COLLAPSE_INDEX: false,
    FILE_SUFFIX: '.html',
    HAS_SOURCE: true,
    SOURCELINK_SUFFIX: '.txt'
  };
</script>
<!-- Classic (non-deferred) scripts: they load in this order, ahead of the inline jQuery call that follows. -->
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/js/theme.js"></script>
<!-- Calls SphinxRtdTheme.StickyNav.enable() from jQuery's DOM-ready callback. -->
<script>
  jQuery(function () {
    SphinxRtdTheme.StickyNav.enable();
  });
</script>
</body>
</html>