<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link href='images/favicon.ico' rel='shortcut icon' type='image/x-icon'>
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>CarbonData</title>
<style>
</style>
<!-- Bootstrap -->
<link rel="stylesheet" href="css/bootstrap.min.css">
<link href="css/style.css" rel="stylesheet">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
<script src="js/jquery.min.js"></script>
<script src="js/bootstrap.min.js"></script>
<script defer src="https://use.fontawesome.com/releases/v5.0.8/js/all.js"></script>
</head>
<body>
<header>
<nav class="navbar navbar-default navbar-custom cd-navbar-wrapper">
<div class="container">
<div class="navbar-header">
<button aria-controls="navbar" aria-expanded="false" data-target="#navbar" data-toggle="collapse"
class="navbar-toggle collapsed" type="button">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a href="index.html" class="logo">
<img src="images/CarbonDataLogo.png" alt="CarbonData logo" title="CarbonData logo"/>
</a>
</div>
<div class="navbar-collapse collapse cd_navcontnt" id="navbar">
<ul class="nav navbar-nav navbar-right navlist-custom">
<li><a href="index.html" class="hidden-xs"><i class="fa fa-home" aria-hidden="true"></i> </a>
</li>
<li><a href="index.html" class="hidden-lg hidden-md hidden-sm">Home</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle " data-toggle="dropdown" role="button" aria-haspopup="true"
aria-expanded="false"> Download <span class="caret"></span></a>
<ul class="dropdown-menu">
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.2.0/"
target="_blank">Apache CarbonData 2.2.0</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.1.1/"
target="_blank">Apache CarbonData 2.1.1</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.1.0/"
target="_blank">Apache CarbonData 2.1.0</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.0.1/"
target="_blank">Apache CarbonData 2.0.1</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.0.0/"
target="_blank">Apache CarbonData 2.0.0</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.6.1/"
target="_blank">Apache CarbonData 1.6.1</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.6.0/"
target="_blank">Apache CarbonData 1.6.0</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.4/"
target="_blank">Apache CarbonData 1.5.4</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.3/"
target="_blank">Apache CarbonData 1.5.3</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.2/"
target="_blank">Apache CarbonData 1.5.2</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.1/"
target="_blank">Apache CarbonData 1.5.1</a></li>
<li>
<a href="https://cwiki.apache.org/confluence/display/CARBONDATA/Releases"
target="_blank">Release Archive</a></li>
</ul>
</li>
<li><a href="documentation.html" class="active">Documentation</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true"
aria-expanded="false">Community <span class="caret"></span></a>
<ul class="dropdown-menu">
<li>
<a href="https://github.com/apache/carbondata/blob/master/docs/how-to-contribute-to-apache-carbondata.md"
target="_blank">Contributing to CarbonData</a></li>
<li>
<a href="https://github.com/apache/carbondata/blob/master/docs/release-guide.md"
target="_blank">Release Guide</a></li>
<li>
<a href="https://cwiki.apache.org/confluence/display/CARBONDATA/PMC+and+Committers+member+list"
target="_blank">Project PMC and Committers</a></li>
<li>
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66850609"
target="_blank">CarbonData Meetups</a></li>
<li><a href="security.html">Apache CarbonData Security</a></li>
<li><a href="https://issues.apache.org/jira/browse/CARBONDATA" target="_blank">Apache
Jira</a></li>
<li><a href="videogallery.html">CarbonData Videos </a></li>
</ul>
</li>
<li class="dropdown">
<a href="http://www.apache.org/" class="apache_link hidden-xs dropdown-toggle"
data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Apache</a>
<ul class="dropdown-menu">
<li><a href="http://www.apache.org/" target="_blank">Apache Homepage</a></li>
<li><a href="http://www.apache.org/licenses/" target="_blank">License</a></li>
<li><a href="http://www.apache.org/foundation/sponsorship.html"
target="_blank">Sponsorship</a></li>
<li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
</ul>
</li>
<li class="dropdown">
<a href="http://www.apache.org/" class="hidden-lg hidden-md hidden-sm dropdown-toggle"
data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Apache</a>
<ul class="dropdown-menu">
<li><a href="http://www.apache.org/" target="_blank">Apache Homepage</a></li>
<li><a href="http://www.apache.org/licenses/" target="_blank">License</a></li>
<li><a href="http://www.apache.org/foundation/sponsorship.html"
target="_blank">Sponsorship</a></li>
<li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
</ul>
</li>
<li>
<a href="#" id="search-icon"><i class="fa fa-search" aria-hidden="true"></i></a>
</li>
</ul>
</div><!--/.nav-collapse -->
<div id="search-box">
<form method="get" action="http://www.google.com/search" target="_blank">
<div class="search-block">
<table border="0" cellpadding="0" width="100%">
<tr>
<td style="width:80%">
<input type="text" name="q" size="5" maxlength="255" value=""
       class="search-input" placeholder="Search...." required/>
</td>
<td style="width:20%">
<input type="submit" value="Search"/></td>
</tr>
<tr>
<td align="left" style="font-size:75%" colspan="2">
<input type="checkbox" name="sitesearch" value="carbondata.apache.org" checked/>
<span style=" position: relative; top: -3px;"> Only search for CarbonData</span>
</td>
</tr>
</table>
</div>
</form>
</div>
</div>
</nav>
</header> <!-- end Header part -->
<div class="fixed-padding"></div> <!-- top padding with fixed header -->
<section><!-- Dashboard nav -->
<div class="container-fluid q">
<div class="col-sm-12 col-md-12 maindashboard">
<div class="verticalnavbar">
<nav class="b-sticky-nav">
<div class="nav-scroller">
<div class="nav__inner">
<a class="b-nav__intro nav__item" href="./introduction.html">introduction</a>
<a class="b-nav__quickstart nav__item" href="./quick-start-guide.html">quick start</a>
<a class="b-nav__uses nav__item" href="./usecases.html">use cases</a>
<div class="nav__item nav__item__with__subs">
<a class="b-nav__docs nav__item nav__sub__anchor" href="./language-manual.html">Language Reference</a>
<a class="nav__item nav__sub__item" href="./ddl-of-carbondata.html">DDL</a>
<a class="nav__item nav__sub__item" href="./dml-of-carbondata.html">DML</a>
<a class="nav__item nav__sub__item" href="./streaming-guide.html">Streaming</a>
<a class="nav__item nav__sub__item" href="./configuration-parameters.html">Configuration</a>
<a class="nav__item nav__sub__item" href="./index-developer-guide.html">Indexes</a>
<a class="nav__item nav__sub__item" href="./supported-data-types-in-carbondata.html">Data Types</a>
</div>
<div class="nav__item nav__item__with__subs">
<a class="b-nav__datamap nav__item nav__sub__anchor" href="./index-management.html">Index Management</a>
<a class="nav__item nav__sub__item" href="./bloomfilter-index-guide.html">Bloom Filter</a>
<a class="nav__item nav__sub__item" href="./lucene-index-guide.html">Lucene</a>
<a class="nav__item nav__sub__item" href="./secondary-index-guide.html">Secondary Index</a>
<a class="nav__item nav__sub__item" href="./spatial-index-guide.html">Spatial Index</a>
<a class="nav__item nav__sub__item" href="./mv-guide.html">MV</a>
</div>
<div class="nav__item nav__item__with__subs">
<a class="b-nav__api nav__item nav__sub__anchor" href="./sdk-guide.html">API</a>
<a class="nav__item nav__sub__item" href="./sdk-guide.html">Java SDK</a>
<a class="nav__item nav__sub__item" href="./csdk-guide.html">C++ SDK</a>
</div>
<a class="b-nav__perf nav__item" href="./performance-tuning.html">Performance Tuning</a>
<a class="b-nav__s3 nav__item" href="./s3-guide.html">S3 Storage</a>
<a class="b-nav__indexserver nav__item" href="./index-server.html">Index Server</a>
<a class="b-nav__prestodb nav__item" href="./prestodb-guide.html">PrestoDB Integration</a>
<a class="b-nav__prestosql nav__item" href="./prestosql-guide.html">PrestoSQL Integration</a>
<a class="b-nav__flink nav__item" href="./flink-integration-guide.html">Flink Integration</a>
<a class="b-nav__scd nav__item" href="./scd-and-cdc-guide.html">SCD & CDC</a>
<a class="b-nav__faq nav__item" href="./faq.html">FAQ</a>
<a class="b-nav__contri nav__item" href="./how-to-contribute-to-apache-carbondata.html">Contribute</a>
<a class="b-nav__security nav__item" href="./security.html">Security</a>
<a class="b-nav__release nav__item" href="./release-guide.html">Release Guide</a>
</div>
</div>
<div class="navindicator">
<div class="b-nav__intro navindicator__item"></div>
<div class="b-nav__quickstart navindicator__item"></div>
<div class="b-nav__uses navindicator__item"></div>
<div class="b-nav__docs navindicator__item"></div>
<div class="b-nav__datamap navindicator__item"></div>
<div class="b-nav__api navindicator__item"></div>
<div class="b-nav__perf navindicator__item"></div>
<div class="b-nav__s3 navindicator__item"></div>
<div class="b-nav__indexserver navindicator__item"></div>
<div class="b-nav__prestodb navindicator__item"></div>
<div class="b-nav__prestosql navindicator__item"></div>
<div class="b-nav__flink navindicator__item"></div>
<div class="b-nav__scd navindicator__item"></div>
<div class="b-nav__faq navindicator__item"></div>
<div class="b-nav__contri navindicator__item"></div>
<div class="b-nav__security navindicator__item"></div>
</div>
</nav>
</div>
<div class="mdcontent">
<section>
<div style="padding:10px 15px;">
<div id="viewpage" name="viewpage">
<div class="row">
<div class="col-sm-12 col-md-12">
<div>
<h1>
<a id="carbondata-streaming-ingestion" class="anchor" href="#carbondata-streaming-ingestion" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>CarbonData Streaming Ingestion</h1>
<ul>
<li>
<a href="#quick-example">Streaming Table Management</a>
<ul>
<li><a href="#create-table-with-streaming-property">Create table with streaming property</a></li>
<li><a href="#alter-streaming-property">Alter streaming property</a></li>
<li><a href="#acquire-streaming-lock">Acquire streaming lock</a></li>
<li><a href="#create-streaming-segment">Create streaming segment</a></li>
<li><a href="#change-segment-status">Change Stream segment status</a></li>
<li><a href="#handoff-streaming-finish-segment-to-columnar-segment">Handoff "streaming finish" segment to columnar segment</a></li>
<li><a href="#auto-handoff-streaming-segment">Auto handoff streaming segment</a></li>
<li><a href="#stream-data-parser">Stream data parser</a></li>
<li><a href="#close-streaming-table">Close streaming table</a></li>
<li><a href="#constraint">Constraints</a></li>
</ul>
</li>
<li>
<a href="#streamsql">StreamSQL</a>
<ul>
<li><a href="#streaming-table">Defining Streaming Table</a></li>
<li>
<a href="#streaming-job-management">Streaming Job Management</a>
<ul>
<li><a href="#create-stream">CREATE STREAM</a></li>
<li><a href="#drop-stream">DROP STREAM</a></li>
<li><a href="#show-streams">SHOW STREAMS</a></li>
<li><a href="#close-stream">CLOSE STREAM</a></li>
</ul>
</li>
</ul>
</li>
</ul>
<h2>
<a id="quick-example" class="anchor" href="#quick-example" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Quick example</h2>
<p>Download and unzip spark-2.4.5-bin-hadoop2.7.tgz, then export $SPARK_HOME.</p>
<p>Package the CarbonData jar, then copy assembly/target/scala-2.11/carbondata_2.11-2.0.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars:</p>
<div class="highlight highlight-source-shell"><pre>mvn clean package -DskipTests -Pspark-2.4</pre></div>
<p>Start a socket data server in a terminal</p>
<div class="highlight highlight-source-shell"><pre>nc -lk 9099</pre></div>
<p>Type some CSV rows like the following:</p>
<pre lang="csv"><code>1,col1
2,col2
3,col3
4,col4
5,col5
</code></pre>
<p>Start spark-shell in a new terminal, type <code>:paste</code>, then copy and run the following code.</p>
<div class="highlight highlight-source-scala"><pre> <span class="pl-k">import</span> <span class="pl-en">java</span>.<span class="pl-en">io</span>.<span class="pl-en">File</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">spark</span>.<span class="pl-en">sql</span>.{<span class="pl-en">CarbonEnv</span>, <span class="pl-en">SparkSession</span>}
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">spark</span>.<span class="pl-en">sql</span>.<span class="pl-en">CarbonSession</span>.<span class="pl-en">_</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">spark</span>.<span class="pl-en">sql</span>.<span class="pl-en">streaming</span>.{<span class="pl-en">ProcessingTime</span>, <span class="pl-en">StreamingQuery</span>}
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbondata</span>.<span class="pl-en">core</span>.<span class="pl-en">util</span>.<span class="pl-en">path</span>.<span class="pl-en">CarbonTablePath</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbondata</span>.<span class="pl-en">streaming</span>.<span class="pl-en">parser</span>.<span class="pl-en">CarbonStreamParser</span>
<span class="pl-k">val</span> <span class="pl-smi">warehouse</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">File</span>(<span class="pl-s"><span class="pl-pds">"</span>./warehouse<span class="pl-pds">"</span></span>).getCanonicalPath
<span class="pl-k">val</span> <span class="pl-smi">metastore</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">File</span>(<span class="pl-s"><span class="pl-pds">"</span>./metastore<span class="pl-pds">"</span></span>).getCanonicalPath
<span class="pl-k">val</span> <span class="pl-smi">spark</span> <span class="pl-k">=</span> <span class="pl-en">SparkSession</span>
.builder()
.master(<span class="pl-s"><span class="pl-pds">"</span>local<span class="pl-pds">"</span></span>)
.appName(<span class="pl-s"><span class="pl-pds">"</span>StreamExample<span class="pl-pds">"</span></span>)
.config(<span class="pl-s"><span class="pl-pds">"</span>spark.sql.warehouse.dir<span class="pl-pds">"</span></span>, warehouse)
.getOrCreateCarbonSession(warehouse, metastore)
spark.sparkContext.setLogLevel(<span class="pl-s"><span class="pl-pds">"</span>ERROR<span class="pl-pds">"</span></span>)
<span class="pl-c"><span class="pl-c">//</span> drop table if exists previously</span>
spark.sql(<span class="pl-k">s</span><span class="pl-s">"</span><span class="pl-s">DROP TABLE IF EXISTS carbon_table</span><span class="pl-s">"</span>)
<span class="pl-c"><span class="pl-c">//</span> Create target carbon table and populate with initial data</span>
spark.sql(
<span class="pl-k">s</span><span class="pl-s">"""</span>
<span class="pl-s"> | CREATE TABLE carbon_table (</span>
<span class="pl-s"> | col1 INT,</span>
<span class="pl-s"> | col2 STRING</span>
<span class="pl-s"> | )</span>
<span class="pl-s"> | STORED AS carbondata</span>
<span class="pl-s"> | TBLPROPERTIES('streaming'='true')</span><span class="pl-s">"""</span>.stripMargin)
<span class="pl-k">val</span> <span class="pl-smi">carbonTable</span> <span class="pl-k">=</span> <span class="pl-en">CarbonEnv</span>.getCarbonTable(<span class="pl-en">Some</span>(<span class="pl-s"><span class="pl-pds">"</span>default<span class="pl-pds">"</span></span>), <span class="pl-s"><span class="pl-pds">"</span>carbon_table<span class="pl-pds">"</span></span>)(spark)
<span class="pl-k">val</span> <span class="pl-smi">tablePath</span> <span class="pl-k">=</span> carbonTable.getTablePath
<span class="pl-c"><span class="pl-c">//</span> batch load</span>
<span class="pl-k">var</span> <span class="pl-smi">qry</span><span class="pl-k">:</span> <span class="pl-en">StreamingQuery</span> <span class="pl-k">=</span> <span class="pl-c1">null</span>
<span class="pl-k">val</span> <span class="pl-smi">readSocketDF</span> <span class="pl-k">=</span> spark.readStream
.format(<span class="pl-s"><span class="pl-pds">"</span>socket<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>host<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>localhost<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>port<span class="pl-pds">"</span></span>, <span class="pl-c1">9099</span>)
.load()
<span class="pl-c"><span class="pl-c">//</span> Write data from socket stream to carbondata file</span>
qry <span class="pl-k">=</span> readSocketDF.writeStream
.format(<span class="pl-s"><span class="pl-pds">"</span>carbondata<span class="pl-pds">"</span></span>)
.trigger(<span class="pl-en">ProcessingTime</span>(<span class="pl-s"><span class="pl-pds">"</span>5 seconds<span class="pl-pds">"</span></span>))
.option(<span class="pl-s"><span class="pl-pds">"</span>checkpointLocation<span class="pl-pds">"</span></span>, <span class="pl-en">CarbonTablePath</span>.getStreamingCheckpointDir(tablePath))
.option(<span class="pl-s"><span class="pl-pds">"</span>dbName<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>default<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>tableName<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>carbon_table<span class="pl-pds">"</span></span>)
.option(<span class="pl-en">CarbonStreamParser</span>.<span class="pl-en">CARBON_STREAM_PARSER</span>,
<span class="pl-en">CarbonStreamParser</span>.<span class="pl-en">CARBON_STREAM_PARSER_CSV</span>)
.start()
<span class="pl-c"><span class="pl-c">//</span> start new thread to show data</span>
<span class="pl-k">new</span> <span class="pl-en">Thread</span>() {
<span class="pl-k">override</span> <span class="pl-k">def</span> <span class="pl-en">run</span>()<span class="pl-k">:</span> <span class="pl-en">Unit</span> <span class="pl-k">=</span> {
<span class="pl-k">do</span> {
spark.sql(<span class="pl-s"><span class="pl-pds">"</span>select * from carbon_table<span class="pl-pds">"</span></span>).show(<span class="pl-c1">false</span>)
<span class="pl-en">Thread</span>.sleep(<span class="pl-c1">10000</span>)
} <span class="pl-k">while</span> (<span class="pl-c1">true</span>)
}
}.start()
qry.awaitTermination()</pre></div>
<p>Continue typing rows into the data server, and spark-shell will show the new data in the table.</p>
<h2>
<a id="create-table-with-streaming-property" class="anchor" href="#create-table-with-streaming-property" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Create table with streaming property</h2>
<p>A streaming table is just a normal carbon table with the "streaming" table property set; users can create
a streaming table using the following DDL.</p>
<div class="highlight highlight-source-sql"><pre><span class="pl-k">CREATE</span> <span class="pl-k">TABLE</span> <span class="pl-en">streaming_table</span> (
col1 <span class="pl-k">INT</span>,
col2 STRING
)
STORED <span class="pl-k">AS</span> carbondata
TBLPROPERTIES(<span class="pl-s"><span class="pl-pds">'</span>streaming<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>true<span class="pl-pds">'</span></span>)</pre></div>
<table>
<thead>
<tr>
<th>property name</th>
<th>default</th>
<th>description</th>
</tr>
</thead>
<tbody>
<tr>
<td>streaming</td>
<td>false</td>
<td>Whether to enable streaming ingest feature for this table <br> Value range: true, false</td>
</tr>
</tbody>
</table>
<p>The "DESC FORMATTED" command will show the streaming property.</p>
<div class="highlight highlight-source-sql"><pre><span class="pl-k">DESC</span> FORMATTED streaming_table</pre></div>
<h2>
<a id="alter-streaming-property" class="anchor" href="#alter-streaming-property" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Alter streaming property</h2>
<p>For an existing table, use the ALTER TABLE command to set the streaming property.</p>
<div class="highlight highlight-source-sql"><pre><span class="pl-k">ALTER</span> <span class="pl-k">TABLE</span> streaming_table <span class="pl-k">SET</span> TBLPROPERTIES(<span class="pl-s"><span class="pl-pds">'</span>streaming<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>true<span class="pl-pds">'</span></span>)</pre></div>
<h2>
<a id="acquire-streaming-lock" class="anchor" href="#acquire-streaming-lock" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Acquire streaming lock</h2>
<p>At the beginning of streaming ingestion, the system will try to acquire the table-level lock on the streaming.lock file. If it cannot acquire the lock for this table, it will throw an InterruptedException.</p>
<h2>
<a id="create-streaming-segment" class="anchor" href="#create-streaming-segment" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Create streaming segment</h2>
<p>Streaming data is ingested into a separate segment of the carbondata table; this segment is termed the streaming segment. The status of this segment is recorded as "streaming" in the "tablestatus" file along with its data size. You can use "SHOW SEGMENTS FOR TABLE tableName" to check segment status.</p>
<p>After the streaming segment reaches the maximum size, CarbonData changes the segment status from "streaming" to "streaming finish", and creates a new "streaming" segment to continue ingesting streaming data.</p>
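<p>For example, from the spark-shell session in the quick example above, the segment status can be checked as follows (a sketch; it assumes the <code>carbon_table</code> created earlier):</p>

```scala
// List all segments of the table; the "Status" column shows values such as
// "streaming", "streaming finish", or "Success" (columnar)
spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show(false)
```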
<table>
<thead>
<tr>
<th>option</th>
<th>default</th>
<th>description</th>
</tr>
</thead>
<tbody>
<tr>
<td>carbon.streaming.segment.max.size</td>
<td>1024000000</td>
<td>Unit: byte <br>Max size of a streaming segment</td>
</tr>
</tbody>
</table>
<table>
<thead>
<tr>
<th>segment status</th>
<th>description</th>
</tr>
</thead>
<tbody>
<tr>
<td>streaming</td>
<td>The segment is running streaming ingestion</td>
</tr>
<tr>
<td>streaming finish</td>
<td>The segment has finished streaming ingestion <br> and will be handed off to a segment in the columnar format</td>
</tr>
</tbody>
</table>
<h2>
<a id="change-segment-status" class="anchor" href="#change-segment-status" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Change segment status</h2>
<p>Use the command below to change the status of a "streaming" segment to "streaming finish". If the streaming application is running, this command will block.</p>
<div class="highlight highlight-source-sql"><pre><span class="pl-k">ALTER</span> <span class="pl-k">TABLE</span> streaming_table FINISH STREAMING</pre></div>
<h2>
<a id="handoff-streaming-finish-segment-to-columnar-segment" class="anchor" href="#handoff-streaming-finish-segment-to-columnar-segment" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Handoff "streaming finish" segment to columnar segment</h2>
<p>Use the command below to manually hand off a "streaming finish" segment to a columnar format segment.</p>
<div class="highlight highlight-source-sql"><pre><span class="pl-k">ALTER</span> <span class="pl-k">TABLE</span> streaming_table COMPACT <span class="pl-s"><span class="pl-pds">'</span>streaming<span class="pl-pds">'</span></span>
</pre></div>
<h2>
<a id="auto-handoff-streaming-segment" class="anchor" href="#auto-handoff-streaming-segment" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Auto handoff streaming segment</h2>
<p>Configure the property "carbon.streaming.auto.handoff.enabled" to hand off streaming segments automatically. If this property is true, then after a streaming segment reaches the maximum size, CarbonData changes the segment to "streaming finish" status and triggers an automatic handoff of this segment to a columnar format segment in a new thread.</p>
<table>
<thead>
<tr>
<th>property name</th>
<th>default</th>
<th>description</th>
</tr>
</thead>
<tbody>
<tr>
<td>carbon.streaming.auto.handoff.enabled</td>
<td>true</td>
<td>Whether to automatically trigger the handoff operation</td>
</tr>
</tbody>
</table>
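<p>Both handoff-related properties can also be set programmatically before starting ingestion. The sketch below uses the <code>CarbonProperties</code> API with the default values from the tables above:</p>

```scala
import org.apache.carbondata.core.util.CarbonProperties

// Roll streaming segments over at ~1 GB and hand them off
// to columnar format automatically in a background thread
CarbonProperties.getInstance()
  .addProperty("carbon.streaming.segment.max.size", "1024000000")
  .addProperty("carbon.streaming.auto.handoff.enabled", "true")
```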
<h2>
<a id="stream-data-parser" class="anchor" href="#stream-data-parser" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Stream data parser</h2>
<p>Configure the property "carbon.stream.parser" to define a stream parser that converts InternalRow to Object[] when writing stream data.</p>
<table>
<thead>
<tr>
<th>property name</th>
<th>default</th>
<th>description</th>
</tr>
</thead>
<tbody>
<tr>
<td>carbon.stream.parser</td>
<td>org.apache.carbondata.streaming.parser.RowStreamParserImp</td>
<td>the class of the stream parser</td>
</tr>
</tbody>
</table>
<p>Currently CarbonData supports two parsers:</p>
<p><strong>1. org.apache.carbondata.streaming.parser.CSVStreamParserImp</strong>: This parser gets one line of data (String type) from the first index of InternalRow and converts the String to Object[].</p>
<p><strong>2. org.apache.carbondata.streaming.parser.RowStreamParserImp</strong>: This is the default stream parser; it automatically converts InternalRow to Object[] according to the schema of the <code>DataSet</code>, for example:</p>
<div class="highlight highlight-source-scala"><pre> <span class="pl-k">case</span> <span class="pl-k">class</span> <span class="pl-en">FileElement</span>(<span class="pl-v">school</span>: <span class="pl-en">Array</span>[<span class="pl-en">String</span>], <span class="pl-v">age</span>: <span class="pl-en">Int</span>)
<span class="pl-k">case</span> <span class="pl-k">class</span> <span class="pl-en">StreamData</span>(<span class="pl-v">id</span>: <span class="pl-en">Int</span>, <span class="pl-v">name</span>: <span class="pl-en">String</span>, <span class="pl-v">city</span>: <span class="pl-en">String</span>, <span class="pl-v">salary</span>: <span class="pl-en">Float</span>, <span class="pl-v">file</span>: <span class="pl-en">FileElement</span>)
...
<span class="pl-k">var</span> <span class="pl-smi">qry</span><span class="pl-k">:</span> <span class="pl-en">StreamingQuery</span> <span class="pl-k">=</span> <span class="pl-c1">null</span>
<span class="pl-k">val</span> <span class="pl-smi">readSocketDF</span> <span class="pl-k">=</span> spark.readStream
.format(<span class="pl-s"><span class="pl-pds">"</span>socket<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>host<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>localhost<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>port<span class="pl-pds">"</span></span>, <span class="pl-c1">9099</span>)
.load()
.as[<span class="pl-en">String</span>]
.map(_.split(<span class="pl-s"><span class="pl-pds">"</span>,<span class="pl-pds">"</span></span>))
.map { fields <span class="pl-k">=&gt;</span> {
<span class="pl-k">val</span> <span class="pl-smi">tmp</span> <span class="pl-k">=</span> fields(<span class="pl-c1">4</span>).split(<span class="pl-s"><span class="pl-pds">"</span><span class="pl-cce">\\</span>$<span class="pl-pds">"</span></span>)
<span class="pl-k">val</span> <span class="pl-smi">file</span> <span class="pl-k">=</span> <span class="pl-en">FileElement</span>(tmp(<span class="pl-c1">0</span>).split(<span class="pl-s"><span class="pl-pds">"</span>:<span class="pl-pds">"</span></span>), tmp(<span class="pl-c1">1</span>).toInt)
<span class="pl-en">StreamData</span>(fields(<span class="pl-c1">0</span>).toInt, fields(<span class="pl-c1">1</span>), fields(<span class="pl-c1">2</span>), fields(<span class="pl-c1">3</span>).toFloat, file)
} }
<span class="pl-c"><span class="pl-c">//</span> Write data from socket stream to carbondata file</span>
qry <span class="pl-k">=</span> readSocketDF.writeStream
.format(<span class="pl-s"><span class="pl-pds">"</span>carbondata<span class="pl-pds">"</span></span>)
.trigger(<span class="pl-en">ProcessingTime</span>(<span class="pl-s"><span class="pl-pds">"</span>5 seconds<span class="pl-pds">"</span></span>))
.option(<span class="pl-s"><span class="pl-pds">"</span>checkpointLocation<span class="pl-pds">"</span></span>, tablePath.getStreamingCheckpointDir)
.option(<span class="pl-s"><span class="pl-pds">"</span>dbName<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>default<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>tableName<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>carbon_table<span class="pl-pds">"</span></span>)
.start()
...</pre></div>
<h3>
<a id="how-to-implement-a-customized-stream-parser" class="anchor" href="#how-to-implement-a-customized-stream-parser" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>How to implement a customized stream parser</h3>
<p>To implement a customized stream parser that converts a specific InternalRow to Object[], implement the <code>initialize</code> and <code>parserRow</code> methods of the <code>CarbonStreamParser</code> interface, for example:</p>
<div class="highlight highlight-source-scala"><pre> <span class="pl-k">package</span> <span class="pl-en">org</span>.<span class="pl-en">XXX</span>.<span class="pl-en">XXX</span>.<span class="pl-en">streaming</span>.<span class="pl-en">parser</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">hadoop</span>.<span class="pl-en">conf</span>.<span class="pl-en">Configuration</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">spark</span>.<span class="pl-en">sql</span>.<span class="pl-en">catalyst</span>.<span class="pl-en">InternalRow</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">spark</span>.<span class="pl-en">sql</span>.<span class="pl-en">types</span>.<span class="pl-en">StructType</span>
<span class="pl-k">class</span> <span class="pl-en">XXXStreamParserImp</span> <span class="pl-k">extends</span> <span class="pl-e">CarbonStreamParser</span> {
<span class="pl-k">override</span> <span class="pl-k">def</span> <span class="pl-en">initialize</span>(<span class="pl-v">configuration</span>: <span class="pl-en">Configuration</span>, <span class="pl-v">structType</span>: <span class="pl-en">StructType</span>)<span class="pl-k">:</span> <span class="pl-en">Unit</span> <span class="pl-k">=</span> {
<span class="pl-c"><span class="pl-c">//</span> user can get the properties from "configuration"</span>
}
<span class="pl-k">override</span> <span class="pl-k">def</span> <span class="pl-en">parserRow</span>(<span class="pl-v">value</span>: <span class="pl-en">InternalRow</span>)<span class="pl-k">:</span> <span class="pl-en">Array</span>[<span class="pl-en">Object</span>] <span class="pl-k">=</span> {
<span class="pl-c"><span class="pl-c">//</span> convert InternalRow to Object[](Array[Object] in Scala) </span>
}
<span class="pl-k">override</span> <span class="pl-k">def</span> <span class="pl-en">close</span>()<span class="pl-k">:</span> <span class="pl-en">Unit</span> <span class="pl-k">=</span> {
}
}
</pre></div>
<p>Then set the property "carbon.stream.parser" to "org.XXX.XXX.streaming.parser.XXXStreamParserImp".</p>
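<p>As a sketch (assuming a <code>streamingDataFrame</code> already defined as in the earlier <code>writeStream</code> example), the custom parser can be wired in through an option on the streaming sink; the class name below is the hypothetical example parser from above, not a shipped class:</p>

```scala
// Sketch: register the customized stream parser on the streaming sink.
// "carbon.stream.parser" is the property named above; the parser class
// is the hypothetical example implementation from this section.
val query = streamingDataFrame.writeStream
  .format("carbondata")
  .option("carbon.stream.parser",
    "org.XXX.XXX.streaming.parser.XXXStreamParserImp")
  .option("tableName", "carbon_table")
  .start()
```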
<h2>
<a id="close-streaming-table" class="anchor" href="#close-streaming-table" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Close streaming table</h2>
<p>Use the command below to hand off all streaming segments to columnar format segments and set the streaming property to false; the table then becomes a normal table.</p>
<div class="highlight highlight-source-sql"><pre><span class="pl-k">ALTER</span> <span class="pl-k">TABLE</span> streaming_table COMPACT <span class="pl-s"><span class="pl-pds">'</span>close_streaming<span class="pl-pds">'</span></span>
</pre></div>
<h2>
<a id="constraint" class="anchor" href="#constraint" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Constraint</h2>
<ol>
<li>Setting the streaming property from true to false is rejected.</li>
<li>UPDATE/DELETE commands on a streaming table are rejected.</li>
<li>Creating an MV on a streaming table is rejected.</li>
<li>Adding the streaming property to a table that has an MV is rejected.</li>
<li>If the table has dictionary columns, concurrent data loading is not supported.</li>
<li>Deleting a "streaming" segment is blocked while streaming ingestion is running.</li>
<li>Dropping the streaming table is blocked while streaming ingestion is running.</li>
</ol>
<h2>
<a id="streamsql" class="anchor" href="#streamsql" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>StreamSQL</h2>
<h3>
<a id="streaming-table" class="anchor" href="#streaming-table" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Streaming Table</h3>
<p><strong>Example</strong></p>
<p>The following example shows how to start a streaming ingest job:</p>
<pre><code> sql(
s"""
|CREATE TABLE source(
| id INT,
| name STRING,
| city STRING,
| salary FLOAT,
| tax DECIMAL(8,2),
| percent double,
| birthday DATE,
| register TIMESTAMP,
| updated TIMESTAMP
|)
|STORED AS carbondata
|TBLPROPERTIES (
| 'streaming'='source',
| 'format'='csv',
| 'path'='$csvDataDir'
|)
""".stripMargin)
sql(
s"""
|CREATE TABLE sink(
| id INT,
| name STRING,
| city STRING,
| salary FLOAT,
| tax DECIMAL(8,2),
| percent double,
| birthday DATE,
| register TIMESTAMP,
| updated TIMESTAMP
|)
|STORED AS carbondata
|TBLPROPERTIES (
| 'streaming'='true'
|)
""".stripMargin)
sql(
"""
|CREATE STREAM job123 ON TABLE sink
|STMPROPERTIES(
| 'trigger'='ProcessingTime',
| 'interval'='1 seconds')
|AS
| SELECT *
| FROM source
| WHERE id % 2 = 1
""".stripMargin)
sql("DROP STREAM job123")
sql("SHOW STREAMS [ON TABLE tableName]")
</code></pre>
<p>In the above example, two tables are created: source and sink. The <code>source</code> table's format is <code>csv</code> and the <code>sink</code> table's format is <code>carbon</code>. Then a streaming job is created to stream data from the source table to the sink table.</p>
<p>These two tables are normal carbon tables and can be queried independently.</p>
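<p>For instance, each side of the stream can be queried with ordinary SQL while the job runs (a sketch using the columns defined above):</p>

```sql
-- Query the source and sink tables independently of the streaming job
SELECT count(*) FROM source;
SELECT city, avg(salary) FROM sink GROUP BY city;
```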
<h3>
<a id="streaming-job-management" class="anchor" href="#streaming-job-management" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Streaming Job Management</h3>
<p>As the above example shows:</p>
<ul>
<li>
<code>CREATE STREAM jobName ON TABLE tableName</code> is used to start a streaming ingest job.</li>
<li>
<code>DROP STREAM jobName</code> is used to stop a streaming job by its name.</li>
<li>
<code>SHOW STREAMS [ON TABLE tableName]</code> is used to print streaming job information.</li>
</ul>
<h5>
<a id="create-stream" class="anchor" href="#create-stream" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>CREATE STREAM</h5>
<p>When this command is issued, carbon will start a Structured Streaming job to perform the streaming ingestion. Before launching the job, the system will validate:</p>
<ul>
<li>
<p>The format of the table specified in the CTAS FROM clause must be one of: csv, json, text, parquet, kafka, socket. These are the formats supported by Spark 2.2.0 Structured Streaming.</p>
</li>
<li>
<p>The user should pass the options of the streaming source table in its TBLPROPERTIES when creating it. StreamSQL will pass them transparently to Spark when creating the streaming job. For example:</p>
<div class="highlight highlight-source-sql"><pre><span class="pl-k">CREATE</span> <span class="pl-k">TABLE</span> <span class="pl-en">source</span>(
name STRING,
age <span class="pl-k">INT</span>
)
STORED <span class="pl-k">AS</span> carbondata
TBLPROPERTIES(
<span class="pl-s"><span class="pl-pds">'</span>streaming<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>source<span class="pl-pds">'</span></span>,
<span class="pl-s"><span class="pl-pds">'</span>format<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>socket<span class="pl-pds">'</span></span>,
<span class="pl-s"><span class="pl-pds">'</span>host<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>localhost<span class="pl-pds">'</span></span>,
<span class="pl-s"><span class="pl-pds">'</span>port<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>8888<span class="pl-pds">'</span></span>,
<span class="pl-s"><span class="pl-pds">'</span>record_format<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>csv<span class="pl-pds">'</span></span>, <span class="pl-c">-- can be csv or json, default is csv</span>
<span class="pl-s"><span class="pl-pds">'</span>delimiter<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>|<span class="pl-pds">'</span></span>
)</pre></div>
<p>will translate to</p>
<div class="highlight highlight-source-scala"><pre>spark.readStream
.schema(tableSchema)
.format(<span class="pl-s"><span class="pl-pds">"</span>socket<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>host<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>localhost<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>port<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>8888<span class="pl-pds">"</span></span>)
.option(<span class="pl-s"><span class="pl-pds">"</span>delimiter<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>|<span class="pl-pds">"</span></span>)</pre></div>
</li>
<li>
<p>The sink table should have a TBLPROPERTY <code>'streaming'</code> equal to <code>true</code>, indicating it is a streaming table.</p>
</li>
<li>
<p>In the given STMPROPERTIES, the user must specify <code>'trigger'</code>; its value must be <code>ProcessingTime</code> (other values will be supported in the future). The user should also specify an interval value for the streaming job.</p>
</li>
<li>
<p>If the schema specified in the sink table differs from that of the CTAS query, the streaming job will fail.</p>
</li>
</ul>
<p>For the Kafka data source, create the source table as follows:</p>
<div class="highlight highlight-source-sql"><pre><span class="pl-k">CREATE</span> <span class="pl-k">TABLE</span> <span class="pl-en">source</span>(
name STRING,
age <span class="pl-k">INT</span>
)
STORED <span class="pl-k">AS</span> carbondata
TBLPROPERTIES(
<span class="pl-s"><span class="pl-pds">'</span>streaming<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>source<span class="pl-pds">'</span></span>,
<span class="pl-s"><span class="pl-pds">'</span>format<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>kafka<span class="pl-pds">'</span></span>,
<span class="pl-s"><span class="pl-pds">'</span>kafka.bootstrap.servers<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>kafkaserver:9092<span class="pl-pds">'</span></span>,
<span class="pl-s"><span class="pl-pds">'</span>subscribe<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>test<span class="pl-pds">'</span></span>,
<span class="pl-s"><span class="pl-pds">'</span>record_format<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>csv<span class="pl-pds">'</span></span>, <span class="pl-c">-- can be csv or json, default is csv</span>
<span class="pl-s"><span class="pl-pds">'</span>delimiter<span class="pl-pds">'</span></span><span class="pl-k">=</span><span class="pl-s"><span class="pl-pds">'</span>|<span class="pl-pds">'</span></span>
)</pre></div>
<ul>
<li>Then CREATE STREAM can be used to start the streaming ingest job from the source table to the sink table.</li>
</ul>
<pre><code>CREATE STREAM job123 ON TABLE sink
STMPROPERTIES(
'trigger'='ProcessingTime',
'interval'='10 seconds'
)
AS
SELECT *
FROM source
WHERE id % 2 = 1
</code></pre>
<h5>
<a id="drop-stream" class="anchor" href="#drop-stream" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>DROP STREAM</h5>
<p>When <code>DROP STREAM</code> is issued, the streaming job will be stopped immediately. It will fail if the specified jobName does not exist.</p>
<pre><code>DROP STREAM job123
</code></pre>
<h5>
<a id="show-streams" class="anchor" href="#show-streams" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>SHOW STREAMS</h5>
<p>The <code>SHOW STREAMS ON TABLE tableName</code> command will print the streaming job information as follows:</p>
<table>
<thead>
<tr>
<th>Job name</th>
<th>Status</th>
<th>Source</th>
<th>Sink</th>
<th>Start time</th>
<th>Time elapsed</th>
</tr>
</thead>
<tbody>
<tr>
<td>job123</td>
<td>Started</td>
<td>device</td>
<td>fact</td>
<td>2018-02-03 14:32:42</td>
<td>10d2h32m</td>
</tr>
</tbody>
</table>
<p>The <code>SHOW STREAMS</code> command will show all stream jobs in the system.</p>
<h5>
<a id="alter-table-close-stream" class="anchor" href="#alter-table-close-stream" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>ALTER TABLE CLOSE STREAM</h5>
<p>When the streaming application is stopped and the user wants to manually trigger the conversion of carbon streaming files to columnar files, use
<code>ALTER TABLE sink COMPACT 'CLOSE_STREAMING';</code></p>
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__docs').addClass('selected');
// Display docs subnav items
if (!$('.b-nav__docs').parent().hasClass('nav__item__with__subs--expanded')) {
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
}
});
</script></div>
</div>
</div>
</div>
<div class="doc-footer">
<a href="#top" class="scroll-top">Top</a>
</div>
</div>
</section>
</div>
</div>
</div>
</section><!-- End systemblock part -->
<script src="js/custom.js"></script>
</body>
</html>