| <!DOCTYPE html> |
| |
| <html lang="en" data-content_root="./"> |
| <head> |
| <meta charset="utf-8" /> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0" /> |
| |
| <title>Arrow Flight — Apache Arrow Python Cookbook documentation</title> |
| <link rel="stylesheet" type="text/css" href="_static/pygments.css?v=4f649999" /> |
| <link rel="stylesheet" type="text/css" href="_static/alabaster.css?v=39aeeac0" /> |
| <script src="_static/documentation_options.js?v=5929fcd5"></script> |
| <script src="_static/doctools.js?v=888ff710"></script> |
| <script src="_static/sphinx_highlight.js?v=dc90522c"></script> |
| <link rel="icon" href="_static/favicon.ico"/> |
| <link rel="index" title="Index" href="genindex.html" /> |
| <link rel="search" title="Search" href="search.html" /> |
| <link rel="prev" title="Data Manipulation" href="data.html" /> |
| |
| |
| <link rel="stylesheet" href="_static/custom.css" type="text/css" /> |
| |
| |
| <meta name="viewport" content="width=device-width, initial-scale=0.9, maximum-scale=0.9" /> |
| |
| </head><body> |
| |
| |
| <div class="document"> |
| <div class="documentwrapper"> |
| <div class="bodywrapper"> |
| |
| |
| <div class="body" role="main"> |
| |
| <section id="arrow-flight"> |
| <h1><a class="toc-backref" href="#id1" role="doc-backlink">Arrow Flight</a><a class="headerlink" href="#arrow-flight" title="Link to this heading">¶</a></h1> |
| <p>Recipes related to leveraging the Arrow Flight protocol.</p> |
| <nav class="contents" id="contents"> |
| <p class="topic-title">Contents</p> |
| <ul class="simple"> |
| <li><p><a class="reference internal" href="#arrow-flight" id="id1">Arrow Flight</a></p> |
| <ul> |
| <li><p><a class="reference internal" href="#simple-parquet-storage-service-with-arrow-flight" id="id2">Simple Parquet storage service with Arrow Flight</a></p></li> |
| <li><p><a class="reference internal" href="#streaming-parquet-storage-service" id="id3">Streaming Parquet Storage Service</a></p></li> |
| <li><p><a class="reference internal" href="#authentication-with-user-password" id="id4">Authentication with user/password</a></p></li> |
| <li><p><a class="reference internal" href="#securing-connections-with-tls" id="id5">Securing connections with TLS</a></p></li> |
| <li><p><a class="reference internal" href="#propagating-opentelemetry-traces" id="id6">Propagating OpenTelemetry Traces</a></p></li> |
| </ul> |
| </li> |
| </ul> |
| </nav> |
| <section id="simple-parquet-storage-service-with-arrow-flight"> |
| <h2><a class="toc-backref" href="#id2" role="doc-backlink">Simple Parquet storage service with Arrow Flight</a><a class="headerlink" href="#simple-parquet-storage-service-with-arrow-flight" title="Link to this heading">¶</a></h2> |
| <p>Suppose you want to implement a service that can store, send and receive
Parquet files using the Arrow Flight protocol.
<code class="docutils literal notranslate"><span class="pre">pyarrow</span></code> provides an implementation framework in <code class="xref py py-mod docutils literal notranslate"><span class="pre">pyarrow.flight</span></code>,
particularly through the <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase" title="(in Apache Arrow v14.0.1)"><code class="xref py py-class docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase</span></code></a> class.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pathlib</span> |
| |
| <span class="kn">import</span> <span class="nn">pyarrow</span> <span class="k">as</span> <span class="nn">pa</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.flight</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.parquet</span> |
| |
| |
<span class="k">class</span> <span class="nc">FlightServer</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightServerBase</span><span class="p">):</span>

    <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">location</span><span class="o">=</span><span class="s2">"grpc://0.0.0.0:8815"</span><span class="p">,</span>
                 <span class="n">repo</span><span class="o">=</span><span class="n">pathlib</span><span class="o">.</span><span class="n">Path</span><span class="p">(</span><span class="s2">"./datasets"</span><span class="p">),</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
        <span class="nb">super</span><span class="p">(</span><span class="n">FlightServer</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">location</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">_location</span> <span class="o">=</span> <span class="n">location</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">=</span> <span class="n">repo</span>

    <span class="k">def</span> <span class="nf">_make_flight_info</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dataset</span><span class="p">):</span>
        <span class="n">dataset_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">/</span> <span class="n">dataset</span>
        <span class="n">schema</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">parquet</span><span class="o">.</span><span class="n">read_schema</span><span class="p">(</span><span class="n">dataset_path</span><span class="p">)</span>
        <span class="n">metadata</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">parquet</span><span class="o">.</span><span class="n">read_metadata</span><span class="p">(</span><span class="n">dataset_path</span><span class="p">)</span>
        <span class="n">descriptor</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightDescriptor</span><span class="o">.</span><span class="n">for_path</span><span class="p">(</span>
            <span class="n">dataset</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span>
        <span class="p">)</span>
        <span class="n">endpoints</span> <span class="o">=</span> <span class="p">[</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightEndpoint</span><span class="p">(</span><span class="n">dataset</span><span class="p">,</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_location</span><span class="p">])]</span>
        <span class="k">return</span> <span class="n">pyarrow</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightInfo</span><span class="p">(</span><span class="n">schema</span><span class="p">,</span>
                                         <span class="n">descriptor</span><span class="p">,</span>
                                         <span class="n">endpoints</span><span class="p">,</span>
                                         <span class="n">metadata</span><span class="o">.</span><span class="n">num_rows</span><span class="p">,</span>
                                         <span class="n">metadata</span><span class="o">.</span><span class="n">serialized_size</span><span class="p">)</span>

    <span class="k">def</span> <span class="nf">list_flights</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">criteria</span><span class="p">):</span>
        <span class="k">for</span> <span class="n">dataset</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span><span class="o">.</span><span class="n">iterdir</span><span class="p">():</span>
            <span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">_make_flight_info</span><span class="p">(</span><span class="n">dataset</span><span class="o">.</span><span class="n">name</span><span class="p">)</span>

    <span class="k">def</span> <span class="nf">get_flight_info</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">descriptor</span><span class="p">):</span>
        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_make_flight_info</span><span class="p">(</span><span class="n">descriptor</span><span class="o">.</span><span class="n">path</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span>

    <span class="k">def</span> <span class="nf">do_put</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">descriptor</span><span class="p">,</span> <span class="n">reader</span><span class="p">,</span> <span class="n">writer</span><span class="p">):</span>
        <span class="n">dataset</span> <span class="o">=</span> <span class="n">descriptor</span><span class="o">.</span><span class="n">path</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span>
        <span class="n">dataset_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">/</span> <span class="n">dataset</span>
        <span class="n">data_table</span> <span class="o">=</span> <span class="n">reader</span><span class="o">.</span><span class="n">read_all</span><span class="p">()</span>
        <span class="n">pa</span><span class="o">.</span><span class="n">parquet</span><span class="o">.</span><span class="n">write_table</span><span class="p">(</span><span class="n">data_table</span><span class="p">,</span> <span class="n">dataset_path</span><span class="p">)</span>

    <span class="k">def</span> <span class="nf">do_get</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">ticket</span><span class="p">):</span>
        <span class="n">dataset</span> <span class="o">=</span> <span class="n">ticket</span><span class="o">.</span><span class="n">ticket</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span>
        <span class="n">dataset_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">/</span> <span class="n">dataset</span>
        <span class="k">return</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">RecordBatchStream</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">parquet</span><span class="o">.</span><span class="n">read_table</span><span class="p">(</span><span class="n">dataset_path</span><span class="p">))</span>

    <span class="k">def</span> <span class="nf">list_actions</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
        <span class="k">return</span> <span class="p">[</span>
            <span class="p">(</span><span class="s2">"drop_dataset"</span><span class="p">,</span> <span class="s2">"Delete a dataset."</span><span class="p">),</span>
        <span class="p">]</span>

    <span class="k">def</span> <span class="nf">do_action</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">action</span><span class="p">):</span>
        <span class="k">if</span> <span class="n">action</span><span class="o">.</span><span class="n">type</span> <span class="o">==</span> <span class="s2">"drop_dataset"</span><span class="p">:</span>
            <span class="bp">self</span><span class="o">.</span><span class="n">do_drop_dataset</span><span class="p">(</span><span class="n">action</span><span class="o">.</span><span class="n">body</span><span class="o">.</span><span class="n">to_pybytes</span><span class="p">()</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span>
        <span class="k">else</span><span class="p">:</span>
            <span class="k">raise</span> <span class="ne">NotImplementedError</span>

    <span class="k">def</span> <span class="nf">do_drop_dataset</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dataset</span><span class="p">):</span>
        <span class="n">dataset_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">/</span> <span class="n">dataset</span>
        <span class="n">dataset_path</span><span class="o">.</span><span class="n">unlink</span><span class="p">()</span>
| </pre></div> |
| </div> |
| <p>The example server implements <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase.list_flights" title="(in Apache Arrow v14.0.1)"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase.list_flights()</span></code></a>,
the method in charge of returning the list of data streams available
for fetching.</p> |
| <p>Likewise, <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase.get_flight_info" title="(in Apache Arrow v14.0.1)"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase.get_flight_info()</span></code></a> provides |
| the information regarding a single specific data stream.</p> |
| <p>Then we expose <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase.do_get" title="(in Apache Arrow v14.0.1)"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase.do_get()</span></code></a> which is in charge |
| of actually fetching the exposed data streams and sending them to the client.</p> |
| <p>Listing and downloading data streams would be of little use without
a way to create them. This is the responsibility of
<a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase.do_put" title="(in Apache Arrow v14.0.1)"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase.do_put()</span></code></a>, which receives
new data from the client and handles it (in this case by saving it
into a Parquet file).</p> |
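<p>The dataset name arrives from the client and is joined directly onto the repository path, so a server reachable from untrusted clients would probably want to check it first. The following is a minimal sketch of one way to do that, using only <code class="docutils literal notranslate"><span class="pre">pathlib</span></code>; <code class="docutils literal notranslate"><span class="pre">resolve_dataset_path</span></code> is a hypothetical helper and is not part of the recipe or of pyarrow.</p>

```python
import pathlib


def resolve_dataset_path(repo: pathlib.Path, dataset: str) -> pathlib.Path:
    """Return repo / dataset, rejecting names that could escape repo.

    Hypothetical helper for illustration; not part of pyarrow.
    """
    # Accept only a bare file name: no directory components, no "." or "..".
    is_bare_name = pathlib.PurePath(dataset).name == dataset
    if not dataset or dataset in {".", ".."} or not is_bare_name:
        raise ValueError(f"invalid dataset name: {dataset!r}")
    return repo / dataset


repo = pathlib.Path("./datasets")
print(resolve_dataset_path(repo, "uploaded.parquet"))

for bad_name in ["../secrets.parquet", "/etc/passwd", "nested/file.parquet"]:
    try:
        resolve_dataset_path(repo, bad_name)
    except ValueError as exc:
        print("rejected:", exc)
```

<p>Under this assumption, the server methods would call the helper wherever they currently compute <code class="docutils literal notranslate"><span class="pre">self._repo</span> <span class="pre">/</span> <span class="pre">dataset</span></code>.</p>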
| <p>These are the most common Arrow Flight requests. If we need more
functionality, we can add it using custom actions.</p> |
| <p>In the previous example a <code class="docutils literal notranslate"><span class="pre">drop_dataset</span></code> custom action is added.
All custom actions are executed through the
<a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase.do_action" title="(in Apache Arrow v14.0.1)"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase.do_action()</span></code></a> method, so it’s up to
the server subclass to dispatch them properly. In this case we invoke
the <code class="docutils literal notranslate"><span class="pre">do_drop_dataset</span></code> method when the <code class="docutils literal notranslate"><span class="pre">action.type</span></code> is the one we expect.</p> |
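<p>As the number of custom actions grows, an if/else chain can become unwieldy, and one possible alternative is a lookup table keyed by action type. The sketch below is independent of pyarrow: <code class="docutils literal notranslate"><span class="pre">FakeAction</span></code> and <code class="docutils literal notranslate"><span class="pre">ActionDispatcher</span></code> are hypothetical names, <code class="docutils literal notranslate"><span class="pre">FakeAction</span></code> carries plain bytes where a real Flight action carries a buffer, and a real <code class="docutils literal notranslate"><span class="pre">do_action</span></code> also receives a <code class="docutils literal notranslate"><span class="pre">context</span></code> argument.</p>

```python
from typing import Callable, Dict, List, NamedTuple


class FakeAction(NamedTuple):
    """Hypothetical stand-in for a Flight action: a type plus a raw body."""
    type: str
    body: bytes


class ActionDispatcher:
    """Routes actions to handlers through a dict instead of if/else."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[str], None]] = {}
        self.dropped: List[str] = []
        self.register("drop_dataset", self.do_drop_dataset)

    def register(self, name: str, handler: Callable[[str], None]) -> None:
        self._handlers[name] = handler

    def do_action(self, action: FakeAction) -> None:
        handler = self._handlers.get(action.type)
        if handler is None:
            # Same behaviour as the recipe: unknown actions are not implemented.
            raise NotImplementedError(f"unknown action: {action.type}")
        handler(action.body.decode("utf-8"))

    def do_drop_dataset(self, dataset: str) -> None:
        # The recipe's server unlinks the Parquet file here; this sketch
        # only records the name so that it runs without touching the disk.
        self.dropped.append(dataset)


dispatcher = ActionDispatcher()
dispatcher.do_action(FakeAction("drop_dataset", b"uploaded.parquet"))
print(dispatcher.dropped)
```

<p>Adding a new action then amounts to one more <code class="docutils literal notranslate"><span class="pre">register</span></code> call, and the same table could feed the list of actions the server advertises.</p>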
| <p>Our server can then be started with
<a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase.serve" title="(in Apache Arrow v14.0.1)"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase.serve()</span></code></a>:</p> |
| <div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'__main__'</span><span class="p">:</span> |
    <span class="n">server</span> <span class="o">=</span> <span class="n">FlightServer</span><span class="p">()</span>
    <span class="n">server</span><span class="o">.</span><span class="n">_repo</span><span class="o">.</span><span class="n">mkdir</span><span class="p">(</span><span class="n">exist_ok</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
    <span class="n">server</span><span class="o">.</span><span class="n">serve</span><span class="p">()</span>
| </pre></div> |
| </div> |
| <p>Once the server is started, we can build a client to send
requests to it:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pyarrow</span> <span class="k">as</span> <span class="nn">pa</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.flight</span> |
| |
| <span class="n">client</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="s2">"grpc://0.0.0.0:8815"</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>We can create a new table and upload it so that it gets stored
in a new Parquet file:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># Upload a new dataset</span> |
| <span class="n">data_table</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">table</span><span class="p">(</span> |
    <span class="p">[[</span><span class="s2">"Mario"</span><span class="p">,</span> <span class="s2">"Luigi"</span><span class="p">,</span> <span class="s2">"Peach"</span><span class="p">]],</span>
    <span class="n">names</span><span class="o">=</span><span class="p">[</span><span class="s2">"Character"</span><span class="p">]</span>
| <span class="p">)</span> |
| <span class="n">upload_descriptor</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightDescriptor</span><span class="o">.</span><span class="n">for_path</span><span class="p">(</span><span class="s2">"uploaded.parquet"</span><span class="p">)</span> |
| <span class="n">writer</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">do_put</span><span class="p">(</span><span class="n">upload_descriptor</span><span class="p">,</span> <span class="n">data_table</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">write_table</span><span class="p">(</span><span class="n">data_table</span><span class="p">)</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| </pre></div> |
| </div> |
| <p>Once uploaded we should be able to retrieve the metadata for our |
| newly uploaded table:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># Retrieve metadata of newly uploaded dataset</span> |
| <span class="n">flight</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">get_flight_info</span><span class="p">(</span><span class="n">upload_descriptor</span><span class="p">)</span> |
| <span class="n">descriptor</span> <span class="o">=</span> <span class="n">flight</span><span class="o">.</span><span class="n">descriptor</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"Path:"</span><span class="p">,</span> <span class="n">descriptor</span><span class="o">.</span><span class="n">path</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">),</span> <span class="s2">"Rows:"</span><span class="p">,</span> <span class="n">flight</span><span class="o">.</span><span class="n">total_records</span><span class="p">,</span> <span class="s2">"Size:"</span><span class="p">,</span> <span class="n">flight</span><span class="o">.</span><span class="n">total_bytes</span><span class="p">)</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"=== Schema ==="</span><span class="p">)</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">flight</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"=============="</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <div class="highlight-none notranslate"><div class="highlight"><pre><span></span>Path: uploaded.parquet Rows: 3 Size: ... |
| === Schema === |
| Character: string |
| ============== |
| </pre></div> |
| </div> |
| <p>And we can fetch the content of the dataset:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># Read content of the dataset</span> |
| <span class="n">reader</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">do_get</span><span class="p">(</span><span class="n">flight</span><span class="o">.</span><span class="n">endpoints</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">ticket</span><span class="p">)</span> |
| <span class="n">read_table</span> <span class="o">=</span> <span class="n">reader</span><span class="o">.</span><span class="n">read_all</span><span class="p">()</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">read_table</span><span class="o">.</span><span class="n">to_pandas</span><span class="p">()</span><span class="o">.</span><span class="n">head</span><span class="p">())</span> |
| </pre></div> |
| </div> |
| <div class="highlight-none notranslate"><div class="highlight"><pre><span></span> Character |
| 0 Mario |
| 1 Luigi |
| 2 Peach |
| </pre></div> |
| </div> |
| <p>Once we are finished, we can invoke our custom action to delete the
dataset we just uploaded:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># Drop the newly uploaded dataset</span> |
| <span class="n">client</span><span class="o">.</span><span class="n">do_action</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">Action</span><span class="p">(</span><span class="s2">"drop_dataset"</span><span class="p">,</span> <span class="s2">"uploaded.parquet"</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)))</span> |
| </pre></div> |
| </div> |
| <p>To confirm our dataset was deleted,
we can list all Parquet files that are currently stored by the server:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># List existing datasets.</span> |
| <span class="k">for</span> <span class="n">flight</span> <span class="ow">in</span> <span class="n">client</span><span class="o">.</span><span class="n">list_flights</span><span class="p">():</span> |
    <span class="n">descriptor</span> <span class="o">=</span> <span class="n">flight</span><span class="o">.</span><span class="n">descriptor</span>
    <span class="nb">print</span><span class="p">(</span><span class="s2">"Path:"</span><span class="p">,</span> <span class="n">descriptor</span><span class="o">.</span><span class="n">path</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">),</span> <span class="s2">"Rows:"</span><span class="p">,</span> <span class="n">flight</span><span class="o">.</span><span class="n">total_records</span><span class="p">,</span> <span class="s2">"Size:"</span><span class="p">,</span> <span class="n">flight</span><span class="o">.</span><span class="n">total_bytes</span><span class="p">)</span>
    <span class="nb">print</span><span class="p">(</span><span class="s2">"=== Schema ==="</span><span class="p">)</span>
    <span class="nb">print</span><span class="p">(</span><span class="n">flight</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span>
    <span class="nb">print</span><span class="p">(</span><span class="s2">"=============="</span><span class="p">)</span>
    <span class="nb">print</span><span class="p">(</span><span class="s2">""</span><span class="p">)</span>
| </pre></div> |
| </div> |
| </section> |
| <section id="streaming-parquet-storage-service"> |
| <h2><a class="toc-backref" href="#id3" role="doc-backlink">Streaming Parquet Storage Service</a><a class="headerlink" href="#streaming-parquet-storage-service" title="Link to this heading">¶</a></h2> |
| <p>We can improve the Parquet storage service and avoid holding entire datasets in |
| memory by streaming data. Flight readers and writers, like others in PyArrow, |
| can be iterated through, so let’s update the server from before to take |
| advantage of this:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pathlib</span> |
| |
| <span class="kn">import</span> <span class="nn">pyarrow</span> <span class="k">as</span> <span class="nn">pa</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.flight</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.parquet</span> |
| |
| |
<span class="k">class</span> <span class="nc">FlightServer</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightServerBase</span><span class="p">):</span>

    <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">location</span><span class="o">=</span><span class="s2">"grpc://0.0.0.0:8815"</span><span class="p">,</span>
                 <span class="n">repo</span><span class="o">=</span><span class="n">pathlib</span><span class="o">.</span><span class="n">Path</span><span class="p">(</span><span class="s2">"./datasets"</span><span class="p">),</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
        <span class="nb">super</span><span class="p">(</span><span class="n">FlightServer</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">location</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">_location</span> <span class="o">=</span> <span class="n">location</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">=</span> <span class="n">repo</span>

    <span class="k">def</span> <span class="nf">_make_flight_info</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dataset</span><span class="p">):</span>
        <span class="n">dataset_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">/</span> <span class="n">dataset</span>
        <span class="n">schema</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">parquet</span><span class="o">.</span><span class="n">read_schema</span><span class="p">(</span><span class="n">dataset_path</span><span class="p">)</span>
        <span class="n">metadata</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">parquet</span><span class="o">.</span><span class="n">read_metadata</span><span class="p">(</span><span class="n">dataset_path</span><span class="p">)</span>
        <span class="n">descriptor</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightDescriptor</span><span class="o">.</span><span class="n">for_path</span><span class="p">(</span>
            <span class="n">dataset</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span>
        <span class="p">)</span>
        <span class="n">endpoints</span> <span class="o">=</span> <span class="p">[</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightEndpoint</span><span class="p">(</span><span class="n">dataset</span><span class="p">,</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">_location</span><span class="p">])]</span>
        <span class="k">return</span> <span class="n">pyarrow</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightInfo</span><span class="p">(</span><span class="n">schema</span><span class="p">,</span>
                                         <span class="n">descriptor</span><span class="p">,</span>
                                         <span class="n">endpoints</span><span class="p">,</span>
                                         <span class="n">metadata</span><span class="o">.</span><span class="n">num_rows</span><span class="p">,</span>
                                         <span class="n">metadata</span><span class="o">.</span><span class="n">serialized_size</span><span class="p">)</span>

    <span class="k">def</span> <span class="nf">list_flights</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">criteria</span><span class="p">):</span>
        <span class="k">for</span> <span class="n">dataset</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span><span class="o">.</span><span class="n">iterdir</span><span class="p">():</span>
            <span class="k">yield</span> <span class="bp">self</span><span class="o">.</span><span class="n">_make_flight_info</span><span class="p">(</span><span class="n">dataset</span><span class="o">.</span><span class="n">name</span><span class="p">)</span>

    <span class="k">def</span> <span class="nf">get_flight_info</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">descriptor</span><span class="p">):</span>
        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_make_flight_info</span><span class="p">(</span><span class="n">descriptor</span><span class="o">.</span><span class="n">path</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span>

    <span class="k">def</span> <span class="nf">do_put</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">descriptor</span><span class="p">,</span> <span class="n">reader</span><span class="p">,</span> <span class="n">writer</span><span class="p">):</span>
        <span class="n">dataset</span> <span class="o">=</span> <span class="n">descriptor</span><span class="o">.</span><span class="n">path</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span> |
| <span class="n">dataset_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">/</span> <span class="n">dataset</span> |
| <span class="c1"># Read the uploaded data and write to Parquet incrementally</span> |
| <span class="k">with</span> <span class="n">dataset_path</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="s2">"wb"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sink</span><span class="p">:</span> |
| <span class="k">with</span> <span class="n">pa</span><span class="o">.</span><span class="n">parquet</span><span class="o">.</span><span class="n">ParquetWriter</span><span class="p">(</span><span class="n">sink</span><span class="p">,</span> <span class="n">reader</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span> <span class="k">as</span> <span class="n">parquet_writer</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">chunk</span> <span class="ow">in</span> <span class="n">reader</span><span class="p">:</span> |
| <span class="n">parquet_writer</span><span class="o">.</span><span class="n">write_table</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">Table</span><span class="o">.</span><span class="n">from_batches</span><span class="p">([</span><span class="n">chunk</span><span class="o">.</span><span class="n">data</span><span class="p">]))</span> |
| |
| <span class="k">def</span> <span class="nf">do_get</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">ticket</span><span class="p">):</span> |
| <span class="n">dataset</span> <span class="o">=</span> <span class="n">ticket</span><span class="o">.</span><span class="n">ticket</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span> |
| <span class="c1"># Stream data from a file</span> |
| <span class="n">dataset_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">/</span> <span class="n">dataset</span> |
| <span class="n">reader</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">parquet</span><span class="o">.</span><span class="n">ParquetFile</span><span class="p">(</span><span class="n">dataset_path</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">GeneratorStream</span><span class="p">(</span> |
| <span class="n">reader</span><span class="o">.</span><span class="n">schema_arrow</span><span class="p">,</span> <span class="n">reader</span><span class="o">.</span><span class="n">iter_batches</span><span class="p">())</span> |
| |
| <span class="k">def</span> <span class="nf">list_actions</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">[</span> |
| <span class="p">(</span><span class="s2">"drop_dataset"</span><span class="p">,</span> <span class="s2">"Delete a dataset."</span><span class="p">),</span> |
| <span class="p">]</span> |
| |
| <span class="k">def</span> <span class="nf">do_action</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">action</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">action</span><span class="o">.</span><span class="n">type</span> <span class="o">==</span> <span class="s2">"drop_dataset"</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">do_drop_dataset</span><span class="p">(</span><span class="n">action</span><span class="o">.</span><span class="n">body</span><span class="o">.</span><span class="n">to_pybytes</span><span class="p">()</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span> |
| |
| <span class="k">def</span> <span class="nf">do_drop_dataset</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dataset</span><span class="p">):</span> |
| <span class="n">dataset_path</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_repo</span> <span class="o">/</span> <span class="n">dataset</span> |
| <span class="n">dataset_path</span><span class="o">.</span><span class="n">unlink</span><span class="p">()</span> |
| </pre></div> |
| </div> |
| <p>First, we’ve modified <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase.do_put" title="(in Apache Arrow v14.0.1)"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase.do_put()</span></code></a>. Rather |
| than reading all the uploaded data into a <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table" title="(in Apache Arrow v14.0.1)"><code class="xref py py-class docutils literal notranslate"><span class="pre">pyarrow.Table</span></code></a> before writing, |
| we iterate through each batch as it arrives and append it to a Parquet file.</p> |
| <p>Then, we’ve modified <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightServerBase.html#pyarrow.flight.FlightServerBase.do_get" title="(in Apache Arrow v14.0.1)"><code class="xref py py-meth docutils literal notranslate"><span class="pre">pyarrow.flight.FlightServerBase.do_get()</span></code></a> to stream |
| data to the client. This uses <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.GeneratorStream.html#pyarrow.flight.GeneratorStream" title="(in Apache Arrow v14.0.1)"><code class="xref py py-class docutils literal notranslate"><span class="pre">pyarrow.flight.GeneratorStream</span></code></a>, which |
| takes a schema and any iterable or iterator. Flight then iterates over it and |
| sends each record batch to the client, which lets us serve Parquet files that |
| are too large to fit into memory.</p> |
| <p>GeneratorStream can stream data that does not fit in memory, but Flight |
| must call back into Python for each record batch it sends. RecordBatchStream, |
| in contrast, requires all data to be in memory up front, but once it is |
| created, the transfer is handled entirely in C++, without calling any |
| Python code.</p> |
| <p>Let’s give the server a spin. As before, we’ll start the server:</p> |
| <div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'__main__'</span><span class="p">:</span> |
| <span class="n">server</span> <span class="o">=</span> <span class="n">FlightServer</span><span class="p">()</span> |
| <span class="n">server</span><span class="o">.</span><span class="n">_repo</span><span class="o">.</span><span class="n">mkdir</span><span class="p">(</span><span class="n">exist_ok</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="n">server</span><span class="o">.</span><span class="n">serve</span><span class="p">()</span> |
| </pre></div> |
| </div> |
| <p>We create a client, and this time, we’ll write batches to the writer, as if we |
| had a stream of data instead of a table in memory:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pyarrow</span> <span class="k">as</span> <span class="nn">pa</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.flight</span> |
| |
| <span class="n">client</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="s2">"grpc://0.0.0.0:8815"</span><span class="p">)</span> |
| |
| <span class="c1"># Upload a new dataset</span> |
| <span class="n">NUM_BATCHES</span> <span class="o">=</span> <span class="mi">1024</span> |
| <span class="n">ROWS_PER_BATCH</span> <span class="o">=</span> <span class="mi">4096</span> |
| <span class="n">upload_descriptor</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightDescriptor</span><span class="o">.</span><span class="n">for_path</span><span class="p">(</span><span class="s2">"streamed.parquet"</span><span class="p">)</span> |
| <span class="n">batch</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">record_batch</span><span class="p">([</span> |
| <span class="n">pa</span><span class="o">.</span><span class="n">array</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="n">ROWS_PER_BATCH</span><span class="p">)),</span> |
| <span class="p">],</span> <span class="n">names</span><span class="o">=</span><span class="p">[</span><span class="s2">"ints"</span><span class="p">])</span> |
| <span class="n">writer</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">do_put</span><span class="p">(</span><span class="n">upload_descriptor</span><span class="p">,</span> <span class="n">batch</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span> |
| <span class="k">with</span> <span class="n">writer</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">NUM_BATCHES</span><span class="p">):</span> |
| <span class="n">writer</span><span class="o">.</span><span class="n">write_batch</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>As before, we can then read it back. Again, we’ll read each batch from the |
| stream as it arrives, instead of reading them all into a table:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># Read content of the dataset</span> |
| <span class="n">flight</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">get_flight_info</span><span class="p">(</span><span class="n">upload_descriptor</span><span class="p">)</span> |
| <span class="n">reader</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">do_get</span><span class="p">(</span><span class="n">flight</span><span class="o">.</span><span class="n">endpoints</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">ticket</span><span class="p">)</span> |
| <span class="n">total_rows</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="k">for</span> <span class="n">chunk</span> <span class="ow">in</span> <span class="n">reader</span><span class="p">:</span> |
| <span class="n">total_rows</span> <span class="o">+=</span> <span class="n">chunk</span><span class="o">.</span><span class="n">data</span><span class="o">.</span><span class="n">num_rows</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"Got"</span><span class="p">,</span> <span class="n">total_rows</span><span class="p">,</span> <span class="s2">"rows total, expected"</span><span class="p">,</span> <span class="n">NUM_BATCHES</span> <span class="o">*</span> <span class="n">ROWS_PER_BATCH</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <div class="highlight-none notranslate"><div class="highlight"><pre><span></span>Got 4194304 rows total, expected 4194304 |
| </pre></div> |
| </div> |
| </section> |
| <section id="authentication-with-user-password"> |
| <h2><a class="toc-backref" href="#id4" role="doc-backlink">Authentication with user/password</a><a class="headerlink" href="#authentication-with-user-password" title="Link to this heading">¶</a></h2> |
| <p>Often, services need a way to authenticate the user and identify who |
| they are. Flight provides <a class="reference external" href="https://arrow.apache.org/docs/format/Flight.html" title="(in Apache Arrow v14.0.1)"><span class="xref std std-doc">several ways to implement |
| authentication</span></a>; the simplest uses a |
| user-password scheme. At startup, the client authenticates itself with |
| the server using a username and password. The server returns an |
| authorization token to include on future requests.</p> |
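| <p>The login-then-token flow can be sketched without any Flight machinery. The class below is a hypothetical, in-memory illustration (the names <code>TokenStore</code>, <code>login</code>, and <code>whoami</code> are assumptions, not Flight APIs) and, like the rest of this recipe, it is not meant to be secure.</p> |

```python
import secrets


class TokenStore:
    """Hypothetical in-memory credential and token store (illustration only)."""

    def __init__(self, creds):
        self.creds = creds   # username -> password
        self.tokens = {}     # bearer token -> username

    def login(self, username, password):
        """Check a username/password pair and issue a random bearer token."""
        expected = self.creds.get(username)
        if expected is None or not secrets.compare_digest(
                expected.encode("utf-8"), password.encode("utf-8")):
            raise PermissionError("Unknown user or invalid password")
        token = secrets.token_urlsafe(32)
        self.tokens[token] = username
        return token

    def whoami(self, token):
        """Map a previously issued token back to its user."""
        username = self.tokens.get(token)
        if username is None:
            raise PermissionError("Invalid token")
        return username


store = TokenStore({"test": "password"})
token = store.login("test", "password")
print(store.whoami(token))
```

| <p>The password is presented once at login; every later request carries only the token, which the server maps back to a user.</p> |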
| <div class="admonition warning"> |
| <p class="admonition-title">Warning</p> |
| <p>Authentication should only be used over a secure, encrypted |
| channel; that is, TLS should be enabled.</p> |
| </div> |
| <div class="admonition note"> |
| <p class="admonition-title">Note</p> |
| <p>While the scheme is described as “<a class="reference external" href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme">(HTTP) basic |
| authentication</a>”, it does not actually implement HTTP |
| authentication (RFC 7235) per se.</p> |
| </div> |
| <p>While Flight provides some interfaces to implement such a scheme, the |
| server must provide the actual implementation, as demonstrated |
| below. <strong>The implementation here is not secure and is provided as a |
| minimal example only.</strong></p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">base64</span> |
| <span class="kn">import</span> <span class="nn">secrets</span> |
| |
| <span class="kn">import</span> <span class="nn">pyarrow</span> <span class="k">as</span> <span class="nn">pa</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.flight</span> |
| |
| |
| <span class="k">class</span> <span class="nc">EchoServer</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightServerBase</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""A simple server that just echoes any requests from DoAction."""</span> |
| |
| <span class="k">def</span> <span class="nf">do_action</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">action</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">[</span><span class="n">action</span><span class="o">.</span><span class="n">type</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s2">"utf-8"</span><span class="p">),</span> <span class="n">action</span><span class="o">.</span><span class="n">body</span><span class="p">]</span> |
| |
| |
| <span class="k">class</span> <span class="nc">BasicAuthServerMiddlewareFactory</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">ServerMiddlewareFactory</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> Middleware that implements username-password authentication.</span> |
| |
| <span class="sd"> Parameters</span> |
| <span class="sd"> ----------</span> |
| <span class="sd"> creds: Dict[str, str]</span> |
| <span class="sd"> A dictionary of username-password values to accept.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">creds</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">creds</span> <span class="o">=</span> <span class="n">creds</span> |
| <span class="c1"># Map generated bearer tokens to users</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">tokens</span> <span class="o">=</span> <span class="p">{}</span> |
| |
| <span class="k">def</span> <span class="nf">start_call</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">info</span><span class="p">,</span> <span class="n">headers</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Validate credentials at the start of every call."""</span> |
| <span class="c1"># Search for the authentication header (case-insensitive)</span> |
| <span class="n">auth_header</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">for</span> <span class="n">header</span> <span class="ow">in</span> <span class="n">headers</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">header</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="o">==</span> <span class="s2">"authorization"</span><span class="p">:</span> |
| <span class="n">auth_header</span> <span class="o">=</span> <span class="n">headers</span><span class="p">[</span><span class="n">header</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> |
| <span class="k">break</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">auth_header</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightUnauthenticatedError</span><span class="p">(</span><span class="s2">"No credentials supplied"</span><span class="p">)</span> |
| |
| <span class="c1"># The header has the structure "AuthType TokenValue", e.g.</span> |
| <span class="c1"># "Basic <encoded username+password>" or "Bearer <random token>".</span> |
| <span class="n">auth_type</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">value</span> <span class="o">=</span> <span class="n">auth_header</span><span class="o">.</span><span class="n">partition</span><span class="p">(</span><span class="s2">" "</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">auth_type</span> <span class="o">==</span> <span class="s2">"Basic"</span><span class="p">:</span> |
| <span class="c1"># Initial "login". The user provided a username/password</span> |
| <span class="c1"># combination encoded in the same way as HTTP Basic Auth.</span> |
| <span class="n">decoded</span> <span class="o">=</span> <span class="n">base64</span><span class="o">.</span><span class="n">b64decode</span><span class="p">(</span><span class="n">value</span><span class="p">)</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s2">"utf-8"</span><span class="p">)</span> |
| <span class="n">username</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">password</span> <span class="o">=</span> <span class="n">decoded</span><span class="o">.</span><span class="n">partition</span><span class="p">(</span><span class="s1">':'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">password</span> <span class="ow">or</span> <span class="n">password</span> <span class="o">!=</span> <span class="bp">self</span><span class="o">.</span><span class="n">creds</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">username</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightUnauthenticatedError</span><span class="p">(</span><span class="s2">"Unknown user or invalid password"</span><span class="p">)</span> |
| <span class="c1"># Generate a secret, random bearer token for future calls.</span> |
| <span class="n">token</span> <span class="o">=</span> <span class="n">secrets</span><span class="o">.</span><span class="n">token_urlsafe</span><span class="p">(</span><span class="mi">32</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">tokens</span><span class="p">[</span><span class="n">token</span><span class="p">]</span> <span class="o">=</span> <span class="n">username</span> |
| <span class="k">return</span> <span class="n">BasicAuthServerMiddleware</span><span class="p">(</span><span class="n">token</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">auth_type</span> <span class="o">==</span> <span class="s2">"Bearer"</span><span class="p">:</span> |
| <span class="c1"># An actual call. Validate the bearer token.</span> |
| <span class="n">username</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">tokens</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">username</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightUnauthenticatedError</span><span class="p">(</span><span class="s2">"Invalid token"</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">BasicAuthServerMiddleware</span><span class="p">(</span><span class="n">value</span><span class="p">)</span> |
| |
| <span class="k">raise</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightUnauthenticatedError</span><span class="p">(</span><span class="s2">"No credentials supplied"</span><span class="p">)</span> |
| |
| |
| <span class="k">class</span> <span class="nc">BasicAuthServerMiddleware</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">ServerMiddleware</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Middleware that implements username-password authentication."""</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">token</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">token</span> <span class="o">=</span> <span class="n">token</span> |
| |
| <span class="k">def</span> <span class="nf">sending_headers</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""Return the authentication token to the client."""</span> |
| <span class="k">return</span> <span class="p">{</span><span class="s2">"authorization"</span><span class="p">:</span> <span class="sa">f</span><span class="s2">"Bearer </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">token</span><span class="si">}</span><span class="s2">"</span><span class="p">}</span> |
| |
| |
| <span class="k">class</span> <span class="nc">NoOpAuthHandler</span><span class="p">(</span><span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">ServerAuthHandler</span><span class="p">):</span> |
| <span class="w"> </span><span class="sd">"""</span> |
| <span class="sd"> A handler that performs no authentication itself.</span> |
| |
| <span class="sd"> This is required only so that the server will respond to the internal</span> |
| <span class="sd"> Handshake RPC call, which the client calls when authenticate_basic_token</span> |
| <span class="sd"> is called. Otherwise, it should be a no-op as the actual authentication is</span> |
| <span class="sd"> implemented in middleware.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">def</span> <span class="nf">authenticate</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">outgoing</span><span class="p">,</span> <span class="n">incoming</span><span class="p">):</span> |
| <span class="k">pass</span> |
| |
| <span class="k">def</span> <span class="nf">is_valid</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">token</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s2">""</span> |
| </pre></div> |
| </div> |
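| <p>The header handling in <code>start_call</code> can be exercised on its own with only the standard library. The sketch below assumes the same "AuthType Value" header layout as the middleware above; the helper names <code>encode_basic</code> and <code>parse_authorization</code> are hypothetical.</p> |

```python
import base64


def encode_basic(username, password):
    """Build a Basic-style header value from a username and password."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def parse_authorization(header_value):
    """Split a header value into (auth_type, username, password_or_token)."""
    auth_type, _, value = header_value.partition(" ")
    if auth_type == "Basic":
        decoded = base64.b64decode(value).decode("utf-8")
        username, _, password = decoded.partition(":")
        return auth_type, username, password
    # For other schemes (e.g. Bearer), the value is an opaque token.
    return auth_type, None, value


header = encode_basic("test", "password")
print(parse_authorization(header))
print(parse_authorization("Bearer abc123"))
```

| <p>Note that base64 is an encoding, not encryption: anyone who can read the header can recover the password, which is why the TLS warning above matters.</p> |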
| <p>We can then start the server:</p> |
| <div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'__main__'</span><span class="p">:</span> |
| <span class="n">server</span> <span class="o">=</span> <span class="n">EchoServer</span><span class="p">(</span> |
| <span class="n">auth_handler</span><span class="o">=</span><span class="n">NoOpAuthHandler</span><span class="p">(),</span> |
| <span class="n">location</span><span class="o">=</span><span class="s2">"grpc://0.0.0.0:8816"</span><span class="p">,</span> |
| <span class="n">middleware</span><span class="o">=</span><span class="p">{</span> |
| <span class="s2">"basic"</span><span class="p">:</span> <span class="n">BasicAuthServerMiddlewareFactory</span><span class="p">({</span> |
| <span class="s2">"test"</span><span class="p">:</span> <span class="s2">"password"</span><span class="p">,</span> |
| <span class="p">})</span> |
| <span class="p">},</span> |
| <span class="p">)</span> |
| <span class="n">server</span><span class="o">.</span><span class="n">serve</span><span class="p">()</span> |
| </pre></div> |
| </div> |
| <p>Then, we can make a client and log in:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pyarrow</span> <span class="k">as</span> <span class="nn">pa</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.flight</span> |
| |
| <span class="n">client</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="s2">"grpc://0.0.0.0:8816"</span><span class="p">)</span> |
| |
| <span class="n">token_pair</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">authenticate_basic_token</span><span class="p">(</span><span class="sa">b</span><span class="s1">'test'</span><span class="p">,</span> <span class="sa">b</span><span class="s1">'password'</span><span class="p">)</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">token_pair</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <div class="highlight-none notranslate"><div class="highlight"><pre><span></span>(b'authorization', b'Bearer ...') |
| </pre></div> |
| </div> |
| <p>For future calls, we include the authentication token with the call:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">action</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">Action</span><span class="p">(</span><span class="s2">"echo"</span><span class="p">,</span> <span class="sa">b</span><span class="s2">"Hello, world!"</span><span class="p">)</span> |
| <span class="n">options</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightCallOptions</span><span class="p">(</span><span class="n">headers</span><span class="o">=</span><span class="p">[</span><span class="n">token_pair</span><span class="p">])</span> |
| <span class="k">for</span> <span class="n">response</span> <span class="ow">in</span> <span class="n">client</span><span class="o">.</span><span class="n">do_action</span><span class="p">(</span><span class="n">action</span><span class="o">=</span><span class="n">action</span><span class="p">,</span> <span class="n">options</span><span class="o">=</span><span class="n">options</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">response</span><span class="o">.</span><span class="n">body</span><span class="o">.</span><span class="n">to_pybytes</span><span class="p">())</span> |
| </pre></div> |
| </div> |
| <div class="highlight-none notranslate"><div class="highlight"><pre><span></span>b'echo' |
| b'Hello, world!' |
| </pre></div> |
| </div> |
| <p>If we fail to do so, we get an authentication error:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">try</span><span class="p">:</span> |
|     <span class="nb">list</span><span class="p">(</span><span class="n">client</span><span class="o">.</span><span class="n">do_action</span><span class="p">(</span><span class="n">action</span><span class="o">=</span><span class="n">action</span><span class="p">))</span> |
| <span class="k">except</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightUnauthenticatedError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
|     <span class="nb">print</span><span class="p">(</span><span class="s2">"Unauthenticated:"</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
|     <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s2">"Expected call to fail"</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <div class="highlight-none notranslate"><div class="highlight"><pre><span></span>Unauthenticated: No credentials supplied. Detail: Unauthenticated |
| </pre></div> |
| </div> |
| <p>Or if we use the wrong credentials on login, we also get an error:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">try</span><span class="p">:</span> |
|     <span class="n">client</span><span class="o">.</span><span class="n">authenticate_basic_token</span><span class="p">(</span><span class="sa">b</span><span class="s1">'invalid'</span><span class="p">,</span> <span class="sa">b</span><span class="s1">'password'</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightUnauthenticatedError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
|     <span class="nb">print</span><span class="p">(</span><span class="s2">"Unauthenticated:"</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
|     <span class="k">raise</span> <span class="ne">RuntimeError</span><span class="p">(</span><span class="s2">"Expected call to fail"</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <div class="highlight-none notranslate"><div class="highlight"><pre><span></span>Unauthenticated: Unknown user or invalid password. Detail: Unauthenticated |
| </pre></div> |
| </div> |
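| <p>For orientation, the sketch below shows how a username and password are typically packed into an HTTP Basic <code class="docutils literal notranslate"><span class="pre">authorization</span></code> header (RFC 7617). It assumes the server follows that scheme; the helper name <code class="docutils literal notranslate"><span class="pre">basic_auth_header</span></code> and the sample credentials are hypothetical and not part of pyarrow.</p> |

```python
import base64
from typing import Tuple


def basic_auth_header(username: str, password: str) -> Tuple[bytes, bytes]:
    """Build an HTTP Basic ``authorization`` header pair (RFC 7617).

    Hypothetical helper for illustration only; it is not a pyarrow API.
    """
    # Basic credentials are "user:password", base64-encoded.
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials)
    # Header pairs are (name, value) tuples of bytes, like token_pair above.
    return (b"authorization", b"Basic " + token)


# Sample credentials are placeholders.
header_pair = basic_auth_header("user", "secret")
```

| <p>Because base64 is an encoding and not encryption, such a header is only safe to send over an encrypted connection.</p> |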
| </section> |
| <section id="securing-connections-with-tls"> |
| <h2><a class="toc-backref" href="#id5" role="doc-backlink">Securing connections with TLS</a><a class="headerlink" href="#securing-connections-with-tls" title="Link to this heading">¶</a></h2> |
| <p>Building on the previous scenario, where access to the server is controlled with a username and password, |
| TLS (the protocol underlying HTTPS) adds a further layer of security by encrypting messages |
| between the client and server. TLS relies on certificates. During development, the easiest |
| approach is to use self-signed certificates. At startup, the server loads its certificate and private |
| key, and the client verifies the server against the TLS root certificate.</p> |
| <div class="admonition note"> |
| <p class="admonition-title">Note</p> |
| <p>In production environments, use a certificate signed by a certificate authority.</p> |
| </div> |
| <p><strong>Step 1 - Generating the Self-Signed Certificate</strong></p> |
| <p>Generate a self-signed certificate using dotnet on <a class="reference external" href="https://docs.microsoft.com/en-us/dotnet/core/additional-tools/self-signed-certificates-guide">Windows</a>, or <a class="reference external" href="https://www.ibm.com/docs/en/api-connect/2018.x?topic=overview-generating-self-signed-certificate-using-openssl">openssl</a> on Linux or macOS. |
| Alternatively, the self-signed certificate from the <a class="reference external" href="https://github.com/apache/arrow-testing/tree/master/data/flight">Arrow testing data repository</a> can be used. |
| Depending on the file generated, you may need to convert it to the .crt and .key files that the Arrow server requires. |
| One way to do this is with openssl; see this <a class="reference external" href="https://www.ibm.com/docs/en/arl/9.7?topic=certification-extracting-certificate-keys-from-pfx-file">IBM article</a> for more information.</p> |
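| <p>Before starting the server, it can help to confirm that the converted files look like what is expected. The sketch below assumes the .crt and .key files are PEM-encoded; the helper names <code class="docutils literal notranslate"><span class="pre">pem_labels</span></code> and <code class="docutils literal notranslate"><span class="pre">check_tls_pair</span></code> are hypothetical. It only inspects the PEM block labels and does not validate the certificate itself.</p> |

```python
import re
from typing import List

# Matches the opening line of a PEM block, e.g. "-----BEGIN CERTIFICATE-----".
_PEM_BEGIN = re.compile(rb"-----BEGIN ([A-Z0-9 ]+)-----")


def pem_labels(data: bytes) -> List[str]:
    """Return the labels of all PEM blocks found in ``data``."""
    return [m.group(1).decode("ascii") for m in _PEM_BEGIN.finditer(data)]


def check_tls_pair(cert_bytes: bytes, key_bytes: bytes) -> None:
    """Raise ValueError if the pair does not look like a PEM cert and key.

    Hypothetical helper: checks labels only, not the cryptographic content.
    """
    if "CERTIFICATE" not in pem_labels(cert_bytes):
        raise ValueError("certificate file has no PEM CERTIFICATE block")
    # Covers "PRIVATE KEY", "RSA PRIVATE KEY", "EC PRIVATE KEY", and so on.
    if not any(label.endswith("PRIVATE KEY") for label in pem_labels(key_bytes)):
        raise ValueError("key file has no PEM PRIVATE KEY block")
```

| <p>A common mistake this catches is passing the two files in the wrong order, or passing a binary .pfx file that has not been converted yet.</p> |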
| <p><strong>Step 2 - Running a server with TLS enabled</strong></p> |
| <p>The code below is a minimal working example of an Arrow server used to receive data with TLS.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">argparse</span> |
| <span class="kn">import</span> <span class="nn">pyarrow</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.flight</span> |
| |
| |
| <span class="k">class</span> <span class="nc">FlightServer</span><span class="p">(</span><span class="n">pyarrow</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightServerBase</span><span class="p">):</span> |
|     <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">host</span><span class="o">=</span><span class="s2">"localhost"</span><span class="p">,</span> <span class="n">location</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
|                  <span class="n">tls_certificates</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">verify_client</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
|                  <span class="n">root_certificates</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">auth_handler</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
|         <span class="nb">super</span><span class="p">(</span><span class="n">FlightServer</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span> |
|             <span class="n">location</span><span class="p">,</span> <span class="n">auth_handler</span><span class="p">,</span> <span class="n">tls_certificates</span><span class="p">,</span> <span class="n">verify_client</span><span class="p">,</span> |
|             <span class="n">root_certificates</span><span class="p">)</span> |
|         <span class="bp">self</span><span class="o">.</span><span class="n">flights</span> <span class="o">=</span> <span class="p">{}</span> |
| |
|     <span class="nd">@classmethod</span> |
|     <span class="k">def</span> <span class="nf">descriptor_to_key</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">descriptor</span><span class="p">):</span> |
|         <span class="k">return</span> <span class="p">(</span><span class="n">descriptor</span><span class="o">.</span><span class="n">descriptor_type</span><span class="o">.</span><span class="n">value</span><span class="p">,</span> <span class="n">descriptor</span><span class="o">.</span><span class="n">command</span><span class="p">,</span> |
|                 <span class="nb">tuple</span><span class="p">(</span><span class="n">descriptor</span><span class="o">.</span><span class="n">path</span> <span class="ow">or</span> <span class="nb">tuple</span><span class="p">()))</span> |
| |
|     <span class="k">def</span> <span class="nf">do_put</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">descriptor</span><span class="p">,</span> <span class="n">reader</span><span class="p">,</span> <span class="n">writer</span><span class="p">):</span> |
|         <span class="n">key</span> <span class="o">=</span> <span class="n">FlightServer</span><span class="o">.</span><span class="n">descriptor_to_key</span><span class="p">(</span><span class="n">descriptor</span><span class="p">)</span> |
|         <span class="nb">print</span><span class="p">(</span><span class="n">key</span><span class="p">)</span> |
|         <span class="bp">self</span><span class="o">.</span><span class="n">flights</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">reader</span><span class="o">.</span><span class="n">read_all</span><span class="p">()</span> |
|         <span class="nb">print</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">flights</span><span class="p">[</span><span class="n">key</span><span class="p">])</span> |
| |
| |
| <span class="k">def</span> <span class="nf">main</span><span class="p">():</span> |
|     <span class="n">parser</span> <span class="o">=</span> <span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">()</span> |
|     <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s2">"--tls"</span><span class="p">,</span> <span class="n">nargs</span><span class="o">=</span><span class="mi">2</span><span class="p">,</span> <span class="n">required</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">metavar</span><span class="o">=</span><span class="p">(</span><span class="s1">'CERTFILE'</span><span class="p">,</span> <span class="s1">'KEYFILE'</span><span class="p">))</span> |
|     <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
|     <span class="n">tls_certificates</span> <span class="o">=</span> <span class="p">[]</span> |
| |
|     <span class="n">scheme</span> <span class="o">=</span> <span class="s2">"grpc+tls"</span> |
|     <span class="n">host</span> <span class="o">=</span> <span class="s2">"localhost"</span> |
|     <span class="n">port</span> <span class="o">=</span> <span class="s2">"5005"</span> |
| |
|     <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">args</span><span class="o">.</span><span class="n">tls</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="s2">"rb"</span><span class="p">)</span> <span class="k">as</span> <span class="n">cert_file</span><span class="p">:</span> |
|         <span class="n">tls_cert_chain</span> <span class="o">=</span> <span class="n">cert_file</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
|     <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">args</span><span class="o">.</span><span class="n">tls</span><span class="p">[</span><span class="mi">1</span><span class="p">],</span> <span class="s2">"rb"</span><span class="p">)</span> <span class="k">as</span> <span class="n">key_file</span><span class="p">:</span> |
|         <span class="n">tls_private_key</span> <span class="o">=</span> <span class="n">key_file</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
| |
|     <span class="c1"># The server takes a list of (certificate chain, private key) pairs.</span> |
|     <span class="n">tls_certificates</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">tls_cert_chain</span><span class="p">,</span> <span class="n">tls_private_key</span><span class="p">))</span> |
| |
|     <span class="n">location</span> <span class="o">=</span> <span class="s2">"</span><span class="si">{}</span><span class="s2">://</span><span class="si">{}</span><span class="s2">:</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">scheme</span><span class="p">,</span> <span class="n">host</span><span class="p">,</span> <span class="n">port</span><span class="p">)</span> |
| |
|     <span class="n">server</span> <span class="o">=</span> <span class="n">FlightServer</span><span class="p">(</span><span class="n">host</span><span class="p">,</span> <span class="n">location</span><span class="p">,</span> |
|                           <span class="n">tls_certificates</span><span class="o">=</span><span class="n">tls_certificates</span><span class="p">)</span> |
|     <span class="nb">print</span><span class="p">(</span><span class="s2">"Serving on"</span><span class="p">,</span> <span class="n">location</span><span class="p">)</span> |
|     <span class="n">server</span><span class="o">.</span><span class="n">serve</span><span class="p">()</span> |
| |
| |
| <span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'__main__'</span><span class="p">:</span> |
|     <span class="n">main</span><span class="p">()</span> |
| </pre></div> |
| </div> |
| <p>Running the server, you should see <code class="docutils literal notranslate"><span class="pre">Serving</span> <span class="pre">on</span> <span class="pre">grpc+tls://localhost:5005</span></code>.</p> |
| <p><strong>Step 3 - Securely Connecting to the Server</strong></p> |
| <p>Suppose we want to connect to the server and push some data to it. The following code sends data to the server over a TLS-encrypted connection.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">argparse</span> |
| <span class="kn">import</span> <span class="nn">pyarrow</span> |
| <span class="kn">import</span> <span class="nn">pyarrow.flight</span> |
| <span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span> |
| |
| <span class="c1"># Assumes incoming data object is a Pandas Dataframe</span> |
| <span class="k">def</span> <span class="nf">push_to_server</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">data</span><span class="p">,</span> <span class="n">client</span><span class="p">):</span> |
|     <span class="n">object_to_send</span> <span class="o">=</span> <span class="n">pyarrow</span><span class="o">.</span><span class="n">Table</span><span class="o">.</span><span class="n">from_pandas</span><span class="p">(</span><span class="n">data</span><span class="p">)</span> |
|     <span class="n">writer</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">do_put</span><span class="p">(</span><span class="n">pyarrow</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightDescriptor</span><span class="o">.</span><span class="n">for_path</span><span class="p">(</span><span class="n">name</span><span class="p">),</span> <span class="n">object_to_send</span><span class="o">.</span><span class="n">schema</span><span class="p">)</span> |
|     <span class="n">writer</span><span class="o">.</span><span class="n">write_table</span><span class="p">(</span><span class="n">object_to_send</span><span class="p">)</span> |
|     <span class="n">writer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">main</span><span class="p">():</span> |
|     <span class="n">parser</span> <span class="o">=</span> <span class="n">argparse</span><span class="o">.</span><span class="n">ArgumentParser</span><span class="p">()</span> |
| |
|     <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">'--tls-roots'</span><span class="p">,</span> <span class="n">required</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
|                         <span class="n">help</span><span class="o">=</span><span class="s1">'Path to trusted TLS certificate(s)'</span><span class="p">)</span> |
|     <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">'--host'</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost"</span><span class="p">,</span> |
|                         <span class="n">help</span><span class="o">=</span><span class="s1">'Host endpoint'</span><span class="p">)</span> |
|     <span class="n">parser</span><span class="o">.</span><span class="n">add_argument</span><span class="p">(</span><span class="s1">'--port'</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">5005</span><span class="p">,</span> |
|                         <span class="n">help</span><span class="o">=</span><span class="s1">'Host port'</span><span class="p">)</span> |
|     <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
|     <span class="n">kwargs</span> <span class="o">=</span> <span class="p">{}</span> |
| |
|     <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">args</span><span class="o">.</span><span class="n">tls_roots</span><span class="p">,</span> <span class="s2">"rb"</span><span class="p">)</span> <span class="k">as</span> <span class="n">root_certs</span><span class="p">:</span> |
|         <span class="n">kwargs</span><span class="p">[</span><span class="s2">"tls_root_certs"</span><span class="p">]</span> <span class="o">=</span> <span class="n">root_certs</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
| |
|     <span class="n">client</span> <span class="o">=</span> <span class="n">pyarrow</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">FlightClient</span><span class="p">(</span><span class="sa">f</span><span class="s2">"grpc+tls://</span><span class="si">{</span><span class="n">args</span><span class="o">.</span><span class="n">host</span><span class="si">}</span><span class="s2">:</span><span class="si">{</span><span class="n">args</span><span class="o">.</span><span class="n">port</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> |
|     <span class="n">data</span> <span class="o">=</span> <span class="p">{</span><span class="s1">'Animal'</span><span class="p">:</span> <span class="p">[</span><span class="s1">'Dog'</span><span class="p">,</span> <span class="s1">'Cat'</span><span class="p">,</span> <span class="s1">'Mouse'</span><span class="p">],</span> <span class="s1">'Size'</span><span class="p">:</span> <span class="p">[</span><span class="s1">'Big'</span><span class="p">,</span> <span class="s1">'Small'</span><span class="p">,</span> <span class="s1">'Tiny'</span><span class="p">]}</span> |
|     <span class="n">df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">data</span><span class="p">,</span> <span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s1">'Animal'</span><span class="p">,</span> <span class="s1">'Size'</span><span class="p">])</span> |
|     <span class="n">push_to_server</span><span class="p">(</span><span class="s2">"AnimalData"</span><span class="p">,</span> <span class="n">df</span><span class="p">,</span> <span class="n">client</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'__main__'</span><span class="p">:</span> |
|     <span class="k">try</span><span class="p">:</span> |
|         <span class="n">main</span><span class="p">()</span> |
|     <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
|         <span class="nb">print</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>Running the client script, you should see the server printing out information about the data it just received.</p> |
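| <p>The keyword arguments for the client can also be assembled by a small helper, which keeps the file handling separate from the connection code. This is a minimal sketch: <code class="docutils literal notranslate"><span class="pre">client_tls_kwargs</span></code> is a hypothetical name, and the only pyarrow detail it relies on is the <code class="docutils literal notranslate"><span class="pre">tls_root_certs</span></code> keyword used in the client script above.</p> |

```python
from pathlib import Path
from typing import Optional


def client_tls_kwargs(tls_roots: Optional[str]) -> dict:
    """Build keyword arguments for a TLS-enabled Flight client.

    Hypothetical helper: returns {"tls_root_certs": <file bytes>} when a
    path is given, and an empty dict when no path is given.
    """
    if tls_roots is None:
        # No explicit roots; the caller decides whether that is acceptable.
        return {}
    # Read the trusted certificate(s) as raw bytes.
    return {"tls_root_certs": Path(tls_roots).read_bytes()}
```

| <p>The result would then be unpacked into the client constructor, for example <code class="docutils literal notranslate"><span class="pre">FlightClient(location,</span> <span class="pre">**client_tls_kwargs(path))</span></code>. With a self-signed certificate the root file must be supplied, since the certificate is not signed by a publicly trusted authority.</p> |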
| </section> |
| <section id="propagating-opentelemetry-traces"> |
| <h2><a class="toc-backref" href="#id6" role="doc-backlink">Propagating OpenTelemetry Traces</a><a class="headerlink" href="#propagating-opentelemetry-traces" title="Link to this heading">¶</a></h2> |
| <p>Distributed tracing with <a class="reference external" href="https://opentelemetry.io/docs/instrumentation/python/getting-started/">OpenTelemetry</a> allows collecting call-level performance |
| measurements across a Flight service. In order to correlate spans across a Flight |
| client and server, trace context must be passed between the two. This can be passed |
| manually through headers in <a class="reference external" href="https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightCallOptions.html#pyarrow.flight.FlightCallOptions" title="(in Apache Arrow v14.0.1)"><code class="xref py py-class docutils literal notranslate"><span class="pre">pyarrow.flight.FlightCallOptions</span></code></a>, or can |
| be automatically propagated using middleware.</p> |
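| <p>To make the manual option concrete, the sketch below builds a W3C Trace Context <code class="docutils literal notranslate"><span class="pre">traceparent</span></code> header by hand. It assumes the W3C Trace Context format, which is what OpenTelemetry propagates by default; the helper names are hypothetical, and in practice the OpenTelemetry <code class="docutils literal notranslate"><span class="pre">inject</span></code> function would produce this value for you.</p> |

```python
from typing import List, Tuple


def make_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Format a W3C ``traceparent`` value: version-traceid-spanid-flags."""
    flags = 0x01 if sampled else 0x00
    # Version "00", 32 hex digits of trace id, 16 of span id, 2 of flags.
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def trace_headers(traceparent: str) -> List[Tuple[bytes, bytes]]:
    """Wrap the value as a list of (name, value) byte pairs for call headers."""
    return [(b"traceparent", traceparent.encode("ascii"))]


# Identifiers taken from the example in the W3C Trace Context specification.
headers = trace_headers(
    make_traceparent(0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7)
)
```

| <p>A list like <code class="docutils literal notranslate"><span class="pre">headers</span></code> could then be passed as <code class="docutils literal notranslate"><span class="pre">FlightCallOptions(headers=headers)</span></code> on each call. Doing this for every call is repetitive, which is the motivation for the middleware approach.</p> |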
| <p>This example shows how to accomplish trace propagation through middleware. |
| The client middleware needs to inject the trace context into the call headers. |
| The server middleware needs to extract the trace context from the headers and |
| pass the context into a new span. Optionally, the client middleware can also |
| create a new span to time the client-side call.</p> |
| <p><strong>Step 1: define the client middleware:</strong></p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pyarrow.flight</span> <span class="k">as</span> <span class="nn">flight</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry</span> <span class="kn">import</span> <span class="n">trace</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry.propagate</span> <span class="kn">import</span> <span class="n">inject</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry.trace.status</span> <span class="kn">import</span> <span class="n">StatusCode</span> |
| |
| <span class="k">class</span> <span class="nc">ClientTracingMiddlewareFactory</span><span class="p">(</span><span class="n">flight</span><span class="o">.</span><span class="n">ClientMiddlewareFactory</span><span class="p">):</span> |
|     <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
|         <span class="bp">self</span><span class="o">.</span><span class="n">_tracer</span> <span class="o">=</span> <span class="n">trace</span><span class="o">.</span><span class="n">get_tracer</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
|     <span class="k">def</span> <span class="nf">start_call</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">info</span><span class="p">):</span> |
|         <span class="n">span</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tracer</span><span class="o">.</span><span class="n">start_span</span><span class="p">(</span><span class="sa">f</span><span class="s2">"client.</span><span class="si">{</span><span class="n">info</span><span class="o">.</span><span class="n">method</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span> |
|         <span class="k">return</span> <span class="n">ClientTracingMiddleware</span><span class="p">(</span><span class="n">span</span><span class="p">)</span> |
| |
| <span class="k">class</span> <span class="nc">ClientTracingMiddleware</span><span class="p">(</span><span class="n">flight</span><span class="o">.</span><span class="n">ClientMiddleware</span><span class="p">):</span> |
|     <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">span</span><span class="p">):</span> |
|         <span class="bp">self</span><span class="o">.</span><span class="n">_span</span> <span class="o">=</span> <span class="n">span</span> |
| |
|     <span class="k">def</span> <span class="nf">sending_headers</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
|         <span class="n">ctx</span> <span class="o">=</span> <span class="n">trace</span><span class="o">.</span><span class="n">set_span_in_context</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="p">)</span> |
|         <span class="n">carrier</span> <span class="o">=</span> <span class="p">{}</span> |
|         <span class="n">inject</span><span class="p">(</span><span class="n">carrier</span><span class="o">=</span><span class="n">carrier</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="n">ctx</span><span class="p">)</span> |
|         <span class="k">return</span> <span class="n">carrier</span> |
| |
|     <span class="k">def</span> <span class="nf">call_completed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exception</span><span class="p">):</span> |
|         <span class="k">if</span> <span class="n">exception</span><span class="p">:</span> |
|             <span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="o">.</span><span class="n">record_exception</span><span class="p">(</span><span class="n">exception</span><span class="p">)</span> |
|             <span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="o">.</span><span class="n">set_status</span><span class="p">(</span><span class="n">StatusCode</span><span class="o">.</span><span class="n">ERROR</span><span class="p">)</span> |
|             <span class="nb">print</span><span class="p">(</span><span class="n">exception</span><span class="p">)</span> |
|         <span class="k">else</span><span class="p">:</span> |
|             <span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="o">.</span><span class="n">set_status</span><span class="p">(</span><span class="n">StatusCode</span><span class="o">.</span><span class="n">OK</span><span class="p">)</span> |
|         <span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="o">.</span><span class="n">end</span><span class="p">()</span> |
| </pre></div> |
| </div> |
| <p><strong>Step 2: define the server middleware:</strong></p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">pyarrow.flight</span> <span class="k">as</span> <span class="nn">flight</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry</span> <span class="kn">import</span> <span class="n">trace</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry.propagate</span> <span class="kn">import</span> <span class="n">extract</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry.trace.status</span> <span class="kn">import</span> <span class="n">StatusCode</span> |
| |
| <span class="k">class</span> <span class="nc">ServerTracingMiddlewareFactory</span><span class="p">(</span><span class="n">flight</span><span class="o">.</span><span class="n">ServerMiddlewareFactory</span><span class="p">):</span> |
|     <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
|         <span class="bp">self</span><span class="o">.</span><span class="n">_tracer</span> <span class="o">=</span> <span class="n">trace</span><span class="o">.</span><span class="n">get_tracer</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
|     <span class="k">def</span> <span class="nf">start_call</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">info</span><span class="p">,</span> <span class="n">headers</span><span class="p">):</span> |
|         <span class="n">context</span> <span class="o">=</span> <span class="n">extract</span><span class="p">(</span><span class="n">headers</span><span class="p">)</span> |
|         <span class="n">span</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_tracer</span><span class="o">.</span><span class="n">start_span</span><span class="p">(</span><span class="sa">f</span><span class="s2">"server.</span><span class="si">{</span><span class="n">info</span><span class="o">.</span><span class="n">method</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span> |
|         <span class="k">return</span> <span class="n">ServerTracingMiddleware</span><span class="p">(</span><span class="n">span</span><span class="p">)</span> |
| |
| <span class="k">class</span> <span class="nc">ServerTracingMiddleware</span><span class="p">(</span><span class="n">flight</span><span class="o">.</span><span class="n">ServerMiddleware</span><span class="p">):</span> |
|     <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">span</span><span class="p">):</span> |
|         <span class="bp">self</span><span class="o">.</span><span class="n">_span</span> <span class="o">=</span> <span class="n">span</span> |
| |
|     <span class="k">def</span> <span class="nf">call_completed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exception</span><span class="p">):</span> |
|         <span class="k">if</span> <span class="n">exception</span><span class="p">:</span> |
|             <span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="o">.</span><span class="n">record_exception</span><span class="p">(</span><span class="n">exception</span><span class="p">)</span> |
|             <span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="o">.</span><span class="n">set_status</span><span class="p">(</span><span class="n">StatusCode</span><span class="o">.</span><span class="n">ERROR</span><span class="p">)</span> |
|             <span class="nb">print</span><span class="p">(</span><span class="n">exception</span><span class="p">)</span> |
|         <span class="k">else</span><span class="p">:</span> |
|             <span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="o">.</span><span class="n">set_status</span><span class="p">(</span><span class="n">StatusCode</span><span class="o">.</span><span class="n">OK</span><span class="p">)</span> |
|         <span class="bp">self</span><span class="o">.</span><span class="n">_span</span><span class="o">.</span><span class="n">end</span><span class="p">()</span> |
| </pre></div> |
| </div> |
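| <p>To illustrate what the server-side extraction recovers, the sketch below parses a W3C <code class="docutils literal notranslate"><span class="pre">traceparent</span></code> value out of a headers dictionary whose values are lists of strings. It is a simplified stand-in for the OpenTelemetry <code class="docutils literal notranslate"><span class="pre">extract</span></code> call, not a replacement for it; <code class="docutils literal notranslate"><span class="pre">parse_traceparent</span></code> is a hypothetical name, and the header shape is an assumption.</p> |

```python
import re
from typing import Optional, Tuple

# version - 32 hex trace id - 16 hex span id - 2 hex flags
_TRACEPARENT = re.compile(r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


def parse_traceparent(headers: dict) -> Optional[Tuple[int, int, bool]]:
    """Return (trace_id, span_id, sampled) or None if absent or invalid.

    Assumes ``headers`` maps header names to lists of string values.
    """
    values = headers.get("traceparent") or []
    if not values:
        return None
    match = _TRACEPARENT.fullmatch(values[0])
    if match is None:
        return None
    version, trace_hex, span_hex, flags_hex = match.groups()
    trace_id, span_id = int(trace_hex, 16), int(span_hex, 16)
    # Version "ff" and all-zero identifiers are invalid per the W3C spec.
    if version == "ff" or trace_id == 0 or span_id == 0:
        return None
    # The lowest flag bit marks the trace as sampled.
    return trace_id, span_id, bool(int(flags_hex, 16) & 0x01)
```

| <p>When the header is missing or malformed the function returns <code class="docutils literal notranslate"><span class="pre">None</span></code>, which corresponds to the server starting a new trace instead of joining the caller's.</p> |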
| <p><strong>Step 3: configure the trace exporter, processor, and provider:</strong></p> |
| <p>Both the server and client will need to be configured with the OpenTelemetry SDK |
| to record spans and export them somewhere. For the sake of the example, we’ll |
| collect the spans into a Python list, but this is normally where you would set |
| them up to be exported to some service like <a class="reference external" href="https://www.jaegertracing.io/">Jaeger</a>. See other examples of |
| exporters at <a class="reference external" href="https://opentelemetry.io/docs/instrumentation/python/exporters/">OpenTelemetry Exporters</a>.</p> |
| <p>As part of this, you will need to define the resource where spans are running. |
| At a minimum this is the service name, but it could include other information like |
| a hostname, process id, service version, and operating system.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">opentelemetry</span> <span class="kn">import</span> <span class="n">trace</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry.sdk.trace</span> <span class="kn">import</span> <span class="n">TracerProvider</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry.sdk.trace.export</span> <span class="kn">import</span> <span class="n">SimpleSpanProcessor</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry.sdk.resources</span> <span class="kn">import</span> <span class="n">SERVICE_NAME</span><span class="p">,</span> <span class="n">Resource</span> |
| <span class="kn">from</span> <span class="nn">opentelemetry.sdk.trace.export</span> <span class="kn">import</span> <span class="n">SpanExporter</span><span class="p">,</span> <span class="n">SpanExportResult</span> |
| |
| <span class="k">class</span> <span class="nc">TestSpanExporter</span><span class="p">(</span><span class="n">SpanExporter</span><span class="p">):</span> |
|     <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
|         <span class="bp">self</span><span class="o">.</span><span class="n">spans</span> <span class="o">=</span> <span class="p">[]</span>
| 
|     <span class="k">def</span> <span class="nf">export</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">spans</span><span class="p">):</span>
|         <span class="bp">self</span><span class="o">.</span><span class="n">spans</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">spans</span><span class="p">)</span>
|         <span class="k">return</span> <span class="n">SpanExportResult</span><span class="o">.</span><span class="n">SUCCESS</span>
| 
| <span class="k">def</span> <span class="nf">configure_tracing</span><span class="p">():</span>
|     <span class="c1"># Most backends require a service name. The in-memory exporter</span>
|     <span class="c1"># used here does not, but setting one is good practice anyway.</span>
|     <span class="n">resource</span> <span class="o">=</span> <span class="n">Resource</span><span class="p">(</span><span class="n">attributes</span><span class="o">=</span><span class="p">{</span>
|         <span class="n">SERVICE_NAME</span><span class="p">:</span> <span class="s2">"my-service"</span>
|     <span class="p">})</span>
|     <span class="n">exporter</span> <span class="o">=</span> <span class="n">TestSpanExporter</span><span class="p">()</span>
|     <span class="n">provider</span> <span class="o">=</span> <span class="n">TracerProvider</span><span class="p">(</span><span class="n">resource</span><span class="o">=</span><span class="n">resource</span><span class="p">)</span>
|     <span class="n">processor</span> <span class="o">=</span> <span class="n">SimpleSpanProcessor</span><span class="p">(</span><span class="n">exporter</span><span class="p">)</span>
|     <span class="n">provider</span><span class="o">.</span><span class="n">add_span_processor</span><span class="p">(</span><span class="n">processor</span><span class="p">)</span>
|     <span class="n">trace</span><span class="o">.</span><span class="n">set_tracer_provider</span><span class="p">(</span><span class="n">provider</span><span class="p">)</span>
|     <span class="k">return</span> <span class="n">exporter</span>
| </pre></div> |
| </div> |
| <p><strong>Step 4: add the middleware to the server:</strong></p> |
| <p>We can now use the middleware in the EchoServer from earlier.</p>
| <div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'__main__'</span><span class="p">:</span> |
|     <span class="n">exporter</span> <span class="o">=</span> <span class="n">configure_tracing</span><span class="p">()</span>
|     <span class="n">server</span> <span class="o">=</span> <span class="n">EchoServer</span><span class="p">(</span>
|         <span class="n">location</span><span class="o">=</span><span class="s2">"grpc://0.0.0.0:8816"</span><span class="p">,</span>
|         <span class="n">middleware</span><span class="o">=</span><span class="p">{</span>
|             <span class="s2">"tracing"</span><span class="p">:</span> <span class="n">ServerTracingMiddlewareFactory</span><span class="p">()</span>
|         <span class="p">},</span>
|     <span class="p">)</span>
|     <span class="n">server</span><span class="o">.</span><span class="n">serve</span><span class="p">()</span>
| </pre></div> |
| </div> |
| <p><strong>Step 5: add the middleware to the client:</strong></p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">client</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span> |
|     <span class="s2">"grpc://0.0.0.0:8816"</span><span class="p">,</span>
|     <span class="n">middleware</span><span class="o">=</span><span class="p">[</span><span class="n">ClientTracingMiddlewareFactory</span><span class="p">()],</span>
| <span class="p">)</span> |
| </pre></div> |
| </div> |
| <p><strong>Step 6: use the client within active spans:</strong></p> |
| <p>When we make a call with our client within an OpenTelemetry span, our client |
| middleware will create a child span for the client-side Flight call and then |
| propagate the span context to the server. Our server middleware will pick up |
| that trace context and create another child span.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">opentelemetry</span> <span class="kn">import</span> <span class="n">trace</span> |
| |
| <span class="c1"># Client would normally also need to configure tracing, but for this example</span> |
| <span class="c1"># the client and server are running in the same Python process.</span> |
| <span class="c1"># exporter = configure_tracing()</span> |
| |
| <span class="n">tracer</span> <span class="o">=</span> <span class="n">trace</span><span class="o">.</span><span class="n">get_tracer</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> |
| |
| <span class="k">with</span> <span class="n">tracer</span><span class="o">.</span><span class="n">start_as_current_span</span><span class="p">(</span><span class="s2">"hello_world"</span><span class="p">)</span> <span class="k">as</span> <span class="n">span</span><span class="p">:</span> |
|     <span class="n">action</span> <span class="o">=</span> <span class="n">pa</span><span class="o">.</span><span class="n">flight</span><span class="o">.</span><span class="n">Action</span><span class="p">(</span><span class="s2">"echo"</span><span class="p">,</span> <span class="sa">b</span><span class="s2">"Hello, world!"</span><span class="p">)</span>
|     <span class="c1"># Call list() on do_action to drain all results.</span>
|     <span class="nb">list</span><span class="p">(</span><span class="n">client</span><span class="o">.</span><span class="n">do_action</span><span class="p">(</span><span class="n">action</span><span class="o">=</span><span class="n">action</span><span class="p">))</span>
| |
| <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"There are </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">exporter</span><span class="o">.</span><span class="n">spans</span><span class="p">)</span><span class="si">}</span><span class="s2"> spans."</span><span class="p">)</span> |
| <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"The span names are:</span><span class="se">\n</span><span class="s2"> </span><span class="si">{</span><span class="nb">list</span><span class="p">(</span><span class="n">span</span><span class="o">.</span><span class="n">name</span><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="n">span</span><span class="w"> </span><span class="ow">in</span><span class="w"> </span><span class="n">exporter</span><span class="o">.</span><span class="n">spans</span><span class="p">)</span><span class="si">}</span><span class="s2">."</span><span class="p">)</span> |
| <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"The span status codes are:</span><span class="se">\n</span><span class="s2"> "</span> |
| <span class="sa">f</span><span class="s2">"</span><span class="si">{</span><span class="nb">list</span><span class="p">(</span><span class="n">span</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">status_code</span><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="n">span</span><span class="w"> </span><span class="ow">in</span><span class="w"> </span><span class="n">exporter</span><span class="o">.</span><span class="n">spans</span><span class="p">)</span><span class="si">}</span><span class="s2">."</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <div class="highlight-none notranslate"><div class="highlight"><pre><span></span>There are 3 spans. |
| The span names are: |
| ['server.FlightMethod.DO_ACTION', 'client.FlightMethod.DO_ACTION', 'hello_world']. |
| The span status codes are: |
| [&lt;StatusCode.OK: 1&gt;, &lt;StatusCode.OK: 1&gt;, &lt;StatusCode.UNSET: 0&gt;].
| </pre></div> |
| </div> |
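| <p>As expected, we have three spans: one in our client code, one in the client
| middleware, and one in the server middleware. They are listed in the order in which
| they ended, innermost first, and the <code class="docutils literal notranslate"><span class="pre">hello_world</span></code> span reports
| <code class="docutils literal notranslate"><span class="pre">UNSET</span></code> because our code never sets a status on it.</p>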
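| <p>To make the propagation step more concrete, the following standard-library sketch
| builds and parses a <code class="docutils literal notranslate"><span class="pre">traceparent</span></code> header in the layout defined by the W3C Trace
| Context specification, which is assumed here to be the format the default OpenTelemetry
| propagator writes into the headers. The helper names <code class="docutils literal notranslate"><span class="pre">make_traceparent</span></code> and
| <code class="docutils literal notranslate"><span class="pre">parse_traceparent</span></code> are hypothetical; in the recipe itself this work is done by the
| OpenTelemetry propagation API, not by hand.</p>

```python
import re
import secrets

# Version-00 layout from the W3C Trace Context spec:
#   version "-" trace-id (32 hex) "-" parent-id (16 hex) "-" flags (2 hex)
_TRACEPARENT = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def make_traceparent(trace_id, span_id, sampled=True):
    """Hypothetical helper: format ids as a version-00 traceparent value."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(header):
    """Hypothetical helper: return the ids as a dict, or None if invalid."""
    match = _TRACEPARENT.match(header.strip().lower())
    if match is None:
        return None
    version, trace_hex, span_hex, flags_hex = match.groups()
    trace_id, span_id = int(trace_hex, 16), int(span_hex, 16)
    # This sketch only handles version 00; all-zero ids are invalid.
    if version != "00" or trace_id == 0 or span_id == 0:
        return None
    return {
        "trace_id": trace_id,
        "span_id": span_id,
        "sampled": bool(int(flags_hex, 16) & 0x01),
    }


# Client side: the ids of the current span go into the outgoing headers.
trace_id = secrets.randbits(128) or 1
span_id = secrets.randbits(64) or 1
carrier = {"traceparent": make_traceparent(trace_id, span_id)}

# Server side (assumption: each header name maps to a list of values,
# so the first value is the one to read).
received = {name: [value] for name, value in carrier.items()}
context = parse_traceparent(received["traceparent"][0])
print(context["trace_id"] == trace_id)
```

| <p>Because the server span is created with the extracted context as its parent, it shares
| the trace id of the client span, which is what lets a tracing backend link the three
| spans into a single trace.</p>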
| </section> |
| </section> |
| |
| |
| </div> |
| |
| </div> |
| </div> |
| <div class="clearer"></div> |
| </div> |
| <div class="footer"> |
| ©2022, Apache Software Foundation. |
| </div> |
| |
| |
| |
| |
| </body> |
| </html> |