| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> |
| <link href="images/favicon.ico" rel="shortcut icon" type="image/x-icon"> |
| <title>CarbonData</title> |
| <!-- Bootstrap --> |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <link href="css/style.css" rel="stylesheet"> |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| <!-- jQuery is loaded first: Bootstrap's JS plugins depend on it, and the inline page script uses $. --> |
| <script src="js/jquery.min.js"></script> |
| <script src="js/bootstrap.min.js"></script> |
| <script defer src="https://use.fontawesome.com/releases/v5.0.8/js/all.js"></script> |
| </head> |
| <body> |
| <header> |
| <nav class="navbar navbar-default navbar-custom cd-navbar-wrapper"> |
| <div class="container"> |
| <div class="navbar-header"> |
| <!-- Bootstrap collapse toggle for the #navbar menu; the sr-only text is its accessible name. --> |
| <button class="navbar-toggle collapsed" type="button" data-toggle="collapse" data-target="#navbar" aria-controls="navbar" aria-expanded="false"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <!-- Site logo linking to the home page; the alt text names the link destination. --> |
| <a href="index.html" class="logo"> |
| <img src="images/CarbonDataLogo.png" alt="CarbonData home" title="CarbonData logo"> |
| </a> |
| </div> |
| <div class="navbar-collapse collapse cd_navcontnt" id="navbar"> |
| <ul class="nav navbar-nav navbar-right navlist-custom"> |
| <!-- Icon-only Home link (hidden on extra-small screens); the sr-only span gives it an accessible name. --> |
| <li><a href="index.html" class="hidden-xs"><i class="fa fa-home" aria-hidden="true"></i> <span class="sr-only">Home</span></a></li> |
| <!-- Text Home link, shown only on extra-small screens. --> |
| <li><a href="index.html" class="hidden-lg hidden-md hidden-sm">Home</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"> Download <span class="caret"></span></a> |
| <ul class="dropdown-menu"> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/2.2.0/" target="_blank">Apache CarbonData 2.2.0</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/2.1.1/" target="_blank">Apache CarbonData 2.1.1</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/2.1.0/" target="_blank">Apache CarbonData 2.1.0</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/2.0.1/" target="_blank">Apache CarbonData 2.0.1</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/2.0.0/" target="_blank">Apache CarbonData 2.0.0</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/1.6.1/" target="_blank">Apache CarbonData 1.6.1</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/1.6.0/" target="_blank">Apache CarbonData 1.6.0</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.4/" target="_blank">Apache CarbonData 1.5.4</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.3/" target="_blank">Apache CarbonData 1.5.3</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.2/" target="_blank">Apache CarbonData 1.5.2</a></li> |
| <li><a href="https://dist.apache.org/repos/dist/release/carbondata/1.5.1/" target="_blank">Apache CarbonData 1.5.1</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/CARBONDATA/Releases" target="_blank">Release Archive</a></li> |
| </ul> |
| </li> |
| <li><a href="documentation.html" class="active">Documentation</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Community <span class="caret"></span></a> |
| <ul class="dropdown-menu"> |
| <li><a href="https://github.com/apache/carbondata/blob/master/docs/how-to-contribute-to-apache-carbondata.md" target="_blank">Contributing to CarbonData</a></li> |
| <li><a href="https://github.com/apache/carbondata/blob/master/docs/release-guide.md" target="_blank">Release Guide</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/CARBONDATA/PMC+and+Committers+member+list" target="_blank">Project PMC and Committers</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66850609" target="_blank">CarbonData Meetups</a></li> |
| <li><a href="security.html">Apache CarbonData Security</a></li> |
| <li><a href="https://issues.apache.org/jira/browse/CARBONDATA" target="_blank">Apache Jira</a></li> |
| <li><a href="videogallery.html">CarbonData Videos</a></li> |
| </ul> |
| </li> |
| <!-- Apache menu with apache_link styling (hidden on extra-small screens). --> |
| <li class="dropdown"> |
| <a href="https://www.apache.org/" class="apache_link hidden-xs dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Apache</a> |
| <ul class="dropdown-menu"> |
| <li><a href="https://www.apache.org/" target="_blank">Apache Homepage</a></li> |
| <li><a href="https://www.apache.org/licenses/" target="_blank">License</a></li> |
| <li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Sponsorship</a></li> |
| <li><a href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li> |
| </ul> |
| </li> |
| <!-- Same Apache menu without apache_link styling, shown only on extra-small screens. --> |
| <li class="dropdown"> |
| <a href="https://www.apache.org/" class="hidden-lg hidden-md hidden-sm dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Apache</a> |
| <ul class="dropdown-menu"> |
| <li><a href="https://www.apache.org/" target="_blank">Apache Homepage</a></li> |
| <li><a href="https://www.apache.org/licenses/" target="_blank">License</a></li> |
| <li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Sponsorship</a></li> |
| <li><a href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li> |
| </ul> |
| </li> |
| <!-- Icon-only search toggle; the sr-only span gives it an accessible name. --> |
| <!-- NOTE(review): presumably js/custom.js binds #search-icon to show #search-box — confirm. --> |
| <li><a href="#" id="search-icon"><i class="fa fa-search" aria-hidden="true"></i><span class="sr-only">Search</span></a></li> |
| </ul> |
| </div><!--/.nav-collapse --> |
| <div id="search-box"> |
| <!-- Site search: sends the query (q) to Google in a new tab; while the "sitesearch" box is checked, results are limited to carbondata.apache.org. --> |
| <form method="get" action="https://www.google.com/search" target="_blank"> |
| <div class="search-block"> |
| <!-- Layout-only table: role="presentation" keeps assistive technology from announcing it as a data table. --> |
| <table role="presentation" border="0" cellpadding="0" width="100%"> |
| <tr> |
| <td style="width:80%"> |
| <input type="text" name="q" size="5" maxlength="255" value="" class="search-input" placeholder="Search...." aria-label="Search query" required> |
| </td> |
| <td style="width:20%"> |
| <input type="submit" value="Search"></td> |
| </tr> |
| <tr> |
| <td align="left" style="font-size:75%" colspan="2"> |
| <input type="checkbox" name="sitesearch" value="carbondata.apache.org" aria-labelledby="search-sitesearch-label" checked> |
| <span id="search-sitesearch-label" style="position: relative; top: -3px;"> Only search for CarbonData</span> |
| </td> |
| </tr> |
| </table> |
| </div> |
| </form> |
| </div> |
| </div> |
| </nav> |
| </header> <!-- end Header part --> |
| |
| <div class="fixed-padding"></div> <!-- top padding for the fixed header --> |
| |
| <section><!-- Dashboard nav --> |
| <div class="container-fluid q"> |
| <div class="col-sm-12 col-md-12 maindashboard"> |
| <div class="verticalnavbar"> |
| <!-- Documentation side navigation. All entries link to sibling pages with "./"-relative URLs. --> |
| <nav class="b-sticky-nav" aria-label="Documentation"> |
| <div class="nav-scroller"> |
| <div class="nav__inner"> |
| <a class="b-nav__intro nav__item" href="./introduction.html">introduction</a> |
| <a class="b-nav__quickstart nav__item" href="./quick-start-guide.html">quick start</a> |
| <a class="b-nav__uses nav__item" href="./usecases.html">use cases</a> |
| |
| <div class="nav__item nav__item__with__subs"> |
| <a class="b-nav__docs nav__item nav__sub__anchor" href="./language-manual.html">Language Reference</a> |
| <a class="nav__item nav__sub__item" href="./ddl-of-carbondata.html">DDL</a> |
| <a class="nav__item nav__sub__item" href="./dml-of-carbondata.html">DML</a> |
| <a class="nav__item nav__sub__item" href="./streaming-guide.html">Streaming</a> |
| <a class="nav__item nav__sub__item" href="./configuration-parameters.html">Configuration</a> |
| <a class="nav__item nav__sub__item" href="./index-developer-guide.html">Indexes</a> |
| <a class="nav__item nav__sub__item" href="./supported-data-types-in-carbondata.html">Data Types</a> |
| </div> |
| |
| <div class="nav__item nav__item__with__subs"> |
| <a class="b-nav__datamap nav__item nav__sub__anchor" href="./index-management.html">Index Management</a> |
| <a class="nav__item nav__sub__item" href="./bloomfilter-index-guide.html">Bloom Filter</a> |
| <a class="nav__item nav__sub__item" href="./lucene-index-guide.html">Lucene</a> |
| <a class="nav__item nav__sub__item" href="./secondary-index-guide.html">Secondary Index</a> |
| <a class="nav__item nav__sub__item" href="./spatial-index-guide.html">Spatial Index</a> |
| <a class="nav__item nav__sub__item" href="./mv-guide.html">MV</a> |
| </div> |
| |
| <div class="nav__item nav__item__with__subs"> |
| <a class="b-nav__api nav__item nav__sub__anchor" href="./sdk-guide.html">API</a> |
| <a class="nav__item nav__sub__item" href="./sdk-guide.html">Java SDK</a> |
| <a class="nav__item nav__sub__item" href="./csdk-guide.html">C++ SDK</a> |
| </div> |
| |
| <a class="b-nav__perf nav__item" href="./performance-tuning.html">Performance Tuning</a> |
| <a class="b-nav__s3 nav__item" href="./s3-guide.html">S3 Storage</a> |
| <a class="b-nav__indexserver nav__item" href="./index-server.html">Index Server</a> |
| <a class="b-nav__prestodb nav__item" href="./prestodb-guide.html">PrestoDB Integration</a> |
| <a class="b-nav__prestosql nav__item" href="./prestosql-guide.html">PrestoSQL Integration</a> |
| <a class="b-nav__flink nav__item" href="./flink-integration-guide.html">Flink Integration</a> |
| <a class="b-nav__scd nav__item" href="./scd-and-cdc-guide.html">SCD &amp; CDC</a> |
| <a class="b-nav__faq nav__item" href="./faq.html">FAQ</a> |
| <a class="b-nav__contri nav__item" href="./how-to-contribute-to-apache-carbondata.html">Contribute</a> |
| <a class="b-nav__security nav__item" href="./security.html">Security</a> |
| <a class="b-nav__release nav__item" href="./release-guide.html">Release Guide</a> |
| </div> |
| </div> |
| <!-- Indicators share the b-nav__* class of their nav entry, so the page's inline script marks both as "selected". --> |
| <!-- NOTE(review): there is no b-nav__release indicator to match the Release Guide entry — confirm whether that is intentional. --> |
| <div class="navindicator"> |
| <div class="b-nav__intro navindicator__item"></div> |
| <div class="b-nav__quickstart navindicator__item"></div> |
| <div class="b-nav__uses navindicator__item"></div> |
| <div class="b-nav__docs navindicator__item"></div> |
| <div class="b-nav__datamap navindicator__item"></div> |
| <div class="b-nav__api navindicator__item"></div> |
| <div class="b-nav__perf navindicator__item"></div> |
| <div class="b-nav__s3 navindicator__item"></div> |
| <div class="b-nav__indexserver navindicator__item"></div> |
| <div class="b-nav__prestodb navindicator__item"></div> |
| <div class="b-nav__prestosql navindicator__item"></div> |
| <div class="b-nav__flink navindicator__item"></div> |
| <div class="b-nav__scd navindicator__item"></div> |
| <div class="b-nav__faq navindicator__item"></div> |
| <div class="b-nav__contri navindicator__item"></div> |
| <div class="b-nav__security navindicator__item"></div> |
| </div> |
| </nav> |
| </div> |
| <div class="mdcontent"> |
| <section> |
| <div style="padding:10px 15px;"> |
| <div id="viewpage" name="viewpage"> |
| <div class="row"> |
| <div class="col-sm-12 col-md-12"> |
| <div> |
| <h1> |
| <a id="carbon-flink-integration-guide" class="anchor" href="#carbon-flink-integration-guide" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Carbon Flink Integration Guide</h1> |
| <h2> |
| <a id="usage-scenarios" class="anchor" href="#usage-scenarios" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Usage scenarios</h2> |
| <p>The CarbonData Flink integration module connects Flink and Carbon. The module provides |
| a set of Flink BulkWriter implementations (CarbonLocalWriter and CarbonS3Writer). The data is processed |
| by Flink and finally written into the stage directory of the target table by the CarbonLocalWriter or CarbonS3Writer.</p> |
| <p>By default, data in the table's stage directory cannot be queried immediately; it becomes queryable |
| only after the <code>INSERT INTO $tableName STAGE</code> command is executed.</p> |
| <p>Since the data Flink writes to Carbon is unbounded, the user should execute the insert from stage command |
| regularly, both to keep the data visible and to keep the amount of data processed by each execution |
| of the insert from stage command under control.</p> |
| <p>The execution interval of the insert from stage command should take into account both the data visibility requirements |
| of the actual business and the Flink data traffic. When the data visibility requirements are strict |
| or the data traffic is heavy, the execution interval should be shortened accordingly.</p> |
| <p>A typical scenario is that data is cleaned and preprocessed by Flink and then written to Carbon |
| for subsequent analysis and queries.</p> |
| <h2> |
| <a id="usage-description" class="anchor" href="#usage-description" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Usage description</h2> |
| <h3> |
| <a id="writing-process" class="anchor" href="#writing-process" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Writing process</h3> |
| <p>Typical flink stream: <code>Source -> Process -> Output(Carbon Writer Sink)</code></p> |
| <p>Pseudo code and description:</p> |
| <div class="highlight highlight-source-scala"><pre> <span class="pl-c"><span class="pl-c">//</span> Import dependencies.</span> |
| <span class="pl-k">import</span> <span class="pl-en">java</span>.<span class="pl-en">util</span>.<span class="pl-en">Properties</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">CarbonWriterFactory</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">ProxyFileSystem</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbondata</span>.<span class="pl-en">core</span>.<span class="pl-en">constants</span>.<span class="pl-en">CarbonCommonConstants</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">api</span>.<span class="pl-en">common</span>.<span class="pl-en">restartstrategy</span>.<span class="pl-en">RestartStrategies</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">core</span>.<span class="pl-en">fs</span>.<span class="pl-en">Path</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">environment</span>.<span class="pl-en">StreamExecutionEnvironment</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">functions</span>.<span class="pl-en">sink</span>.<span class="pl-en">filesystem</span>.<span class="pl-en">StreamingFileSink</span> |
| |
| <span class="pl-c"><span class="pl-c">//</span> Specify the database name.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">databaseName</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>default<span class="pl-pds">"</span></span> |
| |
| <span class="pl-c"><span class="pl-c">//</span> Specify the target table name.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">tableName</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>test<span class="pl-pds">"</span></span> |
| <span class="pl-c"><span class="pl-c">//</span> Table path of the target table.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">tablePath</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>/data/warehouse/test<span class="pl-pds">"</span></span> |
| <span class="pl-c"><span class="pl-c">//</span> Specify a local temporary path; it is passed to the writer through the writer properties.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">dataTempPath</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>/data/temp/<span class="pl-pds">"</span></span> |
| |
| <span class="pl-k">val</span> <span class="pl-smi">tableProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span> |
| <span class="pl-c"><span class="pl-c">//</span> Set the table properties here.</span> |
| |
| <span class="pl-k">val</span> <span class="pl-smi">writerProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span> |
| <span class="pl-c"><span class="pl-c">//</span> Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.</span> |
| <span class="pl-c"><span class="pl-c">//</span> For example: writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)</span> |
| |
| <span class="pl-k">val</span> <span class="pl-smi">carbonProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span> |
| <span class="pl-c"><span class="pl-c">//</span> Set the carbon properties here, such as date format, store location, etc.</span> |
| |
| <span class="pl-c"><span class="pl-c">//</span> Create the carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">writerFactory</span> <span class="pl-k">=</span> <span class="pl-en">CarbonWriterFactory</span>.builder(<span class="pl-s"><span class="pl-pds">"</span>Local<span class="pl-pds">"</span></span>).build( |
| databaseName, |
| tableName, |
| tablePath, |
| tableProperties, |
| writerProperties, |
| carbonProperties |
| ) |
| |
| <span class="pl-c"><span class="pl-c">//</span> Build a flink stream and run it.</span> |
| <span class="pl-c"><span class="pl-c">//</span> 1. Create a new flink execution environment.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">environment</span> <span class="pl-k">=</span> <span class="pl-en">StreamExecutionEnvironment</span>.getExecutionEnvironment |
| <span class="pl-c"><span class="pl-c">//</span> Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.</span> |
| |
| <span class="pl-c"><span class="pl-c">//</span> 2. Create a flink data source, such as a Kafka source, a custom source, or others.</span> |
| <span class="pl-c"><span class="pl-c">//</span> The element type of the source should be Array[AnyRef].</span> |
| <span class="pl-c"><span class="pl-c">//</span> The array length should equal the table column count, and the order of values in the array should match the table column order.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">source</span> <span class="pl-k">=</span> ... |
| <span class="pl-c"><span class="pl-c">//</span> 3. Create a flink stream from the source.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">stream</span> <span class="pl-k">=</span> environment.addSource(source) |
| <span class="pl-c"><span class="pl-c">//</span> 4. Add other flink operators here.</span> |
| <span class="pl-c"><span class="pl-c">//</span> ...</span> |
| <span class="pl-c"><span class="pl-c">//</span> 5. Set the flink stream target: a StreamingFileSink backed by the carbon writer factory writes the data to carbon.</span> |
| stream.addSink(<span class="pl-en">StreamingFileSink</span>.forBulkFormat(<span class="pl-k">new</span> <span class="pl-en">Path</span>(<span class="pl-en">ProxyFileSystem</span>.<span class="pl-en">DEFAULT_URI</span>), writerFactory).build) |
| <span class="pl-c"><span class="pl-c">//</span> 6. Run the flink stream.</span> |
| <span class="pl-k">try</span> { |
| environment.execute |
| } <span class="pl-k">catch</span> { |
| <span class="pl-k">case</span> <span class="pl-v">exception</span>: <span class="pl-en">Exception</span> <span class="pl-k">=></span> |
| <span class="pl-c"><span class="pl-c">//</span> Handle the execution exception here.</span> |
| }</pre></div> |
| <h3> |
| <a id="writer-properties" class="anchor" href="#writer-properties" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Writer properties</h3> |
| <h4> |
| <a id="local-writer" class="anchor" href="#local-writer" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Local Writer</h4> |
| <table> |
| <thead> |
| <tr> |
| <th>Property</th> |
| <th>Name</th> |
| <th>Description</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td>CarbonLocalProperty.DATA_TEMP_PATH</td> |
| <td>carbon.writer.local.data.temp.path</td> |
| <td>Usually a local path. Data is written to the temp path first and finally moved to the target data path.</td> |
| </tr> |
| <tr> |
| <td>CarbonLocalProperty.COMMIT_THRESHOLD</td> |
| <td>carbon.writer.local.commit.threshold</td> |
| <td>When the count of written data reaches the threshold, the data writer flushes and moves the data to the target data path.</td> |
| </tr> |
| </tbody> |
| </table> |
| <h4> |
| <a id="s3-writer" class="anchor" href="#s3-writer" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>S3 Writer</h4> |
| <table> |
| <thead> |
| <tr> |
| <th>Property</th> |
| <th>Name</th> |
| <th>Description</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td>CarbonS3Property.ACCESS_KEY</td> |
| <td>carbon.writer.s3.access.key</td> |
| <td>Access key of s3 file system</td> |
| </tr> |
| <tr> |
| <td>CarbonS3Property.SECRET_KEY</td> |
| <td>carbon.writer.s3.secret.key</td> |
| <td>Secret key of s3 file system</td> |
| </tr> |
| <tr> |
| <td>CarbonS3Property.ENDPOINT</td> |
| <td>carbon.writer.s3.endpoint</td> |
| <td>Endpoint of s3 file system</td> |
| </tr> |
| <tr> |
| <td>CarbonS3Property.DATA_TEMP_PATH</td> |
| <td>carbon.writer.s3.data.temp.path</td> |
| <td>Usually a local path. Data is written to the temp path first and finally moved to the target data path.</td> |
| </tr> |
| <tr> |
| <td>CarbonS3Property.COMMIT_THRESHOLD</td> |
| <td>carbon.writer.s3.commit.threshold</td> |
| <td>When the count of written data reaches the threshold, the data writer flushes and moves the data to the target data path.</td> |
| </tr> |
| </tbody> |
| </table> |
| <h3> |
| <a id="insert-from-stage" class="anchor" href="#insert-from-stage" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Insert from stage</h3> |
| <p>Refer to the <a href="./dml-of-carbondata.html#insert-data-into-carbondata-table-from-stage-input-files">Grammar Description</a> for the syntax.</p> |
| <h2> |
| <a id="usage-example-code" class="anchor" href="#usage-example-code" aria-hidden="true"><span aria-hidden="true" class="octicon octicon-link"></span></a>Usage Example Code</h2> |
| <p>Create target table.</p> |
| <div class="highlight highlight-source-sql"><pre> <span class="pl-k">CREATE</span> <span class="pl-k">TABLE</span> <span class="pl-en">test</span> (col1 string, col2 string, col3 string) STORED <span class="pl-k">AS</span> carbondata</pre></div> |
| <p>Write Flink data to a local carbon table.</p> |
| <div class="highlight highlight-source-scala"><pre> <span class="pl-k">import</span> <span class="pl-en">java</span>.<span class="pl-en">util</span>.<span class="pl-en">Properties</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">CarbonLocalProperty</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">CarbonWriterFactory</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbon</span>.<span class="pl-en">flink</span>.<span class="pl-en">ProxyFileSystem</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">carbondata</span>.<span class="pl-en">core</span>.<span class="pl-en">constants</span>.<span class="pl-en">CarbonCommonConstants</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">api</span>.<span class="pl-en">common</span>.<span class="pl-en">restartstrategy</span>.<span class="pl-en">RestartStrategies</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">core</span>.<span class="pl-en">fs</span>.<span class="pl-en">Path</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">environment</span>.<span class="pl-en">StreamExecutionEnvironment</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">functions</span>.<span class="pl-en">sink</span>.<span class="pl-en">filesystem</span>.<span class="pl-en">StreamingFileSink</span> |
| <span class="pl-k">import</span> <span class="pl-en">org</span>.<span class="pl-en">apache</span>.<span class="pl-en">flink</span>.<span class="pl-en">streaming</span>.<span class="pl-en">api</span>.<span class="pl-en">functions</span>.<span class="pl-en">source</span>.<span class="pl-en">SourceFunction</span> |
| |
| <span class="pl-c"><span class="pl-c">//</span> Target table: default.test, stored under /data/warehouse/test.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">databaseName</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>default<span class="pl-pds">"</span></span> |
| <span class="pl-k">val</span> <span class="pl-smi">tableName</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>test<span class="pl-pds">"</span></span> |
| <span class="pl-k">val</span> <span class="pl-smi">tablePath</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>/data/warehouse/test<span class="pl-pds">"</span></span> |
| <span class="pl-k">val</span> <span class="pl-smi">dataTempPath</span> <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>/data/temp/<span class="pl-pds">"</span></span> |
| |
| <span class="pl-k">val</span> <span class="pl-smi">tableProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span> |
| |
| <span class="pl-c"><span class="pl-c">//</span> The local writer writes data to dataTempPath first, then moves it to the target data path.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">writerProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span> |
| writerProperties.setProperty(<span class="pl-en">CarbonLocalProperty</span>.<span class="pl-en">DATA_TEMP_PATH</span>, dataTempPath) |
| |
| <span class="pl-c"><span class="pl-c">//</span> Carbon properties: default timestamp/date formats and 1024 MB of unsafe working memory.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">carbonProperties</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Properties</span> |
| carbonProperties.setProperty(<span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">CARBON_TIMESTAMP_FORMAT</span>, <span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">CARBON_TIMESTAMP_DEFAULT_FORMAT</span>) |
| carbonProperties.setProperty(<span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">CARBON_DATE_FORMAT</span>, <span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">CARBON_DATE_DEFAULT_FORMAT</span>) |
| carbonProperties.setProperty(<span class="pl-en">CarbonCommonConstants</span>.<span class="pl-en">UNSAFE_WORKING_MEMORY_IN_MB</span>, <span class="pl-s"><span class="pl-pds">"</span>1024<span class="pl-pds">"</span></span>) |
| |
| <span class="pl-k">val</span> <span class="pl-smi">writerFactory</span> <span class="pl-k">=</span> <span class="pl-en">CarbonWriterFactory</span>.builder(<span class="pl-s"><span class="pl-pds">"</span>Local<span class="pl-pds">"</span></span>).build( |
| databaseName, |
| tableName, |
| tablePath, |
| tableProperties, |
| writerProperties, |
| carbonProperties |
| ) |
| |
| <span class="pl-c"><span class="pl-c">//</span> Parallelism 1, a checkpoint every 2 seconds, and no restart on failure.</span> |
| <span class="pl-c"><span class="pl-c">//</span> Checkpointing is enabled because StreamingFileSink commits bulk-format part files on checkpoints.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">environment</span> <span class="pl-k">=</span> <span class="pl-en">StreamExecutionEnvironment</span>.getExecutionEnvironment |
| environment.setParallelism(<span class="pl-c1">1</span>) |
| environment.enableCheckpointing(<span class="pl-c1">2000L</span>) |
| environment.setRestartStrategy(<span class="pl-en">RestartStrategies</span>.noRestart) |
| |
| <span class="pl-c"><span class="pl-c">//</span> Define a custom source.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">source</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">SourceFunction</span>[<span class="pl-en">Array</span>[<span class="pl-en">AnyRef</span>]]() { |
| <span class="pl-c"><span class="pl-c">//</span> Emits a single sample row for (col1, col2, col3) and then returns.</span> |
| <span class="pl-k">override</span> |
| <span class="pl-k">def</span> <span class="pl-en">run</span>(<span class="pl-v">sourceContext</span>: <span class="pl-en">SourceFunction</span>.<span class="pl-en">SourceContext</span>[<span class="pl-en">Array</span>[<span class="pl-en">AnyRef</span>]])<span class="pl-k">:</span> <span class="pl-en">Unit</span> <span class="pl-k">=</span> { |
| <span class="pl-c"><span class="pl-c">//</span> The array length should equal the table column count, and the order of values in the array should match the table column order.</span> |
| <span class="pl-k">val</span> <span class="pl-smi">data</span> <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-en">Array</span>[<span class="pl-en">AnyRef</span>](<span class="pl-c1">3</span>) |
| data(<span class="pl-c1">0</span>) <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>value1<span class="pl-pds">"</span></span> |
| data(<span class="pl-c1">1</span>) <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>value2<span class="pl-pds">"</span></span> |
| data(<span class="pl-c1">2</span>) <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">"</span>value3<span class="pl-pds">"</span></span> |
| sourceContext.collect(data) |
| } |
| |
| <span class="pl-k">override</span> |
| <span class="pl-k">def</span> <span class="pl-en">cancel</span>()<span class="pl-k">:</span> <span class="pl-en">Unit</span> <span class="pl-k">=</span> { |
| <span class="pl-c"><span class="pl-c">//</span> Called by Flink when the job is cancelled; stop emitting data and release resources here.</span> |
| } |
| } |
| |
| <span class="pl-k">val</span> <span class="pl-smi">stream</span> <span class="pl-k">=</span> environment.addSource(source) |
| <span class="pl-k">val</span> <span class="pl-smi">streamSink</span> <span class="pl-k">=</span> <span class="pl-en">StreamingFileSink</span>.forBulkFormat(<span class="pl-k">new</span> <span class="pl-en">Path</span>(<span class="pl-en">ProxyFileSystem</span>.<span class="pl-en">DEFAULT_URI</span>), writerFactory).build |
| |
| stream.addSink(streamSink) |
| |
| <span class="pl-k">try</span> { |
| environment.execute |
| } <span class="pl-k">catch</span> { |
| <span class="pl-k">case</span> <span class="pl-v">exception</span>: <span class="pl-en">Exception</span> <span class="pl-k">=></span> |
| <span class="pl-c"><span class="pl-c">//</span> TODO: handle the execution failure properly; this example just wraps and rethrows it.</span> |
| <span class="pl-k">throw</span> <span class="pl-k">new</span> <span class="pl-en">UnsupportedOperationException</span>(exception) |
| }</pre></div> |
| <p>Insert into table from stage directory.</p> |
| <div class="highlight highlight-source-sql"><pre> <span class="pl-k">INSERT INTO</span> test STAGE</pre></div> |
| <script> |
| // When the DOM is ready, mark this page's side-navigation entries (the Flink Integration |
| // link and its matching indicator, which share the b-nav__flink class) as selected. |
| $(document).ready(function () { |
| var currentPageNavEntries = $('.b-nav__flink'); |
| currentPageNavEntries.addClass('selected'); |
| }); |
| </script></div> |
| </div> |
| </div> |
| </div> |
| <div class="doc-footer"> |
| <a href="#top" class="scroll-top">Top</a> |
| </div> |
| </div> |
| </section> |
| </div> |
| </div> |
| </div> |
| </section><!-- End systemblock part --> |
| <script src="js/custom.js"></script> |
| </body> |
| </html> |