| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm SQL example</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| <!-- Just for debugging purposes. Don't actually copy these 2 lines! --> |
| <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]--> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm SQL example</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><p>This page shows how to use Storm SQL by showing the example of processing Apache logs. |
| This page is written by "how-to" style so you can follow the step and learn how to utilize Storm SQL step by step. </p> |
| |
| <h2 id="preparation">Preparation</h2> |
| |
<p>This page assumes that Apache Zookeeper, Apache Storm and Apache Kafka are installed locally, properly configured, and running.
For convenience, this page assumes that Apache Kafka 0.10.0 is installed via <code>brew</code>.</p>
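<p>If you want to confirm the services are reachable before going further, here is a minimal Python sketch (the ports are the local defaults this page assumes):</p>
<div class="highlight"><pre><code class="language-" data-lang="">import socket

# default local ports for the services this page assumes
for name, port in [('zookeeper', 2181), ('kafka', 9092)]:
    s = socket.socket()
    s.settimeout(2)
    try:
        s.connect(('localhost', port))
        print '%s is reachable on port %d' % (name, port)
    except socket.error as e:
        print '%s is NOT reachable on port %d: %s' % (name, port, e)
    finally:
        s.close()
</code></pre></div>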
| |
<p>We'll use the tools below to prepare the JSON data that will be fed to the input data source.
Since they're Python projects, this page assumes that Python 2.7 with <code>pip</code> and <code>virtualenv</code> is installed locally.
If you're using Python 3, you may need to manually adapt some parts of the scripts while feeding data. </p>
| |
| <ul> |
| <li><a href="https://github.com/kiritbasu/Fake-Apache-Log-Generator">https://github.com/kiritbasu/Fake-Apache-Log-Generator</a></li> |
| <li><a href="https://github.com/rory/apache-log-parser">https://github.com/rory/apache-log-parser</a></li> |
| </ul> |
| |
| <h2 id="creating-topics">Creating Topics</h2> |
| |
<p>In this page, we will use three topics: <code>apache-logs</code>, <code>apache-error-logs</code>, and <code>apache-slow-logs</code>.
Please create the topics according to your environment. </p>
| |
<p>For Apache Kafka 0.10.0 installed via brew:</p>
| <div class="highlight"><pre><code class="language-" data-lang="">kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 |
| kafka-topics --create --topic apache-errorlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 |
| kafka-topics --create --topic apache-slowlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 |
| </code></pre></div> |
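<p>To double-check that the topics were created, here is a small sketch using the <code>kafka-python</code> package (an assumption; <code>kafka-topics --list</code> works just as well):</p>
<div class="highlight"><pre><code class="language-" data-lang=""># pip install kafka-python
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
expected = {'apache-logs', 'apache-error-logs', 'apache-slow-logs'}
missing = expected - consumer.topics()
print 'missing topics: %s' % (missing if missing else 'none')
consumer.close()
</code></pre></div>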
| <h2 id="feeding-data">Feeding Data</h2> |
| |
<p>Let's feed the data to the input topic. In this page we will generate fake Apache logs, parse them into JSON format, and feed the JSON to the Kafka topic. </p>
| |
<p>Let's create a working directory, since we will clone the projects and also set up a virtualenv.</p>
| |
<p>In your working directory, run <code>virtualenv env</code> to set up a virtualenv in the <code>env</code> directory, and activate it.</p>
| <div class="highlight"><pre><code class="language-" data-lang="">$ virtualenv env |
| $ source env/bin/activate |
| </code></pre></div> |
<p>Feel free to <code>deactivate</code> when you're done with the example.</p>
| |
| <h3 id="install-and-modify-fake-apache-log-generator">Install and modify Fake-Apache-Log-Generator</h3> |
| |
<p><code>Fake-Apache-Log-Generator</code> is not published as a package, and we also need to modify the script.</p>
| <div class="highlight"><pre><code class="language-" data-lang="">$ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git |
| $ cd Fake-Apache-Log-Generator |
| </code></pre></div> |
<p>Open <code>apache-fake-log-gen.py</code> and replace the body of the <code>while (flag):</code> loop with the code below:</p>
| <div class="highlight"><pre><code class="language-" data-lang=""> elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec |
| seconds=random.randint(30,300) |
| increment = datetime.timedelta(seconds=seconds) |
| otime += increment |
| |
| ip = faker.ipv4() |
| dt = otime.strftime('%d/%b/%Y:%H:%M:%S') |
| tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z') |
| vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2]) |
| |
| uri = random.choice(resources) |
| if uri.find("apps")>0: |
| uri += `random.randint(1000,10000)` |
| |
| resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04]) |
| byt = int(random.gauss(5000,50)) |
| referer = faker.uri() |
| useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )() |
| f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent)) |
| |
| log_lines = log_lines - 1 |
| flag = False if log_lines == 0 else True |
| </code></pre></div> |
<p>This ensures the fake <code>elapsed_us</code> value is included in each fake log line.</p>
| |
<p>For convenience, you can skip cloning the project and download the modified file from here: <a href="https://gist.github.com/HeartSaVioR/79fd4e461604fabecf535ffece47e6c2">apache-fake-log-gen.py (gist)</a></p>
| |
| <h3 id="install-apache-log-parser-and-write-parsing-script">Install apache-log-parser and write parsing script</h3> |
| |
| <p><code>apache-log-parser</code> can be installed via <code>pip</code>.</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">$ pip install apache-log-parser |
| </code></pre></div> |
<p>Since apache-log-parser is a library, we need to write a small Python script to parse the fake logs.
Let's create a file <code>parse-fake-log-gen-to-json-with-incrementing-id.py</code> with the following content: </p>
| <div class="highlight"><pre><code class="language-" data-lang="">import sys |
| import apache_log_parser |
| import json |
| |
| auto_incr_id = 1 |
| parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"' |
| line_parser = apache_log_parser.make_parser(parser_format) |
| while True: |
| # we'll use pipe |
| line = sys.stdin.readline() |
| if not line: |
| break |
| parsed_dict = line_parser(line) |
| parsed_dict['id'] = auto_incr_id |
| auto_incr_id += 1 |
| |
| # works only python 2, but I don't care cause it's just a test module :) |
| parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')} |
| print json.dumps(parsed_dict) |
| </code></pre></div> |
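<p>Before wiring the pipeline together, you can sanity-check the format string against a hand-written log line. A small sketch (the sample line is an assumption shaped like the modified generator's output):</p>
<div class="highlight"><pre><code class="language-" data-lang="">import apache_log_parser

parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
line_parser = apache_log_parser.make_parser(parser_format)

sample = ('127.0.0.1 - - [30/Jun/2021:22:02:53 -0700] 757467 '
          '"GET /wp-content HTTP/1.0" 200 4988 '
          '"http://example.com/" "Mozilla/5.0"')
parsed = line_parser(sample)
# apache-log-parser derives lower-cased keys from the format directives
# (e.g. %D -> time_us, %s -> status); the script above upper-cases them
print parsed['time_us'], parsed['status'], parsed['request_url']
</code></pre></div>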
| <h3 id="feed-parsed-json-apache-log-to-kafka">Feed parsed JSON Apache Log to Kafka</h3> |
| |
<p>OK! We're ready to feed the data to the Kafka topic. Let's use <code>kafka-console-producer</code> to feed the parsed JSON.</p>
| <div class="highlight"><pre><code class="language-" data-lang="">$ python apache-fake-log-gen.py -n 0 | python parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs |
| </code></pre></div> |
<p>and run the command below in another terminal session to confirm data is being fed:</p>
| <div class="highlight"><pre><code class="language-" data-lang="">$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs |
| </code></pre></div> |
<p>If you can see JSON like the following, it's done:</p>
| <div class="highlight"><pre><code class="language-" data-lang="">{"TIME_US": "757467", "REQUEST_FIRST_LINE": "GET /wp-content HTTP/1.0", "REQUEST_METHOD": "GET", "RESPONSE_BYTES_CLF": "4988", "TIME_RECEIVED_ISOFORMAT": "2021-06-30T22:02:53", "TIME_RECEIVED_TZ_ISOFORMAT": "2021-06-30T22:02:53-07:00", "REQUEST_HTTP_VER": "1.0", "REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY": "Firefox", "REQUEST_HEADER_USER_AGENT__IS_MOBILE": false, "REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING": "3.6.13", "REQUEST_URL_FRAGMENT": "", "REQUEST_HEADER_USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13", "REQUEST_URL_SCHEME": "", "REQUEST_URL_PATH": "/wp-content", "REQUEST_URL_QUERY_SIMPLE_DICT": {}, "TIME_RECEIVED_UTC_ISOFORMAT": "2021-07-01T05:02:53+00:00", "REQUEST_URL_QUERY_DICT": {}, "STATUS": "200", "REQUEST_URL_NETLOC": "", "REQUEST_URL_QUERY_LIST": [], "REQUEST_URL_QUERY": "", "REQUEST_URL_USERNAME": null, "REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING": "", "REQUEST_URL_HOSTNAME": null, "REQUEST_HEADER_USER_AGENT__OS__FAMILY": "Linux", "REQUEST_URL": "/wp-content", "ID": 904128, "REQUEST_HEADER_REFERER": "http://white.com/terms/", "REQUEST_URL_PORT": null, "REQUEST_URL_PASSWORD": null, "TIME_RECEIVED": "[30/Jun/2021:22:02:53 -0700]", "REMOTE_IP": "88.203.90.62"} |
| </code></pre></div> |
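<p>If you'd rather verify programmatically, here is a minimal consumer sketch (again assuming <code>kafka-python</code>):</p>
<div class="highlight"><pre><code class="language-" data-lang="">import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('apache-logs',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)
checked = 0
for msg in consumer:
    record = json.loads(msg.value)
    # the fields the Storm SQL tables below rely on
    assert 'ID' in record and 'STATUS' in record and 'TIME_US' in record
    checked += 1
    if checked == 5:
        break
print 'checked %d records, input topic looks good' % checked
consumer.close()
</code></pre></div>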
| <h2 id="example-filtering-error-logs">Example: filtering error logs</h2> |
| |
<p>In this example we'll filter error logs from the entire log stream and store them in another topic. The <code>project</code> and <code>filter</code> features will be used.</p>
| |
| <p>The content of script file is here:</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs' |
| CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' |
| INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4 |
| </code></pre></div> |
| <p>Save this file to <code>apache_log_error_filtering.sql</code>.</p> |
| |
| <p>Let's take a look at the script.</p> |
| |
<p>The first statement defines the table <code>APACHE_LOGS</code>, which represents the input stream. The <code>LOCATION</code> clause specifies the ZkHost (<code>localhost:2181</code>), the path of the brokers in ZooKeeper (<code>/brokers</code>) and the topic (<code>apache-logs</code>).
Note that the Kafka data source requires a primary key to be defined. That's why we added an integer id to the parsed JSON data.</p>
| |
| <p>Similarly, the second statement specifies the table <code>APACHE_ERROR_LOGS</code> which represents the output stream. The <code>TBLPROPERTIES</code> clause specifies the configuration of <a href="http://kafka.apache.org/documentation.html#producerconfigs">KafkaProducer</a> and is required for a Kafka sink table.</p> |
| |
<p>The last statement defines the topology. Storm SQL defines and runs a topology only for the DML statement.
The DDL statements define the input data source, the output data source, and the user-defined function that the DML statement refers to.</p>
| |
<p>Let's look at the WHERE clause first. Since we want to filter error logs, we divide the status by 100 and check whether the quotient is greater than or equal to 4 (an equivalent, simpler condition is <code>CAST(STATUS AS INT) >= 400</code>).
Since the status in the JSON is a string (hence represented as VARCHAR in the APACHE_LOGS table), we apply CAST(STATUS AS INT) to convert it to an integer before the division.
Now we have filtered down to only the error logs. </p>
| |
<p>Next, let's transform some columns to match the output stream. In this statement we apply CAST(STATUS AS INT) to convert the status to an integer, and divide TIME_US by 1000 to convert microseconds to milliseconds.</p>
| |
<p>Finally, the INSERT statement stores the filtered and transformed rows (tuples) in the output stream. </p>
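<p>To build intuition for what the topology does per record, here is a rough Python analogue of the statement (just a sketch; it is not how Storm SQL executes it):</p>
<div class="highlight"><pre><code class="language-" data-lang="">def filter_and_transform(record):
    status = int(record['STATUS'])  # CAST(STATUS AS INT)
    # WHERE (CAST(STATUS AS INT) / 100) >= 4 -- keep only 4xx/5xx responses
    if status // 100 >= 4:
        return {
            'ID': record['ID'],
            'REMOTE_IP': record['REMOTE_IP'],
            'REQUEST_URL': record['REQUEST_URL'],
            'REQUEST_METHOD': record['REQUEST_METHOD'],
            'STATUS': status,
            'REQUEST_HEADER_USER_AGENT': record['REQUEST_HEADER_USER_AGENT'],
            'TIME_RECEIVED_UTC_ISOFORMAT': record['TIME_RECEIVED_UTC_ISOFORMAT'],
            'TIME_ELAPSED_MS': record['TIME_US'] / 1000,  # microseconds to milliseconds
        }
    return None  # filtered out
</code></pre></div>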
| |
<p>To run this example, you need to include the data sources (<code>storm-sql-kafka</code> in this case) and their dependencies in the
classpath. Dependencies for Storm SQL itself are handled automatically when you run <code>storm sql</code>.
You can include data sources at the submission step like below:</p>
| <div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" |
| </code></pre></div> |
<p>The above command submits the SQL statements to Storm SQL. The usage is <code>storm sql [script file] [topology name]</code>.
You need to modify each artifact's version if you are using a different version of Storm or Kafka.</p>
| |
<p>If your statements pass the validation phase, the topology will be shown on the Storm UI page.</p>
| |
<p>You can see the output on the console:</p>
| <div class="highlight"><pre><code class="language-" data-lang="">$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-error-logs |
| </code></pre></div> |
| <p>and the output will be similar to:</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">{"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222} |
| {"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851} |
| ... |
| </code></pre></div> |
<p>You can also run the Storm SQL runner to see the logical plan by passing <code>--explain</code> in place of the topology name:</p>
| <div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" |
| </code></pre></div> |
| <p>and the output will be similar to:</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8 |
| LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7 |
| LogicalFilter(condition=[>=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6 |
| EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5 |
| </code></pre></div> |
<p>This might not be exactly the same as what you see, since Storm SQL may apply query optimizations.</p>
| |
<p>We're running our first Storm SQL topology! Please kill the topology once you have seen enough output and logs.</p>
| |
<p>For brevity, we'll skip explaining the parts we've already covered.</p>
| |
| <h2 id="example-filtering-slow-logs">Example: filtering slow logs</h2> |
| |
<p>In this example we'll filter slow logs from the entire log stream and store them in another topic. The <code>project</code>, <code>filter</code>, and <code>User Defined Function (UDF)</code> features will be used.
This is very similar to <code>filtering error logs</code>, but here we'll see how to define a <code>User Defined Function (UDF)</code>.</p>
| |
| <p>The content of script file is here:</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apachelogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' |
| CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apacheslowlogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' |
| CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2' |
| INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) >= 100 |
| </code></pre></div> |
| <p>Save this file to <code>apache_log_slow_filtering.sql</code>.</p> |
| |
<p>We can skip the first two statements since they're almost the same as in the last example.</p>
| |
<p>The third statement defines the <code>User Defined Function</code>. We're defining <code>GET_TIME</code>, which uses the <code>org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2</code> class.</p>
| |
| <p>The implementation of GetTime2 is here:</p> |
| <div class="highlight"><pre><code class="language-" data-lang=""><span class="k">package</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">storm</span><span class="p">.</span><span class="n">sql</span><span class="p">.</span><span class="n">runtime</span><span class="p">.</span><span class="n">functions</span><span class="p">.</span><span class="n">scalar</span><span class="p">.</span><span class="n">datetime</span><span class="p">;</span> |
| |
| <span class="n">import</span> <span class="n">org</span><span class="p">.</span><span class="n">joda</span><span class="p">.</span><span class="n">time</span><span class="p">.</span><span class="n">format</span><span class="p">.</span><span class="n">DateTimeFormat</span><span class="p">;</span> |
| <span class="n">import</span> <span class="n">org</span><span class="p">.</span><span class="n">joda</span><span class="p">.</span><span class="n">time</span><span class="p">.</span><span class="n">format</span><span class="p">.</span><span class="n">DateTimeFormatter</span><span class="p">;</span> |
| |
| <span class="k">public</span> <span class="n">class</span> <span class="n">GetTime2</span> <span class="p">{</span> |
| <span class="k">public</span> <span class="n">static</span> <span class="n">Long</span> <span class="n">evaluate</span><span class="p">(</span><span class="k">String</span> <span class="n">dateString</span><span class="p">,</span> <span class="k">String</span> <span class="n">dateFormat</span><span class="p">)</span> <span class="p">{</span> |
| <span class="n">try</span> <span class="p">{</span> |
| <span class="n">DateTimeFormatter</span> <span class="n">df</span> <span class="p">=</span> <span class="n">DateTimeFormat</span><span class="p">.</span><span class="n">forPattern</span><span class="p">(</span><span class="n">dateFormat</span><span class="p">).</span><span class="n">withZoneUTC</span><span class="p">();</span> |
| <span class="n">return</span> <span class="n">df</span><span class="p">.</span><span class="n">parseDateTime</span><span class="p">(</span><span class="n">dateString</span><span class="p">).</span><span class="n">getMillis</span><span class="p">();</span> |
| <span class="p">}</span> <span class="n">catch</span> <span class="p">(</span><span class="n">Exception</span> <span class="n">ex</span><span class="p">)</span> <span class="p">{</span> |
| <span class="n">throw</span> <span class="n">new</span> <span class="n">RuntimeException</span><span class="p">(</span><span class="n">ex</span><span class="p">);</span> |
| <span class="p">}</span> |
| <span class="p">}</span> |
| <span class="p">}</span> |
| </code></pre></div> |
<p>This class can be used as a UDF since it defines a static <code>evaluate</code> method. The SQL types of the parameters and the return value are determined by Calcite, which Storm SQL depends on. </p>
| |
<p>Note that this class must be on the classpath, so in order to define a UDF, you need to create a jar file containing your UDF classes and run <code>storm sql</code> with the <code>--jars</code> option.
For simplicity, this page assumes that GetTime2 is already on the classpath.</p>
| |
<p>The last statement is very similar to filtering error logs. The only new thing is that we call <code>GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ')</code> to convert the string time to a unix timestamp (BIGINT).</p>
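<p>As a quick cross-check of what <code>GET_TIME</code> returns, here is the same conversion in Python (a sketch that assumes the input is always UTC, which holds for <code>TIME_RECEIVED_UTC_ISOFORMAT</code>):</p>
<div class="highlight"><pre><code class="language-" data-lang="">import calendar
from datetime import datetime

def get_time_millis(iso_utc_string):
    # e.g. '2021-07-01T05:02:53+00:00'; the offset is always +00:00 here,
    # so drop it and treat the remainder as UTC
    dt = datetime.strptime(iso_utc_string[:19], '%Y-%m-%dT%H:%M:%S')
    return calendar.timegm(dt.timetuple()) * 1000

print get_time_millis('2021-07-01T05:02:53+00:00')  # 1625115773000
</code></pre></div>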
| |
| <p>Let's execute it.</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">$ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" |
| </code></pre></div> |
<p>You can see the output on the console:</p>
| <div class="highlight"><pre><code class="language-" data-lang="">$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slow-logs |
| </code></pre></div> |
| <p>and the output will be similar to:</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">{"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579} |
| {"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957} |
| ... |
| </code></pre></div> |
<p>That's it! Supposing we had a UDF that looks up the geographic location of a remote IP, we could filter by geo location, or enrich the transformed result with it.</p>
| |
| <h2 id="summary">Summary</h2> |
| |
<p>We walked through several simple use cases of Storm SQL to learn its features. If you haven't read <a href="storm-sql.html">Storm SQL integration</a> and <a href="storm-sql-reference.html">Storm SQL language</a>, you should read them to see the full set of supported features. </p>
| |
<p>Note that Storm SQL runs on Trident, which is micro-batch based and not strongly typed; the sink doesn't actually check the types.
(You may have noticed that the types of some output fields differ from the output table schema.)</p>
| |
<p>This behavior is subject to change when Storm SQL switches its backend API to the core one (tuple-by-tuple, low-level or high-level).</p>
| </div> |
| |
| |
| </div> |
| </div> |
| </div> |
| |
| </body> |
| |
| </html> |
| |