<!DOCTYPE HTML>
<html lang="en-US">
<head>
<meta charset="UTF-8">
<title>Kafka adapter</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<meta name="generator" content="Jekyll v3.7.3">
<link rel="stylesheet" href="//fonts.googleapis.com/css?family=Lato:300,300italic,400,400italic,700,700italic,900">
<link rel="stylesheet" href="/css/screen.css">
<link rel="icon" type="image/x-icon" href="/favicon.ico">
<!--[if lt IE 9]>
<script src="/js/html5shiv.min.js"></script>
<script src="/js/respond.min.js"></script>
<![endif]-->
</head>
<body class="wrap">
<header role="banner">
<div class="grid">
<div class="unit center-on-mobiles">
<h1>
<a href="/">
<span class="sr-only">Apache Calcite</span>
<img src="/img/logo.svg" alt="Calcite Logo">
</a>
</h1>
</div>
<nav class="main-nav">
<ul>
<li class="">
<a href="/">Home</a>
</li>
<li class="">
<a href="/downloads/">Download</a>
</li>
<li class="">
<a href="/community/">Community</a>
</li>
<li class="">
<a href="/develop/">Develop</a>
</li>
<li class="">
<a href="/news/">News</a>
</li>
<li class="current">
<a href="/docs/">Docs</a>
</li>
</ul>
</nav>
</div>
</header>
<section class="docs">
<div class="grid">
<div class="docs-nav-mobile unit whole show-on-mobiles">
<select onchange="if (this.value) window.location.href=this.value">
<option value="">Navigate the docs…</option>
<optgroup label="Overview">
</optgroup>
<optgroup label="Advanced">
</optgroup>
<optgroup label="Avatica">
</optgroup>
<optgroup label="Reference">
</optgroup>
<optgroup label="Meta">
</optgroup>
</select>
</div>
<div class="unit four-fifths">
<article>
<h1>Kafka adapter</h1>
<p><strong>Note</strong>:</p>
<p>KafkaAdapter is an experimental feature; changes to its public API and usage are expected.</p>
<p>For instructions on downloading and building Calcite, start with the <a href="/docs/tutorial.html">tutorial</a>.</p>
<p>The Kafka adapter exposes an Apache Kafka topic as a STREAM table, so it can be queried using
<a href="/docs/stream.html">Calcite Stream SQL</a>. Note that the adapter does not attempt to scan all topics;
instead, users must configure tables manually, with each Kafka stream table mapping to exactly one Kafka topic.</p>
<p>A basic example of a model file is given below:</p>
<figure class="highlight"><pre><code class="language-json" data-lang="json"><span class="p">{</span><span class="w">
</span><span class="s2">"version"</span><span class="p">:</span><span class="w"> </span><span class="s2">"1.0"</span><span class="p">,</span><span class="w">
</span><span class="s2">"defaultSchema"</span><span class="p">:</span><span class="w"> </span><span class="s2">"KAFKA"</span><span class="p">,</span><span class="w">
</span><span class="s2">"schemas"</span><span class="p">:</span><span class="w"> </span><span class="p">[</span><span class="w">
</span><span class="p">{</span><span class="w">
</span><span class="s2">"name"</span><span class="p">:</span><span class="w"> </span><span class="s2">"KAFKA"</span><span class="p">,</span><span class="w">
</span><span class="s2">"tables"</span><span class="p">:</span><span class="w"> </span><span class="p">[</span><span class="w">
</span><span class="p">{</span><span class="w">
</span><span class="s2">"name"</span><span class="p">:</span><span class="w"> </span><span class="s2">"TABLE_NAME"</span><span class="p">,</span><span class="w">
</span><span class="s2">"type"</span><span class="p">:</span><span class="w"> </span><span class="s2">"custom"</span><span class="p">,</span><span class="w">
</span><span class="s2">"factory"</span><span class="p">:</span><span class="w"> </span><span class="s2">"org.apache.calcite.adapter.kafka.KafkaTableFactory"</span><span class="p">,</span><span class="w">
</span><span class="s2">"row.converter"</span><span class="p">:</span><span class="w"> </span><span class="s2">"com.example.CustKafkaRowConverter"</span><span class="p">,</span><span class="w">
</span><span class="s2">"operand"</span><span class="p">:</span><span class="w"> </span><span class="p">{</span><span class="w">
</span><span class="s2">"bootstrap.servers"</span><span class="p">:</span><span class="w"> </span><span class="s2">"host1:port,host2:port"</span><span class="p">,</span><span class="w">
</span><span class="s2">"topic.name"</span><span class="p">:</span><span class="w"> </span><span class="s2">"kafka.topic.name"</span><span class="p">,</span><span class="w">
</span><span class="s2">"consumer.params"</span><span class="p">:</span><span class="w"> </span><span class="p">{</span><span class="w">
</span><span class="s2">"key.deserializer"</span><span class="p">:</span><span class="w"> </span><span class="s2">"org.apache.kafka.common.serialization.ByteArrayDeserializer"</span><span class="p">,</span><span class="w">
</span><span class="s2">"value.deserializer"</span><span class="p">:</span><span class="w"> </span><span class="s2">"org.apache.kafka.common.serialization.ByteArrayDeserializer"</span><span class="w">
</span><span class="p">}</span><span class="w">
</span><span class="p">}</span><span class="w">
</span><span class="p">}</span><span class="w">
</span><span class="p">]</span><span class="w">
</span><span class="p">}</span><span class="w">
</span><span class="p">]</span><span class="w">
</span><span class="p">}</span></code></pre></figure>
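<p>Because each table maps to exactly one topic, exposing a second topic means adding another entry to the
<code class="highlighter-rouge">tables</code> array. The following is a minimal sketch rather than a tested configuration: the table names
<code class="highlighter-rouge">ORDERS</code> and <code class="highlighter-rouge">PAYMENTS</code> and the topic names
<code class="highlighter-rouge">orders.topic</code> and <code class="highlighter-rouge">payments.topic</code> are hypothetical placeholders,
and all other keys are copied from the basic example:</p>
<figure class="highlight"><pre><code class="language-json" data-lang="json">{
  "version": "1.0",
  "defaultSchema": "KAFKA",
  "schemas": [
    {
      "name": "KAFKA",
      "tables": [
        {
          "name": "ORDERS",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
          "operand": {
            "bootstrap.servers": "host1:port,host2:port",
            "topic.name": "orders.topic",
            "consumer.params": {
              "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
              "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
            }
          }
        },
        {
          "name": "PAYMENTS",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
          "operand": {
            "bootstrap.servers": "host1:port,host2:port",
            "topic.name": "payments.topic",
            "consumer.params": {
              "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
              "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
            }
          }
        }
      ]
    }
  ]
}</code></pre></figure>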
<p>Note that:</p>
<ol>
<li>
<p>Because Kafka messages are schemaless, a <a href="/apidocs/org/apache/calcite/adapter/kafka/KafkaRowConverter.html">KafkaRowConverter</a>
is required (set with the parameter <code class="highlighter-rouge">row.converter</code>) to specify the row schema explicitly and
to decode each Kafka message into a Calcite row. <a href="/apidocs/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.html">KafkaRowConverterImpl</a>
is used if no converter is provided;</p>
</li>
<li>
<p>Additional Kafka consumer settings can be added under the parameter <code class="highlighter-rouge">consumer.params</code>.</p>
</li>
</ol>
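<p>As an illustration of extra consumer settings, the <code class="highlighter-rouge">operand</code> fragment below adds two standard
Kafka consumer properties, <code class="highlighter-rouge">group.id</code> and <code class="highlighter-rouge">auto.offset.reset</code>.
This is a sketch that assumes the adapter hands the entries of <code class="highlighter-rouge">consumer.params</code> to the Kafka consumer unchanged;
the group name <code class="highlighter-rouge">calcite-example-group</code> is a hypothetical placeholder:</p>
<figure class="highlight"><pre><code class="language-json" data-lang="json">"operand": {
  "bootstrap.servers": "host1:port,host2:port",
  "topic.name": "kafka.topic.name",
  "consumer.params": {
    "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "group.id": "calcite-example-group",
    "auto.offset.reset": "earliest"
  }
}</code></pre></figure>
<p>With <code class="highlighter-rouge">auto.offset.reset</code> set to <code class="highlighter-rouge">earliest</code>, a consumer group that has no
committed offsets starts reading from the beginning of the topic rather than from new messages only.</p>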
<p>Assuming this file is stored as <code class="highlighter-rouge">kafka.model.json</code>, you can connect to Kafka via
<a href="https://github.com/julianhyde/sqlline"><code class="highlighter-rouge">sqlline</code></a> as follows:</p>
<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./sqlline
sqlline&gt; <span class="o">!</span>connect jdbc:calcite:model<span class="o">=</span>kafka.model.json admin admin</code></pre></figure>
<p><code class="highlighter-rouge">sqlline</code> will now accept SQL queries which access your Kafka topics.</p>
<p>With the Kafka table configured in the model above, we can run a simple query to fetch messages:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="n">sqlline</span><span class="o">&gt;</span> <span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">KAFKA</span><span class="p">.</span><span class="k">TABLE_NAME</span><span class="p">;</span>
<span class="o">+</span><span class="c1">---------------+---------------------+---------------------+---------------+-----------------+</span>
<span class="o">|</span> <span class="n">MSG_PARTITION</span> <span class="o">|</span> <span class="n">MSG_TIMESTAMP</span> <span class="o">|</span> <span class="n">MSG_OFFSET</span> <span class="o">|</span> <span class="n">MSG_KEY_BYTES</span> <span class="o">|</span> <span class="n">MSG_VALUE_BYTES</span> <span class="o">|</span>
<span class="o">+</span><span class="c1">---------------+---------------------+---------------------+---------------+-----------------+</span>
<span class="o">|</span> <span class="mi">0</span> <span class="o">|</span> <span class="o">-</span><span class="mi">1</span> <span class="o">|</span> <span class="mi">0</span> <span class="o">|</span> <span class="n">mykey0</span> <span class="o">|</span> <span class="n">myvalue0</span> <span class="o">|</span>
<span class="o">|</span> <span class="mi">0</span> <span class="o">|</span> <span class="o">-</span><span class="mi">1</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="n">mykey1</span> <span class="o">|</span> <span class="n">myvalue1</span> <span class="o">|</span>
<span class="o">+</span><span class="c1">---------------+---------------------+---------------------+---------------+-----------------+</span></code></pre></figure>
<p>A Kafka table is a streaming table, so a query against it runs continuously.</p>
<p>If you want the query to end quickly, add <code class="highlighter-rouge">LIMIT</code> as follows:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="n">sqlline</span><span class="o">&gt;</span> <span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">KAFKA</span><span class="p">.</span><span class="k">TABLE_NAME</span>
<span class="k">LIMIT</span> <span class="mi">5</span><span class="p">;</span></code></pre></figure>
</article>
</div>
<div class="unit one-fifth hide-on-mobiles">
<aside>
<h4>Overview</h4>
<ul>
<li class=""><a href="/docs/index.html">Background</a></li>
<li class=""><a href="/docs/tutorial.html">Tutorial</a></li>
<li class=""><a href="/docs/algebra.html">Algebra</a></li>
</ul>
<h4>Advanced</h4>
<ul>
<li class=""><a href="/docs/adapter.html">Adapters</a></li>
<li class=""><a href="/docs/spatial.html">Spatial</a></li>
<li class=""><a href="/docs/stream.html">Streaming</a></li>
<li class=""><a href="/docs/materialized_views.html">Materialized Views</a></li>
<li class=""><a href="/docs/lattice.html">Lattices</a></li>
</ul>
<h4>Avatica</h4>
<ul>
<li class=""><a href="/docs/avatica_overview.html">Overview</a></li>
<li class=""><a href="/docs/avatica_roadmap.html">Roadmap</a></li>
<li class=""><a href="/docs/avatica_json_reference.html">JSON Reference</a></li>
<li class=""><a href="/docs/avatica_protobuf_reference.html">Protobuf Reference</a></li>
</ul>
<h4>Reference</h4>
<ul>
<li class=""><a href="/docs/reference.html">SQL language</a></li>
<li class=""><a href="/docs/model.html">JSON/YAML models</a></li>
<li class=""><a href="/docs/howto.html">HOWTO</a></li>
</ul>
<h4>Meta</h4>
<ul>
<li class=""><a href="/docs/history.html">History</a></li>
<li class=""><a href="/docs/powered_by.html">Powered by Calcite</a></li>
<li class=""><a href="/apidocs">API</a></li>
<li class=""><a href="/testapidocs">Test API</a></li>
</ul>
</aside>
</div>
<div class="clear"></div>
</div>
</section>
<footer role="contentinfo">
<div id="poweredby">
<a href="http://www.apache.org/">
<span class="sr-only">Apache</span>
<img src="/img/feather.png" width="190" height="77" alt="Apache Logo"></a>
</div>
<div id="copyright">
<p>The contents of this website are Copyright &copy;&nbsp;2019
<a href="https://www.apache.org/">Apache Software Foundation</a>
under the terms of
the <a href="https://www.apache.org/licenses/">
Apache&nbsp;License&nbsp;v2</a>. Apache Calcite and its logo are
trademarks of the Apache Software Foundation.</p>
</div>
</footer>
<script>
var anchorForId = function (id) {
var anchor = document.createElement("a");
anchor.className = "header-link";
anchor.href = "#" + id;
anchor.innerHTML = "<span class=\"sr-only\">Permalink</span><i class=\"fa fa-link\"></i>";
anchor.title = "Permalink";
return anchor;
};
var linkifyAnchors = function (level, containingElement) {
var headers = containingElement.getElementsByTagName("h" + level);
for (var h = 0; h < headers.length; h++) {
var header = headers[h];
if (typeof header.id !== "undefined" && header.id !== "") {
header.appendChild(anchorForId(header.id));
}
}
};
document.onreadystatechange = function () {
if (this.readyState === "complete") {
var contentBlock = document.getElementsByClassName("docs")[0] || document.getElementsByClassName("news")[0];
if (!contentBlock) {
return;
}
for (var level = 1; level <= 6; level++) {
linkifyAnchors(level, contentBlock);
}
}
};
</script>
</body>
</html>