

<!--
Javascript to render AIRFLOW-XXX and PR references in text
as HTML links.

Overrides extrahead block from sphinx_rtd_theme
https://www.sphinx-doc.org/en/master/templating.html
-->


<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
  <meta charset="utf-8">
  
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  
  <title>airflow.contrib.operators.dataflow_operator &mdash; Airflow Documentation</title>
  

  
  
    <link rel="shortcut icon" href="../../../../../_static/pin_32.png"/>
  
  
  

  
  <script type="text/javascript" src="../../../../../_static/js/modernizr.min.js"></script>
  
    
      <script type="text/javascript" id="documentation_options" data-url_root="../../../../../" src="../../../../../_static/documentation_options.js"></script>
        <script type="text/javascript" src="../../../../../_static/jquery.js"></script>
        <script type="text/javascript" src="../../../../../_static/underscore.js"></script>
        <script type="text/javascript" src="../../../../../_static/doctools.js"></script>
        <script type="text/javascript" src="../../../../../_static/language_data.js"></script>
        <script type="text/javascript" src="../../../../../_static/jira-links.js"></script>
    
    <script type="text/javascript" src="../../../../../_static/js/theme.js"></script>

    

  
  <link rel="stylesheet" href="../../../../../_static/css/theme.css" type="text/css" />
  <link rel="stylesheet" href="../../../../../_static/pygments.css" type="text/css" />
  <link rel="stylesheet" href="../../../../../_static/graphviz.css" type="text/css" />
  <link rel="stylesheet" href="../../../../../_static/exampleinclude.css" type="text/css" />
    <link rel="index" title="Index" href="../../../../../genindex.html" />
    <link rel="search" title="Search" href="../../../../../search.html" />
    <link rel="next" title="airflow.contrib.operators.dataproc_operator" href="../dataproc_operator/index.html" />
    <link rel="prev" title="airflow.contrib.operators.databricks_operator" href="../databricks_operator/index.html" />
   
  <script>
  </script>
  <style>

  </style>

</head>

<body class="wy-body-for-nav">

   
  <div class="wy-grid-for-nav">
    
    <nav data-toggle="wy-nav-shift" class="wy-nav-side">
      <div class="wy-side-scroll">
        <div class="wy-side-nav-search" >
          

          
            <a href="../../../../../index.html" class="icon icon-home"> Airflow
          

          
          </a>

          
            
            
              <div class="version">
                1.10.8
              </div>
            
          

          
<div role="search">
  <form id="rtd-search-form" class="wy-form" action="../../../../../search.html" method="get">
    <input type="text" name="q" placeholder="Search docs" />
    <input type="hidden" name="check_keywords" value="yes" />
    <input type="hidden" name="area" value="default" />
  </form>
</div>

          
        </div>

        <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
          
            
            
              
            
            
              <ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../../project.html">Project</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../license.html">License</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../start.html">Quick Start</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../tutorial.html">Tutorial</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../howto/index.html">How-to Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../ui.html">UI / Screenshots</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../concepts.html">Concepts</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../profiling.html">Data Profiling</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../cli.html">Command Line Interface Reference</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../scheduler.html">Scheduling &amp; Triggers</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../executor/index.html">Executor</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../plugins.html">Plugins</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../security.html">Security</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../timezone.html">Time zones</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../api.html">REST API Reference</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../integration.html">Integration</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../metrics.html">Metrics</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../errors.html">Error Tracking</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../kubernetes.html">Kubernetes</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../lineage.html">Lineage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../dag-serialization.html">DAG Serialization</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../changelog.html">Changelog</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../best-practices.html">Best Practices</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../faq.html">FAQ</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../macros.html">Macros reference</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../privacy_notice.html">Privacy Notice</a></li>
</ul>
<p class="caption"><span class="caption-text">References</span></p>
<ul class="current">
<li class="toctree-l1 current"><a class="reference internal" href="../../../../index.html">Python API</a><ul class="current">
<li class="toctree-l2 current"><a class="reference internal" href="../../../../index.html#operators">Operators</a><ul class="current">
<li class="toctree-l3"><a class="reference internal" href="../../../../index.html#baseoperator">BaseOperator</a></li>
<li class="toctree-l3"><a class="reference internal" href="../../../../index.html#basesensoroperator">BaseSensorOperator</a></li>
<li class="toctree-l3 current"><a class="reference internal" href="../../../../index.html#operators-packages">Operators packages</a><ul class="current">
<li class="toctree-l4"><a class="reference internal" href="../../../operators/index.html"><code class="xref py py-mod docutils literal notranslate"><span class="pre">airflow.operators</span></code></a></li>
<li class="toctree-l4"><a class="reference internal" href="../../../sensors/index.html"><code class="xref py py-mod docutils literal notranslate"><span class="pre">airflow.sensors</span></code></a></li>
<li class="toctree-l4 current"><a class="reference internal" href="../index.html"><code class="xref py py-mod docutils literal notranslate"><span class="pre">airflow.contrib.operators</span></code></a></li>
<li class="toctree-l4"><a class="reference internal" href="../../sensors/index.html"><code class="xref py py-mod docutils literal notranslate"><span class="pre">airflow.contrib.sensors</span></code></a></li>
</ul>
</li>
</ul>
</li>
<li class="toctree-l2"><a class="reference internal" href="../../../../index.html#hooks">Hooks</a></li>
<li class="toctree-l2"><a class="reference internal" href="../../../../index.html#executors">Executors</a></li>
<li class="toctree-l2"><a class="reference internal" href="../../../../index.html#models">Models</a></li>
<li class="toctree-l2"><a class="reference internal" href="../../../../index.html#core-and-community-package">Core and community package</a></li>
</ul>
</li>
<li class="toctree-l1"><a class="reference internal" href="../../../../../configurations-ref.html">Configurations</a></li>
</ul>

            
          
        </div>
      </div>
    </nav>

    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">

      
      <nav class="wy-nav-top" aria-label="top navigation">
        
          <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
          <a href="../../../../../index.html">Airflow</a>
        
      </nav>


      <div class="wy-nav-content">
        
        <div class="rst-content">
        
          















<div role="navigation" aria-label="breadcrumbs navigation">

  <ul class="wy-breadcrumbs">
    
      <li><a href="../../../../../index.html">Docs</a> &raquo;</li>
        
          <li><a href="../../../../index.html">Python API Reference</a> &raquo;</li>
        
          <li><a href="../index.html"><code class="xref py py-mod docutils literal notranslate"><span class="pre">airflow.contrib.operators</span></code></a> &raquo;</li>
        
      <li><code class="xref py py-mod docutils literal notranslate"><span class="pre">airflow.contrib.operators.dataflow_operator</span></code></li>
    
    
      <li class="wy-breadcrumbs-aside">
        
            
            <a href="../../../../../_sources/_api/airflow/contrib/operators/dataflow_operator/index.rst.txt" rel="nofollow"> View page source</a>
          
        
      </li>
    
  </ul>

  
  <hr/>
</div>
          <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
           <div itemprop="articleBody">
            
  <div class="section" id="module-airflow.contrib.operators.dataflow_operator">
<span id="airflow-contrib-operators-dataflow-operator"></span><h1><a class="reference internal" href="#module-airflow.contrib.operators.dataflow_operator" title="airflow.contrib.operators.dataflow_operator"><code class="xref py py-mod docutils literal notranslate"><span class="pre">airflow.contrib.operators.dataflow_operator</span></code></a><a class="headerlink" href="#module-airflow.contrib.operators.dataflow_operator" title="Permalink to this headline">¶</a></h1>
<div class="section" id="module-contents">
<h2>Module Contents<a class="headerlink" href="#module-contents" title="Permalink to this headline">¶</a></h2>
<dl class="class">
<dt id="airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator">
<em class="property">class </em><code class="sig-prename descclassname">airflow.contrib.operators.dataflow_operator.</code><code class="sig-name descname">DataFlowJavaOperator</code><span class="sig-paren">(</span><em class="sig-param">jar</em>, <em class="sig-param">job_name='{{task.task_id}}'</em>, <em class="sig-param">dataflow_default_options=None</em>, <em class="sig-param">options=None</em>, <em class="sig-param">gcp_conn_id='google_cloud_default'</em>, <em class="sig-param">delegate_to=None</em>, <em class="sig-param">poll_sleep=10</em>, <em class="sig-param">job_class=None</em>, <em class="sig-param">*args</em>, <em class="sig-param">**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowJavaOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator" title="Permalink to this definition">¶</a></dt>
<dd><p>Bases: <a class="reference internal" href="../../../models/index.html#airflow.models.BaseOperator" title="airflow.models.BaseOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">airflow.models.BaseOperator</span></code></a></p>
<p>Start a Java Cloud DataFlow batch job. The parameters of the operation
will be passed to the job.</p>
<p><strong>Example</strong>:</p>
<div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span>
    <span class="s1">&#39;owner&#39;</span><span class="p">:</span> <span class="s1">&#39;Airflow&#39;</span><span class="p">,</span>
    <span class="s1">&#39;depends_on_past&#39;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
    <span class="s1">&#39;start_date&#39;</span><span class="p">:</span>
        <span class="p">(</span><span class="mi">2016</span><span class="p">,</span> <span class="mi">8</span><span class="p">,</span> <span class="mi">1</span><span class="p">),</span>
    <span class="s1">&#39;email&#39;</span><span class="p">:</span> <span class="p">[</span><span class="s1">&#39;alex@vanboxel.be&#39;</span><span class="p">],</span>
    <span class="s1">&#39;email_on_failure&#39;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
    <span class="s1">&#39;email_on_retry&#39;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
    <span class="s1">&#39;retries&#39;</span><span class="p">:</span> <span class="mi">1</span><span class="p">,</span>
    <span class="s1">&#39;retry_delay&#39;</span><span class="p">:</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">minutes</span><span class="o">=</span><span class="mi">30</span><span class="p">),</span>
    <span class="s1">&#39;dataflow_default_options&#39;</span><span class="p">:</span> <span class="p">{</span>
        <span class="s1">&#39;project&#39;</span><span class="p">:</span> <span class="s1">&#39;my-gcp-project&#39;</span><span class="p">,</span>
        <span class="s1">&#39;zone&#39;</span><span class="p">:</span> <span class="s1">&#39;us-central1-f&#39;</span><span class="p">,</span>
        <span class="s1">&#39;stagingLocation&#39;</span><span class="p">:</span> <span class="s1">&#39;gs://bucket/tmp/dataflow/staging/&#39;</span><span class="p">,</span>
    <span class="p">}</span>
<span class="p">}</span>

<span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">&#39;test-dag&#39;</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">)</span>

<span class="n">task</span> <span class="o">=</span> <span class="n">DataFlowJavaOperator</span><span class="p">(</span>
    <span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">&#39;gcp_default&#39;</span><span class="p">,</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&#39;normalize-cal&#39;</span><span class="p">,</span>
    <span class="n">jar</span><span class="o">=</span><span class="s1">&#39;{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar&#39;</span><span class="p">,</span>
    <span class="n">options</span><span class="o">=</span><span class="p">{</span>
        <span class="s1">&#39;autoscalingAlgorithm&#39;</span><span class="p">:</span> <span class="s1">&#39;BASIC&#39;</span><span class="p">,</span>
        <span class="s1">&#39;maxNumWorkers&#39;</span><span class="p">:</span> <span class="s1">&#39;50&#39;</span><span class="p">,</span>
        <span class="s1">&#39;start&#39;</span><span class="p">:</span> <span class="s1">&#39;{{ds}}&#39;</span><span class="p">,</span>
        <span class="s1">&#39;partitionType&#39;</span><span class="p">:</span> <span class="s1">&#39;DAY&#39;</span>

    <span class="p">},</span>
    <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</pre></div>
</div>
<div class="admonition seealso">
<p class="admonition-title">See also</p>
<p>For more detail on job submission have a look at the reference:
<a class="reference external" href="https://cloud.google.com/dataflow/pipelines/specifying-exec-params">https://cloud.google.com/dataflow/pipelines/specifying-exec-params</a></p>
</div>
<dl class="field-list simple">
<dt class="field-odd">Parameters</dt>
<dd class="field-odd"><ul class="simple">
<li><p><strong>jar</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The reference to a self executing DataFlow jar (templated).</p></li>
<li><p><strong>job_name</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The ‘jobName’ to use when executing the DataFlow job
(templated). This ends up being set in the pipeline options, so any entry
with key <code class="docutils literal notranslate"><span class="pre">'jobName'</span></code> in <code class="docutils literal notranslate"><span class="pre">options</span></code> will be overwritten.</p></li>
<li><p><strong>dataflow_default_options</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#dict" title="(in Python v3.8)"><em>dict</em></a>) – Map of default job options.</p></li>
<li><p><strong>options</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#dict" title="(in Python v3.8)"><em>dict</em></a>) – Map of job specific options.</p></li>
<li><p><strong>gcp_conn_id</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The connection ID to use connecting to Google Cloud
Platform.</p></li>
<li><p><strong>delegate_to</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.</p></li>
<li><p><strong>poll_sleep</strong> (<a class="reference external" href="https://docs.python.org/3/library/functions.html#int" title="(in Python v3.8)"><em>int</em></a>) – The time in seconds to sleep between polling Google
Cloud Platform for the dataflow job status while the job is in the
JOB_STATE_RUNNING state.</p></li>
<li><p><strong>job_class</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The name of the dataflow job class to be executed, it
is often not the main class configured in the dataflow jar file.</p></li>
</ul>
</dd>
</dl>
<p><code class="docutils literal notranslate"><span class="pre">jar</span></code>, <code class="docutils literal notranslate"><span class="pre">options</span></code>, and <code class="docutils literal notranslate"><span class="pre">job_name</span></code> are templated so you can use variables in them.</p>
<p>Note that both
<code class="docutils literal notranslate"><span class="pre">dataflow_default_options</span></code> and <code class="docutils literal notranslate"><span class="pre">options</span></code> will be merged to specify pipeline
execution parameter, and <code class="docutils literal notranslate"><span class="pre">dataflow_default_options</span></code> is expected to save
high-level options, for instances, project and zone information, which
apply to all dataflow operators in the DAG.</p>
<p>It’s a good practice to define dataflow_* parameters in the default_args of the dag
like the project, zone and staging location.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span>
    <span class="s1">&#39;dataflow_default_options&#39;</span><span class="p">:</span> <span class="p">{</span>
        <span class="s1">&#39;project&#39;</span><span class="p">:</span> <span class="s1">&#39;my-gcp-project&#39;</span><span class="p">,</span>
        <span class="s1">&#39;zone&#39;</span><span class="p">:</span> <span class="s1">&#39;europe-west1-d&#39;</span><span class="p">,</span>
        <span class="s1">&#39;stagingLocation&#39;</span><span class="p">:</span> <span class="s1">&#39;gs://my-staging-bucket/staging/&#39;</span>
    <span class="p">}</span>
<span class="p">}</span>
</pre></div>
</div>
<p>You need to pass the path to your dataflow as a file reference with the <code class="docutils literal notranslate"><span class="pre">jar</span></code>
parameter, the jar needs to be a self executing jar (see documentation here:
<a class="reference external" href="https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar">https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar</a>).
Use <code class="docutils literal notranslate"><span class="pre">options</span></code> to pass on options to your job.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">t1</span> <span class="o">=</span> <span class="n">DataFlowJavaOperator</span><span class="p">(</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&#39;dataflow_example&#39;</span><span class="p">,</span>
    <span class="n">jar</span><span class="o">=</span><span class="s1">&#39;{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar&#39;</span><span class="p">,</span>
    <span class="n">options</span><span class="o">=</span><span class="p">{</span>
        <span class="s1">&#39;autoscalingAlgorithm&#39;</span><span class="p">:</span> <span class="s1">&#39;BASIC&#39;</span><span class="p">,</span>
        <span class="s1">&#39;maxNumWorkers&#39;</span><span class="p">:</span> <span class="s1">&#39;50&#39;</span><span class="p">,</span>
        <span class="s1">&#39;start&#39;</span><span class="p">:</span> <span class="s1">&#39;{{ds}}&#39;</span><span class="p">,</span>
        <span class="s1">&#39;partitionType&#39;</span><span class="p">:</span> <span class="s1">&#39;DAY&#39;</span><span class="p">,</span>
        <span class="s1">&#39;labels&#39;</span><span class="p">:</span> <span class="p">{</span><span class="s1">&#39;foo&#39;</span> <span class="p">:</span> <span class="s1">&#39;bar&#39;</span><span class="p">}</span>
    <span class="p">},</span>
    <span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">&#39;gcp-airflow-service-account&#39;</span><span class="p">,</span>
    <span class="n">dag</span><span class="o">=</span><span class="n">my</span><span class="o">-</span><span class="n">dag</span><span class="p">)</span>
</pre></div>
</div>
<dl class="attribute">
<dt id="airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator.template_fields">
<code class="sig-name descname">template_fields</code><em class="property"> = ['options', 'jar', 'job_name']</em><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowJavaOperator.template_fields"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator.template_fields" title="Permalink to this definition">¶</a></dt>
<dd></dd></dl>

<dl class="attribute">
<dt id="airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator.ui_color">
<code class="sig-name descname">ui_color</code><em class="property"> = #0273d4</em><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowJavaOperator.ui_color"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator.ui_color" title="Permalink to this definition">¶</a></dt>
<dd></dd></dl>

<dl class="method">
<dt id="airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator.execute">
<code class="sig-name descname">execute</code><span class="sig-paren">(</span><em class="sig-param">self</em>, <em class="sig-param">context</em><span class="sig-paren">)</span><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowJavaOperator.execute"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator.execute" title="Permalink to this definition">¶</a></dt>
<dd></dd></dl>

</dd></dl>

<dl class="class">
<dt id="airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator">
<em class="property">class </em><code class="sig-prename descclassname">airflow.contrib.operators.dataflow_operator.</code><code class="sig-name descname">DataflowTemplateOperator</code><span class="sig-paren">(</span><em class="sig-param">template</em>, <em class="sig-param">job_name='{{task.task_id}}'</em>, <em class="sig-param">dataflow_default_options=None</em>, <em class="sig-param">parameters=None</em>, <em class="sig-param">gcp_conn_id='google_cloud_default'</em>, <em class="sig-param">delegate_to=None</em>, <em class="sig-param">poll_sleep=10</em>, <em class="sig-param">*args</em>, <em class="sig-param">**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataflowTemplateOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator" title="Permalink to this definition">¶</a></dt>
<dd><p>Bases: <a class="reference internal" href="../../../models/index.html#airflow.models.BaseOperator" title="airflow.models.BaseOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">airflow.models.BaseOperator</span></code></a></p>
<p>Start a Templated Cloud DataFlow batch job. The parameters of the operation
will be passed to the job.</p>
<dl class="field-list simple">
<dt class="field-odd">Parameters</dt>
<dd class="field-odd"><ul class="simple">
<li><p><strong>template</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The reference to the DataFlow template.</p></li>
<li><p><strong>job_name</strong> – The ‘jobName’ to use when executing the DataFlow template
(templated).</p></li>
<li><p><strong>dataflow_default_options</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#dict" title="(in Python v3.8)"><em>dict</em></a>) – Map of default job environment options.</p></li>
<li><p><strong>parameters</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#dict" title="(in Python v3.8)"><em>dict</em></a>) – Map of job specific parameters for the template.</p></li>
<li><p><strong>gcp_conn_id</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The connection ID to use connecting to Google Cloud
Platform.</p></li>
<li><p><strong>delegate_to</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.</p></li>
<li><p><strong>poll_sleep</strong> (<a class="reference external" href="https://docs.python.org/3/library/functions.html#int" title="(in Python v3.8)"><em>int</em></a>) – The time in seconds to sleep between polling Google
Cloud Platform for the dataflow job status while the job is in the
JOB_STATE_RUNNING state.</p></li>
</ul>
</dd>
</dl>
<p>It’s a good practice to define dataflow_* parameters in the default_args of the dag
like the project, zone and staging location.</p>
<div class="admonition seealso">
<p class="admonition-title">See also</p>
<p><a class="reference external" href="https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters">https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters</a>
<a class="reference external" href="https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment">https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment</a></p>
</div>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span>
    <span class="s1">&#39;dataflow_default_options&#39;</span><span class="p">:</span> <span class="p">{</span>
        <span class="s1">&#39;project&#39;</span><span class="p">:</span> <span class="s1">&#39;my-gcp-project&#39;</span><span class="p">,</span>
        <span class="s1">&#39;region&#39;</span><span class="p">:</span> <span class="s1">&#39;europe-west1&#39;</span><span class="p">,</span>
        <span class="s1">&#39;zone&#39;</span><span class="p">:</span> <span class="s1">&#39;europe-west1-d&#39;</span><span class="p">,</span>
        <span class="s1">&#39;tempLocation&#39;</span><span class="p">:</span> <span class="s1">&#39;gs://my-staging-bucket/staging/&#39;</span><span class="p">,</span>
        <span class="p">}</span>
    <span class="p">}</span>
<span class="p">}</span>
</pre></div>
</div>
<p>You need to pass the path to your dataflow template as a file reference with the
<code class="docutils literal notranslate"><span class="pre">template</span></code> parameter. Use <code class="docutils literal notranslate"><span class="pre">parameters</span></code> to pass on parameters to your job.
Use <code class="docutils literal notranslate"><span class="pre">environment</span></code> to pass on runtime environment variables to your job.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">t1</span> <span class="o">=</span> <span class="n">DataflowTemplateOperator</span><span class="p">(</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&#39;dataflow_example&#39;</span><span class="p">,</span>
    <span class="n">template</span><span class="o">=</span><span class="s1">&#39;{{var.value.gcp_dataflow_base}}&#39;</span><span class="p">,</span>
    <span class="n">parameters</span><span class="o">=</span><span class="p">{</span>
        <span class="s1">&#39;inputFile&#39;</span><span class="p">:</span> <span class="s2">&quot;gs://bucket/input/my_input.txt&quot;</span><span class="p">,</span>
        <span class="s1">&#39;outputFile&#39;</span><span class="p">:</span> <span class="s2">&quot;gs://bucket/output/my_output.txt&quot;</span>
    <span class="p">},</span>
    <span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">&#39;gcp-airflow-service-account&#39;</span><span class="p">,</span>
    <span class="n">dag</span><span class="o">=</span><span class="n">my</span><span class="o">-</span><span class="n">dag</span><span class="p">)</span>
</pre></div>
</div>
<p><code class="docutils literal notranslate"><span class="pre">template</span></code>, <code class="docutils literal notranslate"><span class="pre">dataflow_default_options</span></code>, <code class="docutils literal notranslate"><span class="pre">parameters</span></code>, and <code class="docutils literal notranslate"><span class="pre">job_name</span></code> are
templated so you can use variables in them.</p>
<p>Note that <code class="docutils literal notranslate"><span class="pre">dataflow_default_options</span></code> is expected to save high-level options
for project information, which apply to all dataflow operators in the DAG.</p>
<blockquote>
<div><div class="admonition seealso">
<p class="admonition-title">See also</p>
<p><a class="reference external" href="https://cloud.google.com/dataflow/docs/reference/rest/v1b3">https://cloud.google.com/dataflow/docs/reference/rest/v1b3</a>
/LaunchTemplateParameters
<a class="reference external" href="https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment">https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment</a>
For more detail on job template execution have a look at the reference:
<a class="reference external" href="https://cloud.google.com/dataflow/docs/templates/executing-templates">https://cloud.google.com/dataflow/docs/templates/executing-templates</a></p>
</div>
</div></blockquote>
<dl class="attribute">
<dt id="airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator.template_fields">
<code class="sig-name descname">template_fields</code><em class="property"> = ['parameters', 'dataflow_default_options', 'template', 'job_name']</em><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataflowTemplateOperator.template_fields"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator.template_fields" title="Permalink to this definition">¶</a></dt>
<dd></dd></dl>

<dl class="attribute">
<dt id="airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator.ui_color">
<code class="sig-name descname">ui_color</code><em class="property"> = #0273d4</em><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataflowTemplateOperator.ui_color"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator.ui_color" title="Permalink to this definition">¶</a></dt>
<dd></dd></dl>

<dl class="method">
<dt id="airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator.execute">
<code class="sig-name descname">execute</code><span class="sig-paren">(</span><em class="sig-param">self</em>, <em class="sig-param">context</em><span class="sig-paren">)</span><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataflowTemplateOperator.execute"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator.execute" title="Permalink to this definition">¶</a></dt>
<dd></dd></dl>

</dd></dl>

<dl class="class">
<dt id="airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator">
<em class="property">class </em><code class="sig-prename descclassname">airflow.contrib.operators.dataflow_operator.</code><code class="sig-name descname">DataFlowPythonOperator</code><span class="sig-paren">(</span><em class="sig-param">py_file</em>, <em class="sig-param">job_name='{{task.task_id}}'</em>, <em class="sig-param">py_options=None</em>, <em class="sig-param">dataflow_default_options=None</em>, <em class="sig-param">options=None</em>, <em class="sig-param">gcp_conn_id='google_cloud_default'</em>, <em class="sig-param">delegate_to=None</em>, <em class="sig-param">poll_sleep=10</em>, <em class="sig-param">*args</em>, <em class="sig-param">**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowPythonOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator" title="Permalink to this definition">¶</a></dt>
<dd><p>Bases: <a class="reference internal" href="../../../models/index.html#airflow.models.BaseOperator" title="airflow.models.BaseOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">airflow.models.BaseOperator</span></code></a></p>
<p>Launching Cloud Dataflow jobs written in python. Note that both
dataflow_default_options and options will be merged to specify pipeline
execution parameter, and dataflow_default_options is expected to save
high-level options, for instances, project and zone information, which
apply to all dataflow operators in the DAG.</p>
<div class="admonition seealso">
<p class="admonition-title">See also</p>
<p>For more detail on job submission have a look at the reference:
<a class="reference external" href="https://cloud.google.com/dataflow/pipelines/specifying-exec-params">https://cloud.google.com/dataflow/pipelines/specifying-exec-params</a></p>
</div>
<dl class="field-list simple">
<dt class="field-odd">Parameters</dt>
<dd class="field-odd"><ul class="simple">
<li><p><strong>py_file</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – Reference to the python dataflow pipeline file.py, e.g.,
/some/local/file/path/to/your/python/pipeline/file. (templated)</p></li>
<li><p><strong>job_name</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The ‘job_name’ to use when executing the DataFlow job
(templated). This ends up being set in the pipeline options, so any entry
with key <code class="docutils literal notranslate"><span class="pre">'jobName'</span></code> or <code class="docutils literal notranslate"><span class="pre">'job_name'</span></code> in <code class="docutils literal notranslate"><span class="pre">options</span></code> will be overwritten.</p></li>
<li><p><strong>py_options</strong> – Additional python options, e.g., [“-m”, “-v”].</p></li>
<li><p><strong>dataflow_default_options</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#dict" title="(in Python v3.8)"><em>dict</em></a>) – Map of default job options.</p></li>
<li><p><strong>options</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#dict" title="(in Python v3.8)"><em>dict</em></a>) – Map of job specific options.</p></li>
<li><p><strong>gcp_conn_id</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The connection ID to use connecting to Google Cloud
Platform.</p></li>
<li><p><strong>delegate_to</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide  delegation enabled.</p></li>
<li><p><strong>poll_sleep</strong> (<a class="reference external" href="https://docs.python.org/3/library/functions.html#int" title="(in Python v3.8)"><em>int</em></a>) – The time in seconds to sleep between polling Google
Cloud Platform for the dataflow job status while the job is in the
JOB_STATE_RUNNING state.</p></li>
</ul>
</dd>
</dl>
<dl class="attribute">
<dt id="airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator.template_fields">
<code class="sig-name descname">template_fields</code><em class="property"> = ['options', 'dataflow_default_options', 'job_name', 'py_file']</em><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowPythonOperator.template_fields"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator.template_fields" title="Permalink to this definition">¶</a></dt>
<dd></dd></dl>

<dl class="method">
<dt id="airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator.execute">
<code class="sig-name descname">execute</code><span class="sig-paren">(</span><em class="sig-param">self</em>, <em class="sig-param">context</em><span class="sig-paren">)</span><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowPythonOperator.execute"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator.execute" title="Permalink to this definition">¶</a></dt>
<dd><p>Execute the python dataflow job.</p>
</dd></dl>

</dd></dl>

<dl class="class">
<dt id="airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper">
<em class="property">class </em><code class="sig-prename descclassname">airflow.contrib.operators.dataflow_operator.</code><code class="sig-name descname">GoogleCloudBucketHelper</code><span class="sig-paren">(</span><em class="sig-param">gcp_conn_id='google_cloud_default'</em>, <em class="sig-param">delegate_to=None</em><span class="sig-paren">)</span><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#GoogleCloudBucketHelper"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper" title="Permalink to this definition">¶</a></dt>
<dd><p>Bases: <a class="reference external" href="https://docs.python.org/3/library/functions.html#object" title="(in Python v3.8)"><code class="xref py py-class docutils literal notranslate"><span class="pre">object</span></code></a></p>
<p>GoogleCloudStorageHook helper class to download GCS object.</p>
<dl class="attribute">
<dt id="airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.GCS_PREFIX_LENGTH">
<code class="sig-name descname">GCS_PREFIX_LENGTH</code><em class="property"> = 5</em><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#GoogleCloudBucketHelper.GCS_PREFIX_LENGTH"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.GCS_PREFIX_LENGTH" title="Permalink to this definition">¶</a></dt>
<dd></dd></dl>

<dl class="method">
<dt id="airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.google_cloud_to_local">
<code class="sig-name descname">google_cloud_to_local</code><span class="sig-paren">(</span><em class="sig-param">self</em>, <em class="sig-param">file_name</em><span class="sig-paren">)</span><a class="reference internal" href="../../../../../_modules/airflow/contrib/operators/dataflow_operator.html#GoogleCloudBucketHelper.google_cloud_to_local"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.google_cloud_to_local" title="Permalink to this definition">¶</a></dt>
<dd><p>Checks whether the file specified by file_name is stored in Google Cloud
Storage (GCS), if so, downloads the file and saves it locally. The full
path of the saved file will be returned. Otherwise the local file_name
will be returned immediately.</p>
<dl class="field-list simple">
<dt class="field-odd">Parameters</dt>
<dd class="field-odd"><p><strong>file_name</strong> (<a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)"><em>str</em></a>) – The full path of input file.</p>
</dd>
<dt class="field-even">Returns</dt>
<dd class="field-even"><p>The full path of local file.</p>
</dd>
<dt class="field-odd">Return type</dt>
<dd class="field-odd"><p><a class="reference external" href="https://docs.python.org/3/library/stdtypes.html#str" title="(in Python v3.8)">str</a></p>
</dd>
</dl>
</dd></dl>

</dd></dl>

</div>
</div>


           </div>
           
          </div>
          

<footer>
  
    <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
      
        <a href="../dataproc_operator/index.html" class="btn btn-neutral float-right" title="airflow.contrib.operators.dataproc_operator" accesskey="n" rel="next">Next <span class="fa fa-arrow-circle-right"></span></a>
      
      
        <a href="../databricks_operator/index.html" class="btn btn-neutral float-left" title="airflow.contrib.operators.databricks_operator" accesskey="p" rel="prev"><span class="fa fa-arrow-circle-left"></span> Previous</a>
      
    </div>
  

  <hr/>

  <div role="contentinfo">
    <p>

    </p>
  </div>
  Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
  <div class="footer">This page uses <a href="https://analytics.google.com/">
    Google Analytics</a> to collect statistics. You can disable it by blocking
    the JavaScript coming from www.google-analytics.com. Check our
    <a href="../../../../../privacy_notice.html">Privacy Policy</a>
    for more details.
  </div>


</footer>

        </div>
      </div>

    </section>

  </div>
  


  <script type="text/javascript">
      jQuery(function () {
          SphinxRtdTheme.Navigation.enable(true);
      });
  </script>

  
  
    
    <!-- Theme Analytics -->
    <script>
    (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
    })(window,document,'script','https://www.google-analytics.com/analytics.js','ga');

    ga('create', 'UA-140539454-1', 'auto');
    ga('send', 'pageview');
    </script>

    
   

</body>
</html>