blob: dcf2352dd72bec17dc526be12db05306055cf6f8 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/07/23/sharing-is-caring-catalogs-in-flink-sql/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="With an ever-growing number of people working with data, it&rsquo;s a common practice for companies to build self-service platforms with the goal of democratizing their access across different teams and — especially — to enable users from any background to be independent in their data needs. In such environments, metadata management becomes a crucial aspect. Without it, users often work blindly, spending too much time searching for datasets and their location, figuring out data formats and similar cumbersome tasks.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Sharing is caring - Catalogs in Flink SQL" />
<meta property="og:description" content="With an ever-growing number of people working with data, it&rsquo;s a common practice for companies to build self-service platforms with the goal of democratizing their access across different teams and — especially — to enable users from any background to be independent in their data needs. In such environments, metadata management becomes a crucial aspect. Without it, users often work blindly, spending too much time searching for datasets and their location, figuring out data formats and similar cumbersome tasks." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/07/23/sharing-is-caring-catalogs-in-flink-sql/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-07-23T08:00:00+00:00" />
<meta property="article:modified_time" content="2020-07-23T08:00:00+00:00" />
<title>Sharing is caring - Catalogs in Flink SQL | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/png">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo analytics bootstrap.
// `_paq` is the shared command queue: reuse window._paq if it already exists,
// otherwise start with an empty array. Each push() below enqueues one tracker
// command as [methodName, ...args].
var _paq = window._paq = window._paq || [];
// Ask the tracker not to use cookies.
_paq.push(['disableCookies']);
// Domains handed to the tracker's setDomains command.
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
  // Use an explicit https:// origin rather than a protocol-relative "//" URL,
  // so the tracker script and endpoint are never requested over plain HTTP
  // (or resolved against file:// when the page is opened from disk).
  var u="https://analytics.apache.org/";
  _paq.push(['setTrackerUrl', u+'matomo.php']);
  _paq.push(['setSiteId', '1']);
  // Create an async <script> for matomo.js and insert it before the first
  // <script> element already in the document.
  var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
  g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community &amp; Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2020/07/23/sharing-is-caring-catalogs-in-flink-sql/">Sharing is caring - Catalogs in Flink SQL</a>
</h1>
July 23, 2020 -
Dawid Wysakowicz
<a href="https://twitter.com/dwysakowicz">(@dwysakowicz)</a>
<p>With an ever-growing number of people working with data, it&rsquo;s a common practice for companies to build self-service platforms with the goal of democratizing their access across different teams and — especially — to enable users from any background to be independent in their data needs. In such environments, metadata management becomes a crucial aspect. Without it, users often work blindly, spending too much time searching for datasets and their location, figuring out data formats and similar cumbersome tasks.</p>
<p>In this blog post, we want to give you a high level overview of catalogs in Flink. We&rsquo;ll describe why you should consider using them and what you can achieve with one in place. To round it up, we&rsquo;ll also showcase how simple it is to combine catalogs and Flink, in the form of an end-to-end example that you can try out yourself.</p>
<h2 id="why-do-i-need-a-catalog">
Why do I need a catalog?
<a class="anchor" href="#why-do-i-need-a-catalog">#</a>
</h2>
<p>Frequently, companies start building a data platform with a metastore, catalog, or schema registry of some sort already in place. Those let you clearly separate making the data available from consuming it. That separation has a few benefits:</p>
<ul>
<li><strong>Improved productivity</strong> - The most obvious one. Making data reusable and shifting the focus to building new models/pipelines rather than data cleansing and discovery.</li>
<li><strong>Security</strong> - You can control the access to certain features of the data. For example, you can make the schema of the dataset publicly available, but limit the actual access to the underlying data only to particular teams.</li>
<li><strong>Compliance</strong> - If you have all the metadata in a central entity, it&rsquo;s much easier to ensure compliance with GDPR and similar regulations and legal requirements.</li>
</ul>
<h2 id="what-is-stored-in-a-catalog">
What is stored in a catalog?
<a class="anchor" href="#what-is-stored-in-a-catalog">#</a>
</h2>
<p>Almost all data sets can be described by certain properties that must be known in order to consume them. Those include:</p>
<ul>
<li>
<p><strong>Schema</strong> - It describes the actual contents of the data, what columns it has, what are the constraints (e.g. keys) on which the updates should be performed, which fields can act as time attributes, what are the rules for watermark generation and so on.</p>
</li>
<li>
<p><strong>Location</strong> - Does the data come from Kafka or a file in a filesystem? How do you connect to the external system? Which topic or file name do you use?</p>
</li>
<li>
<p><strong>Format</strong> - Is the data serialized as JSON, CSV, or maybe Avro records?</p>
</li>
<li>
<p><strong>Statistics</strong> - You can also store additional information that can be useful when creating an execution plan of your query. For example, you can choose the best join algorithm, based on the number of rows in joined datasets.</p>
</li>
</ul>
<p>Catalogs don’t have to be limited to the metadata of datasets. You can usually store other objects that can be reused in different scenarios, such as:</p>
<ul>
<li>
<p><strong>Functions</strong> - It&rsquo;s very common to have domain specific functions that can be helpful in different use cases. Instead of having to create them in each place separately, you can just create them once and share them with others.</p>
</li>
<li>
<p><strong>Queries</strong> - Those can be useful when you don’t want to persist a data set, but want to provide a recipe for creating it from other sources instead.</p>
</li>
</ul>
<h2 id="catalogs-support-in-flink-sql">
Catalogs support in Flink SQL
<a class="anchor" href="#catalogs-support-in-flink-sql">#</a>
</h2>
<p>Starting from version 1.9, Flink has a set of Catalog APIs that allows you to integrate Flink with various catalog implementations. With the help of those APIs, you can query tables in Flink that were created in your external catalogs (e.g. Hive Metastore). Additionally, depending on the catalog implementation, you can create new objects such as tables or views from Flink, reuse them across different jobs, and possibly even use them in other tools compatible with that catalog. In other words, you can see catalogs as having a two-fold purpose:</p>
<ul>
<li>
<p>Provide an out-of-the-box integration with ecosystems such as RDBMSs or Hive that allows you to query external objects like tables, views, or functions with no additional connector configuration. The connector properties are automatically derived from the catalog itself.</p>
</li>
<li>
<p>Act as a persistent store for Flink-specific metadata. In this mode, we additionally store connector properties alongside the logical metadata (e.g. schema, object name). That approach enables you to, for example, store a full definition of a Kafka-backed table with records serialized with Avro in Hive that can later be used by Flink. However, as it incorporates Flink-specific properties, it cannot be used by other tools that leverage Hive Metastore.</p>
</li>
</ul>
<p>As of Flink 1.11, there are two catalog implementations supported by the community:</p>
<ol>
<li>
<p>A comprehensive Hive catalog</p>
</li>
<li>
<p>A Postgres catalog (preview, read-only, for now)</p>
</li>
</ol>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Flink does not store data at rest; it is a compute engine and requires other systems to consume input from and write its output. This means that Flink does not own the lifecycle of the data. Integration with Catalogs does not change that. Flink uses catalogs for metadata management only.
</div>
<p>All you need to do to start querying your tables defined in either of these metastores is to create the corresponding catalogs with connection parameters. Once this is done, you can use them the way you would in any relational database management system.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="c1">-- create a catalog which gives access to the backing Postgres installation
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">CREATE</span><span class="w"> </span><span class="k">CATALOG</span><span class="w"> </span><span class="n">postgres</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;type&#39;</span><span class="o">=</span><span class="s1">&#39;jdbc&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;property-version&#39;</span><span class="o">=</span><span class="s1">&#39;1&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;base-url&#39;</span><span class="o">=</span><span class="s1">&#39;jdbc:postgresql://postgres:5432/&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;default-database&#39;</span><span class="o">=</span><span class="s1">&#39;postgres&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;username&#39;</span><span class="o">=</span><span class="s1">&#39;postgres&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;password&#39;</span><span class="o">=</span><span class="s1">&#39;example&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- create a catalog which gives access to the backing Hive installation
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">CREATE</span><span class="w"> </span><span class="k">CATALOG</span><span class="w"> </span><span class="n">hive</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;type&#39;</span><span class="o">=</span><span class="s1">&#39;hive&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;property-version&#39;</span><span class="o">=</span><span class="s1">&#39;1&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;hive-version&#39;</span><span class="o">=</span><span class="s1">&#39;2.3.6&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;hive-conf-dir&#39;</span><span class="o">=</span><span class="s1">&#39;/opt/hive-conf&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>After creating the catalogs, you can confirm that they are available to Flink and also list the databases or tables in each of these catalogs:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="o">&gt;</span><span class="w"> </span><span class="k">show</span><span class="w"> </span><span class="n">catalogs</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">default_catalog</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">hive</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">postgres</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- switch the default catalog to Hive
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="o">&gt;</span><span class="w"> </span><span class="n">use</span><span class="w"> </span><span class="k">catalog</span><span class="w"> </span><span class="n">hive</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="o">&gt;</span><span class="w"> </span><span class="k">show</span><span class="w"> </span><span class="n">databases</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">default</span><span class="w"> </span><span class="c1">-- hive&#39;s default database
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="o">&gt;</span><span class="w"> </span><span class="k">show</span><span class="w"> </span><span class="n">tables</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">dev_orders</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="o">&gt;</span><span class="w"> </span><span class="n">use</span><span class="w"> </span><span class="k">catalog</span><span class="w"> </span><span class="n">postgres</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="o">&gt;</span><span class="w"> </span><span class="k">show</span><span class="w"> </span><span class="n">tables</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">prod_customer</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">prod_nation</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">prod_rates</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">prod_region</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">region_stats</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- describe a schema of a table in Postgres, the Postgres types are automatically mapped to
</span></span></span><span class="line"><span class="cl"><span class="c1">-- Flink&#39;s type system
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="o">&gt;</span><span class="w"> </span><span class="k">describe</span><span class="w"> </span><span class="n">prod_customer</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">root</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="o">|</span><span class="c1">-- c_custkey: INT NOT NULL
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="o">|</span><span class="c1">-- c_name: VARCHAR(25) NOT NULL
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="o">|</span><span class="c1">-- c_address: VARCHAR(40) NOT NULL
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="o">|</span><span class="c1">-- c_nationkey: INT NOT NULL
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="o">|</span><span class="c1">-- c_phone: CHAR(15) NOT NULL
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="o">|</span><span class="c1">-- c_acctbal: DOUBLE NOT NULL
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="o">|</span><span class="c1">-- c_mktsegment: CHAR(10) NOT NULL
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="o">|</span><span class="c1">-- c_comment: VARCHAR(117) NOT NULL
</span></span></span></code></pre></div><p>Now that you know which tables are available, you can write your first query.
In this scenario, we keep customer orders in Hive (<code>dev_orders</code>) because of their volume, and reference customer data in Postgres (<code>prod_customer</code>) to be able to easily update it. Let’s write a query that shows customers and their orders by region and order priority for a specific day.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="n">USE</span><span class="w"> </span><span class="k">CATALOG</span><span class="w"> </span><span class="n">postgres</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">r_name</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">region</span><span class="o">`</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">o_orderpriority</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">priority</span><span class="o">`</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">COUNT</span><span class="p">(</span><span class="k">DISTINCT</span><span class="w"> </span><span class="n">c_custkey</span><span class="p">)</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">number_of_customers</span><span class="o">`</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">COUNT</span><span class="p">(</span><span class="n">o_orderkey</span><span class="p">)</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">number_of_orders</span><span class="o">`</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">FROM</span><span class="w"> </span><span class="o">`</span><span class="n">hive</span><span class="o">`</span><span class="p">.</span><span class="o">`</span><span class="k">default</span><span class="o">`</span><span class="p">.</span><span class="n">dev_orders</span><span class="w"> </span><span class="c1">-- we need to fully qualify the table in hive because we set the
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="c1">-- current catalog to Postgres
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">JOIN</span><span class="w"> </span><span class="n">prod_customer</span><span class="w"> </span><span class="k">ON</span><span class="w"> </span><span class="n">o_custkey</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">c_custkey</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">JOIN</span><span class="w"> </span><span class="n">prod_nation</span><span class="w"> </span><span class="k">ON</span><span class="w"> </span><span class="n">c_nationkey</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">n_nationkey</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">JOIN</span><span class="w"> </span><span class="n">prod_region</span><span class="w"> </span><span class="k">ON</span><span class="w"> </span><span class="n">n_regionkey</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">r_regionkey</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">WHERE</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">FLOOR</span><span class="p">(</span><span class="n">o_ordertime</span><span class="w"> </span><span class="k">TO</span><span class="w"> </span><span class="k">DAY</span><span class="p">)</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="w"> </span><span class="s1">&#39;2020-04-01 0:00:00.000&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">AND</span><span class="w"> </span><span class="k">NOT</span><span class="w"> </span><span class="n">o_orderpriority</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;4-NOT SPECIFIED&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">GROUP</span><span class="w"> </span><span class="k">BY</span><span class="w"> </span><span class="n">r_name</span><span class="p">,</span><span class="w"> </span><span class="n">o_orderpriority</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">ORDER</span><span class="w"> </span><span class="k">BY</span><span class="w"> </span><span class="n">r_name</span><span class="p">,</span><span class="w"> </span><span class="n">o_orderpriority</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>Flink&rsquo;s catalog support also covers storing Flink-specific objects in external catalogs that might not be fully usable by the corresponding external tools. The most notable use case for this is, for example, storing a table that describes a Kafka topic in a Hive catalog. Take the following DDL statement, that contains a watermark declaration as well as a set of connector properties that are not recognizable by Hive. You won&rsquo;t be able to query the table with Hive, but it will be persisted and can be reused by different Flink jobs.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="n">USE</span><span class="w"> </span><span class="k">CATALOG</span><span class="w"> </span><span class="n">hive</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">prod_lineitem</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_orderkey</span><span class="w"> </span><span class="nb">INTEGER</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_partkey</span><span class="w"> </span><span class="nb">INTEGER</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_suppkey</span><span class="w"> </span><span class="nb">INTEGER</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_linenumber</span><span class="w"> </span><span class="nb">INTEGER</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_quantity</span><span class="w"> </span><span class="n">DOUBLE</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_extendedprice</span><span class="w"> </span><span class="n">DOUBLE</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_discount</span><span class="w"> </span><span class="n">DOUBLE</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_tax</span><span class="w"> </span><span class="n">DOUBLE</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_currency</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_returnflag</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_linestatus</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_ordertime</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_shipinstruct</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_shipmode</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_comment</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_proctime</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">PROCTIME</span><span class="p">(),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">WATERMARK</span><span class="w"> </span><span class="k">FOR</span><span class="w"> </span><span class="n">l_ordertime</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">l_ordertime</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="nb">INTERVAL</span><span class="w"> </span><span class="s1">&#39;5&#39;</span><span class="w"> </span><span class="n">SECONDS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="o">=</span><span class="s1">&#39;kafka&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;topic&#39;</span><span class="o">=</span><span class="s1">&#39;lineitem&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;scan.startup.mode&#39;</span><span class="o">=</span><span class="s1">&#39;earliest-offset&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;properties.bootstrap.servers&#39;</span><span class="o">=</span><span class="s1">&#39;kafka:9092&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;properties.group.id&#39;</span><span class="o">=</span><span class="s1">&#39;testGroup&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;format&#39;</span><span class="o">=</span><span class="s1">&#39;csv&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;csv.field-delimiter&#39;</span><span class="o">=</span><span class="s1">&#39;|&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>With <code>prod_lineitem</code> stored in Hive, you can now write a query that will enrich the incoming stream with static data kept in Postgres. To illustrate how this works, let&rsquo;s calculate the item prices based on the current currency rates:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="n">USE</span><span class="w"> </span><span class="k">CATALOG</span><span class="w"> </span><span class="n">postgres</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_proctime</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">querytime</span><span class="o">`</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_orderkey</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="k">order</span><span class="o">`</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_linenumber</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">linenumber</span><span class="o">`</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_currency</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">currency</span><span class="o">`</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rs_rate</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">cur_rate</span><span class="o">`</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">(</span><span class="n">l_extendedprice</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="p">(</span><span class="mi">1</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">l_discount</span><span class="p">)</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="p">(</span><span class="mi">1</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">l_tax</span><span class="p">))</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">rs_rate</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="o">`</span><span class="n">open_in_euro</span><span class="o">`</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">FROM</span><span class="w"> </span><span class="n">hive</span><span class="p">.</span><span class="o">`</span><span class="k">default</span><span class="o">`</span><span class="p">.</span><span class="n">prod_lineitem</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">JOIN</span><span class="w"> </span><span class="n">prod_rates</span><span class="w"> </span><span class="k">FOR</span><span class="w"> </span><span class="n">SYSTEM_TIME</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="k">OF</span><span class="w"> </span><span class="n">l_proctime</span><span class="w"> </span><span class="k">ON</span><span class="w"> </span><span class="n">rs_symbol</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">l_currency</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">WHERE</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">l_linestatus</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;O&#39;</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>The query above uses a <code>FOR SYSTEM_TIME AS OF</code> <a href="https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html#temporal-table">clause</a> for executing a temporal join. If you&rsquo;d like to learn more about the different kinds of joins you can do in Flink, I highly encourage you to check <a href="https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins">this documentation page</a>.</p>
<h2 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h2>
<p>Catalogs can be extremely powerful when building data platforms aimed at reusing the work of different teams in an organization. Centralizing the metadata is a common practice for improving productivity, security, and compliance when working with data.</p>
<p>Flink provides flexible metadata management capabilities that aim at reducing the cumbersome, repetitive work needed before querying the data, such as defining schemas, connection properties, etc. As of version 1.11, Flink provides a native, comprehensive integration with Hive Metastore and a read-only version for Postgres catalogs.</p>
<p>You can get started with Flink and catalogs by reading <a href="https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/catalogs.html">the docs</a>. If you want to play around with Flink SQL (e.g. try out how catalogs work in Flink yourself), you can check <a href="https://github.com/fhueske/flink-sql-demo">this demo</a> prepared by our colleagues Fabian and Timo — it runs in a dockerized environment, and we used it for the examples in this blog post.</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="https://github.com/apache/flink-web/edit/asf-site/docs/content/posts/2020-07-23-catalogs.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#why-do-i-need-a-catalog">Why do I need a catalog?</a></li>
<li><a href="#what-is-stored-in-a-catalog">What is stored in a catalog?</a></li>
<li><a href="#catalogs-support-in-flink-sql">Catalogs support in Flink SQL</a></li>
<li><a href="#conclusion">Conclusion</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>GitHub</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>