<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Storm SQL example</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 2.2.0</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Storm SQL example</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><p>This page shows how to use Storm SQL by showing the example of processing Apache logs.
This page is written by &quot;how-to&quot; style so you can follow the step and learn how to utilize Storm SQL step by step. </p>
<h2 id="preparation">Preparation</h2>
<p>This page assumes that Apache Zookeeper, Apache Storm and Apache Kafka are installed locally, properly configured, and running.
For convenience, this page assumes that Apache Kafka 0.10.0 is installed via <code>brew</code>.</p>
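<p>If you don&#39;t have Kafka installed yet, a typical <code>brew</code> setup looks like the following (a sketch; the version <code>brew</code> installs may differ from 0.10.0):</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ brew install kafka
$ brew services start zookeeper
$ brew services start kafka
</code></pre></div>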
<p>We&#39;ll use the tools below to prepare the JSON data which will be fed to the input data source.
Since they&#39;re Python projects, this page assumes that Python 2.7 with <code>pip</code> and <code>virtualenv</code> is installed locally.
If you&#39;re using Python 3, you may need to manually adjust some parts of the scripts for compatibility while feeding data. </p>
<ul>
<li><a href="https://github.com/kiritbasu/Fake-Apache-Log-Generator">https://github.com/kiritbasu/Fake-Apache-Log-Generator</a></li>
<li><a href="https://github.com/rory/apache-log-parser">https://github.com/rory/apache-log-parser</a></li>
</ul>
<h2 id="creating-topics">Creating Topics</h2>
<p>In this page, we will use three topics: <code>apache-logs</code>, <code>apache-error-logs</code>, and <code>apache-slow-logs</code>.
Please create the topics according to your environment. </p>
<p>For Apache Kafka 0.10.0 installed via brew:</p>
<div class="highlight"><pre><code class="language-" data-lang="">kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-error-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-slow-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
</code></pre></div>
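<p>To verify that the topics were created, you can list them (again assuming the brew-installed Kafka CLI):</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ kafka-topics --list --zookeeper localhost:2181
</code></pre></div>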
<h2 id="feeding-data">Feeding Data</h2>
<p>Let&#39;s feed the data to the input topic. In this page we will generate fake Apache logs, parse them into JSON format, and feed the JSON to the Kafka topic. </p>
<p>Let&#39;s create a working directory, since we will clone the project and also set up a virtualenv.</p>
<p>In your working directory, run <code>virtualenv env</code> to set up a virtualenv in the <code>env</code> directory, and activate it.</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ virtualenv env
$ source env/bin/activate
</code></pre></div>
<p>Feel free to <code>deactivate</code> when you&#39;re done with the example.</p>
<h3 id="install-and-modify-fake-apache-log-generator">Install and modify Fake-Apache-Log-Generator</h3>
<p><code>Fake-Apache-Log-Generator</code> is not distributed as a package, and we also need to modify the script.</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
$ cd Fake-Apache-Log-Generator
</code></pre></div>
<p>Open <code>apache-fake-log-gen.py</code> and replace the body of the <code>while (flag):</code> loop with the code below:</p>
<div class="highlight"><pre><code class="language-" data-lang=""> elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec
seconds=random.randint(30,300)
increment = datetime.timedelta(seconds=seconds)
otime += increment
ip = faker.ipv4()
dt = otime.strftime('%d/%b/%Y:%H:%M:%S')
tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z')
vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2])
uri = random.choice(resources)
if uri.find("apps")&gt;0:
uri += `random.randint(1000,10000)`
resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04])
byt = int(random.gauss(5000,50))
referer = faker.uri()
useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )()
f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent))
log_lines = log_lines - 1
flag = False if log_lines == 0 else True
</code></pre></div>
<p>to make sure the fake <code>elapsed_us</code> is included in the fake log.</p>
<p>For convenience, you can skip cloning the project and download the modified file from here: <a href="https://gist.github.com/HeartSaVioR/79fd4e461604fabecf535ffece47e6c2">apache-fake-log-gen.py (gist)</a></p>
<h3 id="install-apache-log-parser-and-write-parsing-script">Install apache-log-parser and write parsing script</h3>
<p><code>apache-log-parser</code> can be installed via <code>pip</code>.</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ pip install apache-log-parser
</code></pre></div>
<p>Since apache-log-parser is a library, we need to write a small Python script to parse the fake logs.
Let&#39;s create a file <code>parse-fake-log-gen-to-json-with-incrementing-id.py</code> with the content below: </p>
<div class="highlight"><pre><code class="language-" data-lang="">import sys
import apache_log_parser
import json
auto_incr_id = 1
parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
line_parser = apache_log_parser.make_parser(parser_format)
while True:
# we'll use pipe
line = sys.stdin.readline()
if not line:
break
parsed_dict = line_parser(line)
parsed_dict['id'] = auto_incr_id
auto_incr_id += 1
# works only python 2, but I don't care cause it's just a test module :)
parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')}
print json.dumps(parsed_dict)
</code></pre></div>
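<p>Before wiring everything into Kafka, you can sanity-check the parser by piping a few generated lines through it (a quick sketch; per the generator&#39;s usage, <code>-n 10</code> limits output to 10 lines):</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ python apache-fake-log-gen.py -n 10 | python parse-fake-log-gen-to-json-with-incrementing-id.py
</code></pre></div>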
<h3 id="feed-parsed-json-apache-log-to-kafka">Feed parsed JSON Apache Log to Kafka</h3>
<p>OK! We&#39;re now ready to feed the data to the Kafka topic. Let&#39;s use <code>kafka-console-producer</code> to feed the parsed JSON.</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ python apache-fake-log-gen.py -n 0 | python parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs
</code></pre></div>
<p>and execute the command below in another terminal session to confirm that data is being fed.</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs
</code></pre></div>
<p>If you can see JSON like the output below, it&#39;s working:</p>
<div class="highlight"><pre><code class="language-" data-lang="">{"TIME_US": "757467", "REQUEST_FIRST_LINE": "GET /wp-content HTTP/1.0", "REQUEST_METHOD": "GET", "RESPONSE_BYTES_CLF": "4988", "TIME_RECEIVED_ISOFORMAT": "2021-06-30T22:02:53", "TIME_RECEIVED_TZ_ISOFORMAT": "2021-06-30T22:02:53-07:00", "REQUEST_HTTP_VER": "1.0", "REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY": "Firefox", "REQUEST_HEADER_USER_AGENT__IS_MOBILE": false, "REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING": "3.6.13", "REQUEST_URL_FRAGMENT": "", "REQUEST_HEADER_USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13", "REQUEST_URL_SCHEME": "", "REQUEST_URL_PATH": "/wp-content", "REQUEST_URL_QUERY_SIMPLE_DICT": {}, "TIME_RECEIVED_UTC_ISOFORMAT": "2021-07-01T05:02:53+00:00", "REQUEST_URL_QUERY_DICT": {}, "STATUS": "200", "REQUEST_URL_NETLOC": "", "REQUEST_URL_QUERY_LIST": [], "REQUEST_URL_QUERY": "", "REQUEST_URL_USERNAME": null, "REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING": "", "REQUEST_URL_HOSTNAME": null, "REQUEST_HEADER_USER_AGENT__OS__FAMILY": "Linux", "REQUEST_URL": "/wp-content", "ID": 904128, "REQUEST_HEADER_REFERER": "http://white.com/terms/", "REQUEST_URL_PORT": null, "REQUEST_URL_PASSWORD": null, "TIME_RECEIVED": "[30/Jun/2021:22:02:53 -0700]", "REMOTE_IP": "88.203.90.62"}
</code></pre></div>
<h2 id="example-filtering-error-logs">Example: filtering error logs</h2>
<p>In this example we&#39;ll filter the error logs out of the entire log stream and store them in another topic. The <code>project</code> and <code>filter</code> features will be used.</p>
<p>The content of the script file is as follows:</p>
<div class="highlight"><pre><code class="language-" data-lang="">CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://apache-logs?bootstrap-servers=localhost:9092'
CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://apache-error-logs?bootstrap-servers=localhost:9092' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) &gt;= 4
</code></pre></div>
<p>Save this file as <code>apache_log_error_filtering.sql</code>.</p>
<p>Let&#39;s take a look at the script.</p>
<p>The first statement defines the table <code>APACHE_LOGS</code> which represents the input stream. The <code>LOCATION</code> clause specifies the Kafka host (<code>localhost:9092</code>) and the topic (<code>apache-logs</code>).
Note that the Kafka data source requires a primary key to be defined. That&#39;s why we added an integer id to the parsed JSON data.</p>
<p>Similarly, the second statement specifies the table <code>APACHE_ERROR_LOGS</code> which represents the output stream. The <code>TBLPROPERTIES</code> clause specifies the configuration of <a href="http://kafka.apache.org/documentation.html#producerconfigs">KafkaProducer</a> and is required for a Kafka sink table.</p>
<p>The last statement defines the topology. Storm SQL only defines and runs a topology for the DML statement.
The DDL statements define the input data source, the output data source, and the user defined function which will be referenced by the DML statement.</p>
<p>Let&#39;s look at the <code>WHERE</code> clause first. Since we want to filter error logs, we divide the status by 100 and check whether the quotient is greater than or equal to 4 (an equivalent representation is <code>&gt;= 400</code>). For example, a status of 404 yields a quotient of 4 and passes the filter, while 200 yields 2 and does not.
Since the status in the JSON is in string format (hence represented as VARCHAR in the APACHE_LOGS table), we apply CAST(STATUS AS INT) to convert it to an integer type before applying the division.
Now we have filtered only the error logs. </p>
<p>Let&#39;s transform some columns to match the output stream. In this statement we apply CAST(STATUS AS INT) to convert to an integer type, and divide TIME_US by 1000 to convert microseconds to milliseconds.</p>
<p>Finally, the insert statement stores the filtered and transformed rows (tuples) in the output stream. </p>
<p>To run this example, users need to include the data sources (<code>storm-sql-kafka</code> in this case) and their dependencies in the
classpath. The Storm SQL core dependencies are automatically handled when users run <code>storm sql</code>.
Users can include data sources at the submission step as below:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
</code></pre></div>
<p>The above command submits the SQL statements to StormSQL. The command line syntax of Storm SQL is <code>storm sql [script file] [topology name]</code>.
You need to modify each artifact&#39;s version if you are using a different version of Storm or Kafka.</p>
<p>If your statements pass the validation phase, the topology will be shown on the Storm UI page.</p>
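<p>You can also confirm that the topology is running from the command line:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm list
</code></pre></div>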
<p>You can see the output via console:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-error-logs
</code></pre></div>
<p>and the output will be similar to:</p>
<div class="highlight"><pre><code class="language-" data-lang="">{"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222}
{"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851}
...
</code></pre></div>
<p>You can also run the Storm SQL runner to see the logical plan by passing <code>--explain</code> in place of the topology name:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
</code></pre></div>
<p>and the output will be similar to:</p>
<div class="highlight"><pre><code class="language-" data-lang="">LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7
LogicalFilter(condition=[&gt;=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6
EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5
</code></pre></div>
<p>It might not be the same as what you are seeing if Storm SQL applies query optimizations.</p>
<p>We&#39;re running our first Storm SQL topology! Please kill the topology when you have seen enough output and logs.</p>
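<p>Topologies can be killed via the Storm UI, or from the command line:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm kill apache_log_error_filtering
</code></pre></div>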
<p>To be concise, we&#39;ll skip explaining the things we&#39;ve already seen.</p>
<h2 id="example-filtering-slow-logs">Example: filtering slow logs</h2>
<p>In this example we&#39;ll filter the slow logs out of the entire log stream and store them in another topic. The <code>project</code>, <code>filter</code>, and <code>User Defined Function (UDF)</code> features will be used.
This is very similar to <code>filtering error logs</code>, but here we&#39;ll also see how to define a <code>User Defined Function (UDF)</code>.</p>
<p>The content of the script file is as follows:</p>
<div class="highlight"><pre><code class="language-" data-lang="">CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://apache-logs?bootstrap-servers=localhost:9092' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://apache-slow-logs?bootstrap-servers=localhost:9092' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2'
INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) &gt;= 100
</code></pre></div>
<p>Save this file as <code>apache_log_slow_filtering.sql</code>.</p>
<p>We can skip the first two statements since they&#39;re almost the same as in the last example.</p>
<p>The third statement defines a <code>User Defined Function</code>. We&#39;re defining <code>GET_TIME</code>, which uses the <code>org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2</code> class.</p>
<p>The implementation of GetTime2 is here:</p>
<div class="highlight"><pre><code class="language-" data-lang=""><span class="k">package</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">storm</span><span class="p">.</span><span class="n">sql</span><span class="p">.</span><span class="n">runtime</span><span class="p">.</span><span class="n">functions</span><span class="p">.</span><span class="n">scalar</span><span class="p">.</span><span class="n">datetime</span><span class="p">;</span>
<span class="n">import</span> <span class="n">org</span><span class="p">.</span><span class="n">joda</span><span class="p">.</span><span class="n">time</span><span class="p">.</span><span class="n">format</span><span class="p">.</span><span class="n">DateTimeFormat</span><span class="p">;</span>
<span class="n">import</span> <span class="n">org</span><span class="p">.</span><span class="n">joda</span><span class="p">.</span><span class="n">time</span><span class="p">.</span><span class="n">format</span><span class="p">.</span><span class="n">DateTimeFormatter</span><span class="p">;</span>
<span class="k">public</span> <span class="n">class</span> <span class="n">GetTime2</span> <span class="p">{</span>
<span class="k">public</span> <span class="n">static</span> <span class="n">Long</span> <span class="n">evaluate</span><span class="p">(</span><span class="k">String</span> <span class="n">dateString</span><span class="p">,</span> <span class="k">String</span> <span class="n">dateFormat</span><span class="p">)</span> <span class="p">{</span>
<span class="n">try</span> <span class="p">{</span>
<span class="n">DateTimeFormatter</span> <span class="n">df</span> <span class="p">=</span> <span class="n">DateTimeFormat</span><span class="p">.</span><span class="n">forPattern</span><span class="p">(</span><span class="n">dateFormat</span><span class="p">).</span><span class="n">withZoneUTC</span><span class="p">();</span>
<span class="n">return</span> <span class="n">df</span><span class="p">.</span><span class="n">parseDateTime</span><span class="p">(</span><span class="n">dateString</span><span class="p">).</span><span class="n">getMillis</span><span class="p">();</span>
<span class="p">}</span> <span class="n">catch</span> <span class="p">(</span><span class="n">Exception</span> <span class="n">ex</span><span class="p">)</span> <span class="p">{</span>
<span class="n">throw</span> <span class="n">new</span> <span class="n">RuntimeException</span><span class="p">(</span><span class="n">ex</span><span class="p">);</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="p">}</span>
</code></pre></div>
<p>This class can be used as a UDF since it defines a static <code>evaluate</code> method. The SQL types of the parameters and the return value are determined by Calcite, which Storm SQL depends on. </p>
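<p>As a minimal sketch of what this contract looks like (the package and class name here are hypothetical, purely for illustration), any class exposing a static <code>evaluate</code> method can serve as a scalar UDF:</p>
<div class="highlight"><pre><code class="language-" data-lang="">package com.example.udf; // hypothetical package, for illustration only

public class AddPrefix {
    // VARCHAR in, VARCHAR out: Calcite maps the String parameters and the
    // String return type to the corresponding SQL types.
    public static String evaluate(String input, String prefix) {
        return prefix + input;
    }
}
</code></pre></div>
<p>Such a class would be registered the same way: <code>CREATE FUNCTION ADD_PREFIX AS &#39;com.example.udf.AddPrefix&#39;</code>.</p>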
<p>Note that this class must be on the classpath, so in order to define a UDF, you need to create a jar file which contains your UDF classes and run <code>storm sql</code> with the <code>--jars</code> option.
This page assumes that GetTime2 is already on the classpath, for simplicity.</p>
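<p>For example, if your UDF classes were packaged into a hypothetical <code>my-udf.jar</code>, the submission would look like the following (a sketch; the jar name is an assumption):</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --jars "my-udf.jar" --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
</code></pre></div>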
<p>The last statement is very similar to filtering error logs. The only new thing is that we call <code>GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, &#39;yyyy-MM-dd&#39;&#39;T&#39;&#39;HH:mm:ssZZ&#39;)</code> to convert the string time to a unix timestamp (BIGINT); for example, <code>2021-06-05T03:44:59+00:00</code> becomes <code>1622864699000</code>.</p>
<p>Let&#39;s execute it.</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
</code></pre></div>
<p>You can see the output via console:</p>
<div class="highlight"><pre><code class="language-" data-lang="">$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slow-logs
</code></pre></div>
<p>and the output will be similar to:</p>
<div class="highlight"><pre><code class="language-" data-lang="">{"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579}
{"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957}
...
</code></pre></div>
<p>That&#39;s it! Supposing we had a UDF which looks up the geo location of a remote IP, we could filter by geo location, or enrich the transformed result with it.</p>
<h2 id="summary">Summary</h2>
<p>We walked through several simple use cases to learn Storm SQL&#39;s features. If you haven&#39;t looked at <a href="storm-sql.html">Storm SQL integration</a> and <a href="storm-sql-reference.html">Storm SQL language</a>, you should read them to see the full set of supported features. </p>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
<!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>