blob: c02b679590db6ddbaf3e9d778167647e8bae7143 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link href='images/favicon.ico' rel='shortcut icon' type='image/x-icon'>
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>CarbonData</title>
<style>
</style>
<!-- Bootstrap -->
<link rel="stylesheet" href="css/bootstrap.min.css">
<link href="css/style.css" rel="stylesheet">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
<script src="js/jquery.min.js"></script>
<script src="js/bootstrap.min.js"></script>
<script defer src="https://use.fontawesome.com/releases/v5.0.8/js/all.js"></script>
</head>
<body>
<header>
<nav class="navbar navbar-default navbar-custom cd-navbar-wrapper">
<div class="container">
<div class="navbar-header">
<button aria-controls="navbar" aria-expanded="false" data-target="#navbar" data-toggle="collapse"
class="navbar-toggle collapsed" type="button">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a href="index.html" class="logo">
<img src="images/CarbonDataLogo.png" alt="CarbonData logo" title="CarbonData logo"/>
</a>
</div>
<div class="navbar-collapse collapse cd_navcontnt" id="navbar">
<ul class="nav navbar-nav navbar-right navlist-custom">
<li><a href="index.html" class="hidden-xs"><i class="fa fa-home" aria-hidden="true"></i> </a>
</li>
<li><a href="index.html" class="hidden-lg hidden-md hidden-sm">Home</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle " data-toggle="dropdown" role="button" aria-haspopup="true"
aria-expanded="false"> Download <span class="caret"></span></a>
<ul class="dropdown-menu">
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.2.0/"
target="_blank">Apache CarbonData 2.2.0</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.1.1/"
target="_blank">Apache CarbonData 2.1.1</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.1.0/"
target="_blank">Apache CarbonData 2.1.0</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.0.1/"
target="_blank">Apache CarbonData 2.0.1</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/2.0.0/"
target="_blank">Apache CarbonData 2.0.0</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.6.1/"
target="_blank">Apache CarbonData 1.6.1</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.6.0/"
target="_blank">Apache CarbonData 1.6.0</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.4/"
target="_blank">Apache CarbonData 1.5.4</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.3/"
target="_blank">Apache CarbonData 1.5.3</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.2/"
target="_blank">Apache CarbonData 1.5.2</a></li>
<li>
<a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.1/"
target="_blank">Apache CarbonData 1.5.1</a></li>
<li>
<a href="https://cwiki.apache.org/confluence/display/CARBONDATA/Releases"
target="_blank">Release Archive</a></li>
</ul>
</li>
<li><a href="documentation.html" class="active">Documentation</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true"
aria-expanded="false">Community <span class="caret"></span></a>
<ul class="dropdown-menu">
<li>
<a href="https://github.com/apache/carbondata/blob/master/docs/how-to-contribute-to-apache-carbondata.md"
target="_blank">Contributing to CarbonData</a></li>
<li>
<a href="https://github.com/apache/carbondata/blob/master/docs/release-guide.md"
target="_blank">Release Guide</a></li>
<li>
<a href="https://cwiki.apache.org/confluence/display/CARBONDATA/PMC+and+Committers+member+list"
target="_blank">Project PMC and Committers</a></li>
<li>
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66850609"
target="_blank">CarbonData Meetups</a></li>
<li><a href="security.html">Apache CarbonData Security</a></li>
<li><a href="https://issues.apache.org/jira/browse/CARBONDATA" target="_blank">Apache
Jira</a></li>
<li><a href="videogallery.html">CarbonData Videos </a></li>
</ul>
</li>
<li class="dropdown">
<a href="http://www.apache.org/" class="apache_link hidden-xs dropdown-toggle"
data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Apache</a>
<ul class="dropdown-menu">
<li><a href="http://www.apache.org/" target="_blank">Apache Homepage</a></li>
<li><a href="http://www.apache.org/licenses/" target="_blank">License</a></li>
<li><a href="http://www.apache.org/foundation/sponsorship.html"
target="_blank">Sponsorship</a></li>
<li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
</ul>
</li>
<li class="dropdown">
<a href="http://www.apache.org/" class="hidden-lg hidden-md hidden-sm dropdown-toggle"
data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Apache</a>
<ul class="dropdown-menu">
<li><a href="http://www.apache.org/" target="_blank">Apache Homepage</a></li>
<li><a href="http://www.apache.org/licenses/" target="_blank">License</a></li>
<li><a href="http://www.apache.org/foundation/sponsorship.html"
target="_blank">Sponsorship</a></li>
<li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
</ul>
</li>
<li>
<a href="#" id="search-icon"><i class="fa fa-search" aria-hidden="true"></i></a>
</li>
</ul>
</div><!--/.nav-collapse -->
<div id="search-box">
<form method="get" action="http://www.google.com/search" target="_blank">
<div class="search-block">
<table border="0" cellpadding="0" width="100%">
<tr>
<td style="width:80%">
<input type="text" name="q" size=" 5" maxlength="255" value=""
class="search-input" placeholder="Search...." required/>
</td>
<td style="width:20%">
<input type="submit" value="Search"/></td>
</tr>
<tr>
<td align="left" style="font-size:75%" colspan="2">
<input type="checkbox" name="sitesearch" value="carbondata.apache.org" checked/>
<span style=" position: relative; top: -3px;"> Only search for CarbonData</span>
</td>
</tr>
</table>
</div>
</form>
</div>
</div>
</nav>
</header> <!-- end Header part -->
<div class="fixed-padding"></div> <!-- top padding with fixed header -->
<section><!-- Dashboard nav -->
<div class="container-fluid q">
<div class="col-sm-12 col-md-12 maindashboard">
<div class="verticalnavbar">
<nav class="b-sticky-nav">
<div class="nav-scroller">
<div class="nav__inner">
<a class="b-nav__intro nav__item" href="./introduction.html">introduction</a>
<a class="b-nav__quickstart nav__item" href="./quick-start-guide.html">quick start</a>
<a class="b-nav__uses nav__item" href="./usecases.html">use cases</a>
<div class="nav__item nav__item__with__subs">
<a class="b-nav__docs nav__item nav__sub__anchor" href="./language-manual.html">Language Reference</a>
<a class="nav__item nav__sub__item" href="./ddl-of-carbondata.html">DDL</a>
<a class="nav__item nav__sub__item" href="./dml-of-carbondata.html">DML</a>
<a class="nav__item nav__sub__item" href="./streaming-guide.html">Streaming</a>
<a class="nav__item nav__sub__item" href="./configuration-parameters.html">Configuration</a>
<a class="nav__item nav__sub__item" href="./index-developer-guide.html">Indexes</a>
<a class="nav__item nav__sub__item" href="./supported-data-types-in-carbondata.html">Data Types</a>
</div>
<div class="nav__item nav__item__with__subs">
<a class="b-nav__datamap nav__item nav__sub__anchor" href="./index-management.html">Index Management</a>
<a class="nav__item nav__sub__item" href="./bloomfilter-index-guide.html">Bloom Filter</a>
<a class="nav__item nav__sub__item" href="./lucene-index-guide.html">Lucene</a>
<a class="nav__item nav__sub__item" href="./secondary-index-guide.html">Secondary Index</a>
<a class="nav__item nav__sub__item" href="./spatial-index-guide.html">Spatial Index</a>
<a class="nav__item nav__sub__item" href="./mv-guide.html">MV</a>
</div>
<div class="nav__item nav__item__with__subs">
<a class="b-nav__api nav__item nav__sub__anchor" href="./sdk-guide.html">API</a>
<a class="nav__item nav__sub__item" href="./sdk-guide.html">Java SDK</a>
<a class="nav__item nav__sub__item" href="./csdk-guide.html">C++ SDK</a>
</div>
<a class="b-nav__perf nav__item" href="./performance-tuning.html">Performance Tuning</a>
<a class="b-nav__s3 nav__item" href="./s3-guide.html">S3 Storage</a>
<a class="b-nav__indexserver nav__item" href="./index-server.html">Index Server</a>
<a class="b-nav__prestodb nav__item" href="./prestodb-guide.html">PrestoDB Integration</a>
<a class="b-nav__prestosql nav__item" href="./prestosql-guide.html">PrestoSQL Integration</a>
<a class="b-nav__flink nav__item" href="./flink-integration-guide.html">Flink Integration</a>
<a class="b-nav__scd nav__item" href="./scd-and-cdc-guide.html">SCD & CDC</a>
<a class="b-nav__faq nav__item" href="./faq.html">FAQ</a>
<a class="b-nav__contri nav__item" href="./how-to-contribute-to-apache-carbondata.html">Contribute</a>
<a class="b-nav__security nav__item" href="./security.html">Security</a>
<a class="b-nav__release nav__item" href="./release-guide.html">Release Guide</a>
</div>
</div>
<div class="navindicator">
<div class="b-nav__intro navindicator__item"></div>
<div class="b-nav__quickstart navindicator__item"></div>
<div class="b-nav__uses navindicator__item"></div>
<div class="b-nav__docs navindicator__item"></div>
<div class="b-nav__datamap navindicator__item"></div>
<div class="b-nav__api navindicator__item"></div>
<div class="b-nav__perf navindicator__item"></div>
<div class="b-nav__s3 navindicator__item"></div>
<div class="b-nav__indexserver navindicator__item"></div>
<div class="b-nav__prestodb navindicator__item"></div>
<div class="b-nav__prestosql navindicator__item"></div>
<div class="b-nav__flink navindicator__item"></div>
<div class="b-nav__scd navindicator__item"></div>
<div class="b-nav__faq navindicator__item"></div>
<div class="b-nav__contri navindicator__item"></div>
<div class="b-nav__security navindicator__item"></div>
<div class="b-nav__release navindicator__item"></div>
</div>
</nav>
</div>
<div class="mdcontent">
<section>
<div style="padding:10px 15px;">
<div id="viewpage" name="viewpage">
<div class="row">
<div class="col-sm-12 col-md-12">
<div>
<h1>
<a id="carbon-flink-integration-guide" class="anchor" href="#carbon-flink-integration-guide" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Carbon Flink Integration Guide</h1>
<h2>
<a id="usage-scenarios" class="anchor" href="#usage-scenarios" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Usage scenarios</h2>
<p>The CarbonData Flink integration module connects Flink and Carbon. The module provides
a set of Flink BulkWriter implementations (CarbonLocalWriter and CarbonS3Writer). The data is processed
by Flink and finally written into the stage directory of the target table by the corresponding Carbon writer (CarbonLocalWriter or CarbonS3Writer).</p>
<p>By default, data in the table stage directory cannot be queried immediately; it becomes queryable
only after the <code>INSERT INTO $tableName STAGE</code> command is executed.</p>
<p>Since the data that Flink writes to Carbon is an unbounded stream, the user should execute the insert from stage
command regularly. This keeps the data visible and keeps the amount of data processed by each
insert from stage command under control.</p>
<p>The execution interval of the insert from stage command should be chosen based on the data visibility requirements
of the actual business and the Flink data traffic. When the data visibility requirements are high
or the data traffic is large, the execution interval should be shortened accordingly.</p>
<p>A typical scenario is that the data is cleaned and preprocessed by Flink and then written to Carbon
for subsequent analysis and queries.</p>
<h2>
<a id="usage-description" class="anchor" href="#usage-description" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Usage description</h2>
<h3>
<a id="writing-process" class="anchor" href="#writing-process" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Writing process</h3>
<p>Typical flink stream: <code>Source -&gt; Process -&gt; Output(Carbon Writer Sink)</code></p>
<p>Pseudo code and description:</p>
<div class="highlight highlight-source-scala"><pre> <span class="pl-c"><span class="pl-c">//</span> Import dependencies.</span>
<span class="pl-k">import</span> <span class="pl-en">java</span>.<span class="pl-en">util</span>.<span class="pl-en">Properties</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">CarbonWriterFactory</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">ProxyFileSystem</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbondata</span>.<span class="pl-en">core</span>.<span class="pl-en">constants</span>.<span class="pl-en">CarbonCommonConstants</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">api</span>.<span class="pl-en">common</span>.<span class="pl-en">restartstrategy</span>.<span class="pl-en">RestartStrategies</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">core</span>.<span class="pl-en">fs</span>.<span class="pl-en">Path</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">environment</span>.<span class="pl-en">StreamExecutionEnvironment</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">functions</span>.<span class="pl-en">sink</span>.<span class="pl-en">filesystem</span>.<span class="pl-en">StreamingFileSink</span>
<span class="pl-c"><span class="pl-c">//</span> Specify database name.</span>
<span class="pl-k">val</span> <span class="pl-smi">databaseName</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>default<span class="pl-pds">"</span></span>
<span class="pl-c"><span class="pl-c">//</span> Specify target table name.</span>
<span class="pl-k">val</span> <span class="pl-smi">tableName</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>test<span class="pl-pds">"</span></span>
<span class="pl-c"><span class="pl-c">//</span> Table path of the target table.</span>
<span class="pl-k">val</span> <span class="pl-smi">tablePath</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>/data/warehouse/test<span class="pl-pds">"</span></span>
<span class="pl-c"><span class="pl-c">//</span> Specify local temporary path.</span>
<span class="pl-k">val</span> <span class="pl-smi">dataTempPath</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>/data/temp/<span class="pl-pds">"</span></span>
<span class="pl-k">val</span> <span class="pl-smi">tableProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span>
<span class="pl-c"><span class="pl-c">//</span> Set the table properties here.</span>
<span class="pl-k">val</span> <span class="pl-smi">writerProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span>
<span class="pl-c"><span class="pl-c">//</span> Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.</span>
<span class="pl-k">val</span> <span class="pl-smi">carbonProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span>
<span class="pl-c"><span class="pl-c">//</span> Set the carbon properties here, such as date format, store location, etc.</span>
<span class="pl-c"><span class="pl-c">//</span> Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.</span>
<span class="pl-k">val</span> <span class="pl-smi">writerFactory</span> <span class="pl-k">=</span> <span class="pl-en">CarbonWriterFactory</span>.builder(<span class="pl-s"><span class="pl-pds">"</span>Local<span class="pl-pds">"</span></span>).build(
databaseName,
tableName,
tablePath,
tableProperties,
writerProperties,
carbonProperties
)
<span class="pl-c"><span class="pl-c">//</span> Build a flink stream and run it.</span>
<span class="pl-c"><span class="pl-c">//</span> 1. Create a new flink execution environment.</span>
<span class="pl-k">val</span> <span class="pl-smi">environment</span> <span class="pl-k">=</span> <span class="pl-en">StreamExecutionEnvironment</span>.getExecutionEnvironment
<span class="pl-c"><span class="pl-c">//</span> Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.</span>
<span class="pl-c"><span class="pl-c">//</span> 2. Create flink data source, may be a kafka source, custom source, or others.</span>
<span class="pl-c"><span class="pl-c">//</span> The data type of source should be Array[AnyRef].</span>
<span class="pl-c"><span class="pl-c">//</span> Array length should equals to table column count, and values order in array should matches table column order.</span>
<span class="pl-k">val</span> <span class="pl-smi">source</span> <span class="pl-k">=</span> ...
<span class="pl-c"><span class="pl-c">//</span> 3. Create flink stream and set source.</span>
<span class="pl-k">val</span> <span class="pl-smi">stream</span> <span class="pl-k">=</span> environment.addSource(source)
<span class="pl-c"><span class="pl-c">//</span> 4. Add other flink operators here.</span>
<span class="pl-c"><span class="pl-c">//</span> ...</span>
<span class="pl-c"><span class="pl-c">//</span> 5. Set flink stream target (write data to carbon with a write sink).</span>
stream.addSink(<span class="pl-en">StreamingFileSink</span>.forBulkFormat(<span class="pl-k">new</span> <span class="pl-en">Path</span>(<span class="pl-en">ProxyFileSystem</span>.<span class="pl-en">DEFAULT_URI</span>), writerFactory).build)
<span class="pl-c"><span class="pl-c">//</span> 6. Run flink stream.</span>
<span class="pl-k">try</span> {
environment.execute
} <span class="pl-k">catch</span> {
<span class="pl-k">case</span> <span class="pl-v">exception</span>: <span class="pl-en">Exception</span> <span class="pl-k">=&gt;</span>
<span class="pl-c"><span class="pl-c">//</span> Handle execute exception here.</span>
}</pre></div>
<h3>
<a id="writer-properties" class="anchor" href="#writer-properties" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Writer properties</h3>
<h4>
<a id="local-writer" class="anchor" href="#local-writer" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Local Writer</h4>
<table>
<thead>
<tr>
<th>Property</th>
<th>Name</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>CarbonLocalProperty.DATA_TEMP_PATH</td>
<td>carbon.writer.local.data.temp.path</td>
<td>Usually a local path. Data is written to the temp path first and finally moved to the target data path.</td>
</tr>
<tr>
<td>CarbonLocalProperty.COMMIT_THRESHOLD</td>
<td>carbon.writer.local.commit.threshold</td>
<td>When the written data count reaches the threshold, the data writer flushes and moves the data to the target data path.</td>
</tr>
</tbody>
</table>
<h4>
<a id="s3-writer" class="anchor" href="#s3-writer" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>S3 Writer</h4>
<table>
<thead>
<tr>
<th>Property</th>
<th>Name</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>CarbonS3Property.ACCESS_KEY</td>
<td>carbon.writer.s3.access.key</td>
<td>Access key of s3 file system</td>
</tr>
<tr>
<td>CarbonS3Property.SECRET_KEY</td>
<td>carbon.writer.s3.secret.key</td>
<td>Secret key of s3 file system</td>
</tr>
<tr>
<td>CarbonS3Property.ENDPOINT</td>
<td>carbon.writer.s3.endpoint</td>
<td>Endpoint of s3 file system</td>
</tr>
<tr>
<td>CarbonS3Property.DATA_TEMP_PATH</td>
<td>carbon.writer.s3.data.temp.path</td>
<td>Usually a local path. Data is written to the temp path first and finally moved to the target data path.</td>
</tr>
<tr>
<td>CarbonS3Property.COMMIT_THRESHOLD</td>
<td>carbon.writer.s3.commit.threshold</td>
<td>When the written data count reaches the threshold, the data writer flushes and moves the data to the target data path.</td>
</tr>
</tbody>
</table>
<h3>
<a id="insert-from-stage" class="anchor" href="#insert-from-stage" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Insert from stage</h3>
<p>Refer to <a href="./dml-of-carbondata.html#insert-data-into-carbondata-table-from-stage-input-files">Grammar Description</a> for the syntax.</p>
<h2>
<a id="usage-example-code" class="anchor" href="#usage-example-code" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Usage Example Code</h2>
<p>Create target table.</p>
<div class="highlight highlight-source-sql"><pre> <span class="pl-k">CREATE</span> <span class="pl-k">TABLE</span> <span class="pl-en">test</span> (col1 string, col2 string, col3 string) STORED <span class="pl-k">AS</span> carbondata</pre></div>
<p>Write Flink data to a local Carbon table.</p>
<div class="highlight highlight-source-scala"><pre> <span class="pl-k">import</span> <span class="pl-en">java</span>.<span class="pl-en">util</span>.<span class="pl-en">Properties</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">CarbonLocalProperty</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">CarbonWriterFactory</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">ProxyFileSystem</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbondata</span>.<span class="pl-en">core</span>.<span class="pl-en">constants</span>.<span class="pl-en">CarbonCommonConstants</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">api</span>.<span class="pl-en">common</span>.<span class="pl-en">restartstrategy</span>.<span class="pl-en">RestartStrategies</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">core</span>.<span class="pl-en">fs</span>.<span class="pl-en">Path</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">environment</span>.<span class="pl-en">StreamExecutionEnvironment</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">functions</span>.<span class="pl-en">sink</span>.<span class="pl-en">filesystem</span>.<span class="pl-en">StreamingFileSink</span>
<span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">functions</span>.<span class="pl-en">source</span>.<span class="pl-en">SourceFunction</span>
<span class="pl-k">val</span> <span class="pl-smi">databaseName</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>default<span class="pl-pds">"</span></span>
<span class="pl-k">val</span> <span class="pl-smi">tableName</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>test<span class="pl-pds">"</span></span>
<span class="pl-k">val</span> <span class="pl-smi">tablePath</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>/data/warehouse/test<span class="pl-pds">"</span></span>
<span class="pl-k">val</span> <span class="pl-smi">dataTempPath</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>/data/temp/<span class="pl-pds">"</span></span>
<span class="pl-k">val</span> <span class="pl-smi">tableProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span>
<span class="pl-k">val</span> <span class="pl-smi">writerProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span>
writerProperties.setProperty(<span class="pl-en">CarbonLocalProperty</span>.<span class="pl-en">DATA_TEMP_PATH</span>, dataTempPath)
<span class="pl-k">val</span> <span class="pl-smi">carbonProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span>
carbonProperties.setProperty(<span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">CARBON_TIMESTAMP_FORMAT</span>, <span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">CARBON_TIMESTAMP_DEFAULT_FORMAT</span>)
carbonProperties.setProperty(<span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">CARBON_DATE_FORMAT</span>, <span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">CARBON_DATE_DEFAULT_FORMAT</span>)
carbonProperties.setProperty(<span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">UNSAFE_WORKING_MEMORY_IN_MB</span>, <span class="pl-s"><span class="pl-pds">"</span>1024<span class="pl-pds">"</span></span>)
<span class="pl-k">val</span> <span class="pl-smi">writerFactory</span> <span class="pl-k">=</span> <span class="pl-en">CarbonWriterFactory</span>.builder(<span class="pl-s"><span class="pl-pds">"</span>Local<span class="pl-pds">"</span></span>).build(
databaseName,
tableName,
tablePath,
tableProperties,
writerProperties,
carbonProperties
)
<span class="pl-k">val</span> <span class="pl-smi">environment</span> <span class="pl-k">=</span> <span class="pl-en">StreamExecutionEnvironment</span>.getExecutionEnvironment
environment.setParallelism(<span class="pl-c1">1</span>)
environment.enableCheckpointing(<span class="pl-c1">2000L</span>)
environment.setRestartStrategy(<span class="pl-en">RestartStrategies</span>.noRestart)
<span class="pl-c"><span class="pl-c">//</span> Define a custom source.</span>
<span class="pl-k">val</span> <span class="pl-smi">source</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">SourceFunction</span>[<span class="pl-en">Array</span>[<span class="pl-en">AnyRef</span>]]() {
<span class="pl-k">override</span>
<span class="pl-k">def</span> <span class="pl-en">run</span>(<span class="pl-v">sourceContext</span>: <span class="pl-en">SourceFunction</span>.<span class="pl-en">SourceContext</span>[<span class="pl-en">Array</span>[<span class="pl-en">AnyRef</span>]])<span class="pl-k">:</span> <span class="pl-en">Unit</span> <span class="pl-k">=</span> {
<span class="pl-c"><span class="pl-c">//</span> Array length should equals to table column count, and values order in array should matches table column order.</span>
<span class="pl-k">val</span> <span class="pl-smi">data</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Array</span>[<span class="pl-en">AnyRef</span>](<span class="pl-c1">3</span>)
data(<span class="pl-c1">0</span>) <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>value1<span class="pl-pds">"</span></span>
data(<span class="pl-c1">1</span>) <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>value2<span class="pl-pds">"</span></span>
data(<span class="pl-c1">2</span>) <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>value3<span class="pl-pds">"</span></span>
sourceContext.collect(data)
}
<span class="pl-k">override</span>
<span class="pl-k">def</span> <span class="pl-en">cancel</span>()<span class="pl-k">:</span> <span class="pl-en">Unit</span> <span class="pl-k">=</span> {
<span class="pl-c"><span class="pl-c">//</span> do something.</span>
}
}
<span class="pl-k">val</span> <span class="pl-smi">stream</span> <span class="pl-k">=</span> environment.addSource(source)
<span class="pl-k">val</span> <span class="pl-smi">streamSink</span> <span class="pl-k">=</span> <span class="pl-en">StreamingFileSink</span>.forBulkFormat(<span class="pl-k">new</span> <span class="pl-en">Path</span>(<span class="pl-en">ProxyFileSystem</span>.<span class="pl-en">DEFAULT_URI</span>), writerFactory).build
stream.addSink(streamSink)
<span class="pl-k">try</span> {
environment.execute
} <span class="pl-k">catch</span> {
<span class="pl-k">case</span> <span class="pl-v">exception</span>: <span class="pl-en">Exception</span> <span class="pl-k">=&gt;</span>
<span class="pl-c"><span class="pl-c">//</span> TODO</span>
<span class="pl-k">throw</span> <span class="pl-k">new</span> <span class="pl-en">UnsupportedOperationException</span>(exception)
}</pre></div>
<p>Insert into table from stage directory.</p>
<div class="highlight highlight-source-sql"><pre> <span class="pl-k">INSERT INTO</span> test STAGE</pre></div>
<script>
// Once the DOM is ready, tag the Flink Integration entry of the side nav with the 'selected' class.
$(document).ready(function () {
  var flinkNavSelector = '.b-nav__flink';
  $(flinkNavSelector).addClass('selected');
});
</script></div>
</div>
</div>
</div>
<div class="doc-footer">
<a href="#top" class="scroll-top">Top</a>
</div>
</div>
</section>
</div>
</div>
</div>
</section><!-- End systemblock part -->
<script src="js/custom.js"></script>
</body>
</html>