<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Storm SQL integration</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 2.0.0</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Storm SQL integration</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><p>The Storm SQL integration allows users to run SQL queries over streaming data in Storm. Not only the SQL interface allows faster development cycles on streaming analytics, but also opens up the opportunities to unify batch data processing like <a href="///hive.apache.org">Apache Hive</a> and real-time streaming data analytics.</p>
<p>At a high level, StormSQL compiles SQL queries into Storm topologies that leverage the Streams API and executes them on Storm clusters. This document describes how to use StormSQL as an end user. If you are interested in the details of the design and implementation of StormSQL, please refer to <a href="storm-sql-internal.html">this</a> page.</p>
<p>Storm SQL integration is an <code>experimental</code> feature, so the internals of Storm SQL and the set of supported features are subject to change.
However, small changes will not affect the user experience, and we will notify and announce to users when a breaking UX change is introduced.</p>
<h2 id="usage">Usage</h2>
<p>Run the <code>storm sql</code> command to compile SQL statements into a Storm topology and submit it to the Storm cluster:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ bin/storm sql &lt;sql-file&gt; &lt;topo-name&gt;
</code></pre></div>
<p>Here <code>sql-file</code> contains a list of SQL statements to be executed, and <code>topo-name</code> is the name of the topology.</p>
<p>StormSQL activates <code>explain mode</code> and shows the query plan instead of submitting the topology when the user specifies <code>--explain</code> as the <code>topo-name</code>.
A detailed explanation is available in the <code>Showing Query Plan (explain mode)</code> section.</p>
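<p>For instance, a minimal invocation of explain mode might look like the following (the file name <code>my_queries.sql</code> is only a placeholder for your own SQL file):</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ bin/storm sql my_queries.sql --explain
</code></pre></div>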
<h2 id="supported-features">Supported Features</h2>
<p>The following features are supported in the current repository:</p>
<ul>
<li>Streaming from and to external data sources</li>
<li>Filtering tuples</li>
<li>Projections</li>
<li>User defined function (scalar)</li>
</ul>
<p>Aggregations and joins are not supported for now. These features will be introduced when Storm SQL supports native <code>Streaming SQL</code>.</p>
<p>Please be aware that as Storm uses <a href="https://calcite.apache.org">Apache Calcite</a> to parse the supplied SQL, you will likely benefit from skimming the Calcite documentation, in particular the section on <a href="https://calcite.apache.org/docs/reference.html#identifiers">identifiers</a>.</p>
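<p>As a quick illustration of Calcite&#39;s default identifier handling (this assumes StormSQL keeps Calcite&#39;s defaults; the fragments below are for parsing illustration only, not complete StormSQL statements):</p>
<div class="highlight"><pre><code class="language-" data-lang="">-- Unquoted identifiers are folded to upper case, so this resolves to a column declared as UNIT_PRICE
SELECT unit_price FROM ORDERS
-- A double-quoted identifier is matched case-sensitively, exactly as written
SELECT "unit_price" FROM ORDERS
</code></pre></div>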
<h2 id="specifying-external-data-sources">Specifying External Data Sources</h2>
<p>In StormSQL data is represented by external tables. Users can specify data sources using the <code>CREATE EXTERNAL TABLE</code> statement. The syntax of <code>CREATE EXTERNAL TABLE</code> closely follows the one defined in <a href="https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL">Hive Data Definition Language</a>:</p>
<div class="highlight"><pre><code class="language-" data-lang="">CREATE EXTERNAL TABLE table_name field_list
[ STORED AS
INPUTFORMAT input_format_classname
OUTPUTFORMAT output_format_classname
]
LOCATION location
[ PARALLELISM parallelism ]
[ TBLPROPERTIES tbl_properties ]
[ AS select_stmt ]
</code></pre></div>
<p>You can find detailed explanations of the properties in <a href="https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL">Hive Data Definition Language</a>. </p>
<p><code>PARALLELISM</code> is StormSQL&#39;s own keyword and describes the parallelism hint for the input data source. This is the same as providing a parallelism hint to a spout.
Downstream operators are executed with the same parallelism until a repartition occurs (aggregation triggers a repartition).</p>
<p>The default value is 1, and this option has no effect on output data sources. (We might change this if needed; normally repartitioning is something to avoid.)</p>
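<p>The following statement, as a sketch with a made-up <code>clicks</code> topic modeled on the Kafka example below, asks StormSQL to read the source with a parallelism hint of 4:</p>
<div class="highlight"><pre><code class="language-" data-lang="">CREATE EXTERNAL TABLE CLICKS (ID INT PRIMARY KEY) LOCATION 'kafka://clicks?bootstrap-servers=localhost:9092' PARALLELISM 4
</code></pre></div>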
<p>For example, the following statement specifies a Kafka spout and sink:</p>
<div class="highlight"><pre><code class="language-" data-lang="">CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://test?bootstrap-servers=localhost:9092' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
</code></pre></div>
<h2 id="plugging-in-external-data-sources">Plugging in External Data Sources</h2>
<p>Users plug in external data sources by implementing the <code>ISqlStreamsDataSource</code> interface and registering them using Java&#39;s service loader mechanism. The external data source is chosen based on the scheme of the URI of the tables. Please refer to the implementation of <code>storm-sql-kafka</code> for more details.</p>
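<p>As a rough sketch of the registration step (the provider interface name below is inferred from the <code>DataSourcesRegistry</code> log lines shown later on this page, and <code>com.example.sql.MyDataSourcesProvider</code> is a hypothetical class; verify the exact interface against the <code>storm-sql-kafka</code> sources), the data source jar ships a standard Java service-loader resource file that lists the provider implementation:</p>
<div class="highlight"><pre><code class="language-" data-lang=""># Assumed layout inside your data source module (names are illustrative)
$ cat src/main/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
com.example.sql.MyDataSourcesProvider
</code></pre></div>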
<h2 id="specifying-user-defined-function-udf">Specifying User Defined Function (UDF)</h2>
<p>Users can define a user defined function (scalar) using the <code>CREATE FUNCTION</code> statement.
For example, the following statement defines the <code>MYPLUS</code> function, which is backed by the <code>org.apache.storm.sql.TestUtils$MyPlus</code> class.</p>
<div class="highlight"><pre><code class="language-" data-lang="">CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
</code></pre></div>
<p>Storm SQL determines whether the function is scalar or aggregate by checking which methods are defined.
If the class defines an <code>evaluate</code> method, Storm SQL treats the function as <code>scalar</code>.</p>
<p>An example class for a scalar function looks like this:</p>
<div class="highlight"><pre><code class="language-" data-lang=""> public class MyPlus {
public static Integer evaluate(Integer x, Integer y) {
return x + y;
}
}
</code></pre></div>
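<p>Once registered, the function can be referenced in queries like any built-in function. The following sketch reuses the <code>ORDERS</code> and <code>LARGE_ORDERS</code> tables from the Kafka example below; it is illustrative rather than part of that example:</p>
<div class="highlight"><pre><code class="language-" data-lang="">CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
INSERT INTO LARGE_ORDERS SELECT ID, MYPLUS(UNIT_PRICE, QUANTITY) AS TOTAL FROM ORDERS
</code></pre></div>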
<h2 id="example-filtering-kafka-stream">Example: Filtering Kafka Stream</h2>
<p>Let&#39;s say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the transactions are significant and to insert these orders into another Kafka stream for further analysis.</p>
<p>The user can specify the following SQL statements in the SQL file:</p>
<div class="highlight"><pre><code class="language-" data-lang="">CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://orders?bootstrap-servers=localhost:9092,localhost:9093'
CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://large_orders?bootstrap-servers=localhost:9092,localhost:9093' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY &gt; 50
</code></pre></div>
<p>The first statement defines the table <code>ORDERS</code>, which represents the input stream. The <code>LOCATION</code> clause specifies the Kafka bootstrap servers (<code>localhost:9092,localhost:9093</code>) and the topic (<code>orders</code>).</p>
<p>Similarly, the second statement specifies the table <code>LARGE_ORDERS</code>, which represents the output stream. The <code>TBLPROPERTIES</code> clause specifies the configuration of the <a href="http://kafka.apache.org/documentation.html#producerconfigs">KafkaProducer</a> and is required for a Kafka sink table.</p>
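<p>For readability, the <code>TBLPROPERTIES</code> value in the statement above is the following JSON document, shown here pretty-printed; other <a href="http://kafka.apache.org/documentation.html#producerconfigs">KafkaProducer</a> properties would presumably be added under the same <code>producer</code> key:</p>
<div class="highlight"><pre><code class="language-" data-lang="">{
  "producer": {
    "acks": "1",
    "key.serializer": "org.apache.storm.kafka.IntSerializer"
  }
}
</code></pre></div>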
<p>The third statement is a <code>SELECT</code> statement which defines the topology: it instructs StormSQL to filter all orders in the external table <code>ORDERS</code>, calculate the total price, and insert matching records into the Kafka stream specified by <code>LARGE_ORDERS</code>.</p>
<p>To run this example, users need to include the data sources (<code>storm-sql-kafka</code> in this case) and their dependencies in the
classpath. The Storm SQL core dependencies are automatically handled when users run <code>storm sql</code>. Users can include data sources at the submission step as shown below:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ bin/storm sql order_filtering.sql order_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
</code></pre></div>
<p>The above command submits the SQL statements to StormSQL. Users need to adjust each artifact&#39;s version if they are using a different version of Storm or Kafka.</p>
<p>Having run the above command, you should now be able to see the <code>order_filtering</code> topology in Storm UI.</p>
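<p>From the command line, a couple of general-purpose Storm CLI commands are handy at this point (they are not StormSQL-specific):</p>
<div class="highlight"><pre><code class="language-" data-lang=""># List running topologies and confirm that order_filtering is active
$ bin/storm list

# When you are done with the example, kill the topology
$ bin/storm kill order_filtering
</code></pre></div>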
<h2 id="showing-query-plan-explain-mode">Showing Query Plan (explain mode)</h2>
<p>Like <code>EXPLAIN</code> on a SQL statement, StormSQL provides an <code>explain mode</code> when running the Storm SQL runner. In explain mode, StormSQL analyzes each query statement (DML only) and shows the plan instead of submitting the topology.</p>
<p>To run <code>explain mode</code>, provide <code>--explain</code> as the topology name and run <code>storm sql</code> the same way as when submitting.</p>
<p>For example, running the example above in explain mode looks like this:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ bin/storm sql order_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
</code></pre></div>
<p>StormSQL prints output like the following:</p>
<div class="highlight"><pre><code class="language-" data-lang="">
===========================================================
query&gt;
CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://orders?bootstrap-servers=localhost:9092,localhost:9093'
-----------------------------------------------------------
17:03:06.040 [main] INFO o.a.s.s.r.DataSourcesRegistry - Registering scheme socket with org.apache.storm.sql.runtime.datasource.socket.SocketDataSourcesProvider@d62fe5b
17:03:06.090 [main] INFO o.a.s.s.r.DataSourcesRegistry - Registering scheme kafka with org.apache.storm.sql.kafka.KafkaDataSourcesProvider@34158c08
17:03:06.290 [main] INFO o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'auto.offset.reset' to 'earliest' to ensure at-least-once processing
17:03:06.290 [main] INFO o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'enable.auto.commit' to 'false', because the spout does not support auto-commit
No plan presented on DDL
===========================================================
===========================================================
query&gt;
CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://large_orders?bootstrap-servers=localhost:9092,localhost:9093' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
-----------------------------------------------------------
17:03:06.504 [main] INFO o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'auto.offset.reset' to 'earliest' to ensure at-least-once processing
17:03:06.504 [main] INFO o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'enable.auto.commit' to 'false', because the spout does not support auto-commit
No plan presented on DDL
===========================================================
===========================================================
query&gt;
INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY &gt; 50
-----------------------------------------------------------
plan&gt;
LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], flattened=[true])
  LogicalProject(ID=[$0], TOTAL=[*($1, $2)])
    LogicalFilter(condition=[&gt;(*($1, $2), 50)])
      EnumerableTableScan(table=[[ORDERS]])
===========================================================
</code></pre></div>
<h2 id="debugging-expressions">Debugging expressions</h2>
<p>When a Storm SQL topology is submitted, the code generated from the SQL expressions is logged. For example, the <code>WHERE</code> clause in the order filtering example produces the following log output during submission:</p>
<div class="highlight"><pre><code class="language-" data-lang="">17:37:55.344 [main] INFO o.a.s.s.r.s.f.EvaluationCalc - Expression code for filter:
public class Generated_STREAMSCALCREL_33_0 implements org.apache.storm.sql.runtime.calcite.ExecutableExpression {
    public void execute(org.apache.calcite.interpreter.Context context, Object[] outputValues) {
        final Object[] current = context.values;
        outputValues[0] = org.apache.calcite.runtime.SqlFunctions.toInt(current[1]) * org.apache.calcite.runtime.SqlFunctions.toInt(current[2]) &gt; 50;
    }

    public Object execute(org.apache.calcite.interpreter.Context context) {
        final Object[] values = new Object[1];
        this.execute(context, values);
        return values[0];
    }
}
</code></pre></div>
<p>This can be helpful in case you need to debug the topology.</p>
<h2 id="current-limitations">Current Limitations</h2>
<ul>
<li>Windowing is yet to be implemented.</li>
<li>Aggregations and joins will be introduced after windowing is supported.</li>
</ul>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
<!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>