<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Gearpump Connectors - Apache Gearpump(incubating)</title>
<link rel="shortcut icon" href="../../img/favicon.ico">
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="../../css/highlight.css">
<script>
// Current page data
var mkdocs_page_name = "Gearpump Connectors";
</script>
<script src="../../js/jquery-2.1.1.min.js"></script>
<script src="../../js/modernizr-2.8.3.min.js"></script>
<script type="text/javascript" src="../../js/highlight.pack.js"></script>
<script src="../../js/theme.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../../index.html" class="icon icon-home"> Apache Gearpump(incubating)</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li>
<li class="toctree-l1 ">
<a class="" href="../../index.html">Overview</a>
</li>
<li>
<li>
<ul class="subnav">
<li><span>Introduction</span></li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/submit-your-1st-application/index.html">Submit Your 1st Application</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/commandline/index.html">Client Command Line</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/basic-concepts/index.html">Basic Concepts</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/features/index.html">Technical Highlights</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/message-delivery/index.html">Reliable Message Delivery</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/performance-report/index.html">Performance</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../introduction/gearpump-internals/index.html">Gearpump Internals</a>
</li>
</ul>
<li>
<li>
<ul class="subnav">
<li><span>Deployment</span></li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-local/index.html">Local Mode</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-standalone/index.html">Standalone Mode</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-yarn/index.html">YARN Mode</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-docker/index.html">Docker Mode</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-ui-authentication/index.html">UI Authentication</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-ha/index.html">High Availability</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-msg-delivery/index.html">Reliable Message Delivery</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-configuration/index.html">Configuration</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-resource-isolation/index.html">Resource Isolation</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/deployment-security/index.html">YARN Security Guide</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/get-gearpump-distribution/index.html">How to Get Your Gearpump Distribution</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../deployment/hardware-requirement/index.html">Hardware Requirement</a>
</li>
</ul>
<li>
<li>
<ul class="subnav">
<li><span>Programming Guide</span></li>
<li class="toctree-l1 ">
<a class="" href="../dev-write-1st-app/index.html">Write Your 1st App</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-custom-serializer/index.html">Customized Message Passing</a>
</li>
<li class="toctree-l1 current">
<a class="current" href="index.html">Gearpump Connectors</a>
<ul>
<li class="toctree-l3"><a href="#basic-concepts">Basic Concepts</a></li>
<li><a class="toctree-l4" href="#datasource">DataSource</a></li>
<li><a class="toctree-l4" href="#datasink">DataSink</a></li>
<li class="toctree-l3"><a href="#implemented-connectors">Implemented Connectors</a></li>
<li><a class="toctree-l4" href="#datasource-implemented">DataSource implemented</a></li>
<li><a class="toctree-l4" href="#datasink-implemented">DataSink implemented</a></li>
<li class="toctree-l3"><a href="#use-of-connectors">Use of Connectors</a></li>
<li><a class="toctree-l4" href="#use-of-kafka-connectors">Use of Kafka connectors</a></li>
<li><a class="toctree-l4" href="#use-of-hbasesink">Use of HBaseSink</a></li>
<li class="toctree-l3"><a href="#how-to-implement-your-own-datasource">How to implement your own DataSource</a></li>
<li><a class="toctree-l4" href="#implement-your-own-datasource">Implement your own DataSource</a></li>
<li><a class="toctree-l4" href="#implement-dsl-helper-optional">Implement DSL helper (Optional)</a></li>
<li class="toctree-l3"><a href="#how-to-implement-your-own-datasink">How to implement your own DataSink</a></li>
<li><a class="toctree-l4" href="#implement-your-own-datasink">Implement your own DataSink</a></li>
<li><a class="toctree-l4" href="#implement-dsl-helper-optional_1">Implement DSL helper (Optional)</a></li>
</ul>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-storm/index.html">Storm Compatibility</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-ide-setup/index.html">IDE Setup</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-non-streaming-example/index.html">Non Streaming Examples</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../dev-rest-api/index.html">REST API</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../api/scala/index.html">Scala API</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../../api/java/index.html">Java API</a>
</li>
</ul>
<li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../index.html">Apache Gearpump(incubating)</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../index.html">Docs</a> &raquo;</li>
<li>Programming Guide &raquo;</li>
<li>Gearpump Connectors</li>
<li class="wy-breadcrumbs-aside">
<a href="https://github.com/apache/incubator-gearpump" class="icon icon-github"> Edit on GitHub</a>
</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<h2 id="basic-concepts">Basic Concepts</h2>
<p><code>DataSource</code> and <code>DataSink</code> are the two main concepts Gearpump uses to connect with the outside world.</p>
<h3 id="datasource">DataSource</h3>
<p><code>DataSource</code> is the starting point of a stream processing flow.</p>
<h3 id="datasink">DataSink</h3>
<p><code>DataSink</code> is the end point of a stream processing flow.</p>
<h2 id="implemented-connectors">Implemented Connectors</h2>
<h3 id="datasource-implemented"><code>DataSource</code> implemented</h3>
<p>Currently, the following <code>DataSource</code> implementations are supported.</p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>CollectionDataSource</code></td>
<td>Converts a collection into a data source that cycles through its elements indefinitely. E.g. <code>Seq(1, 2, 3)</code> will output <code>1,2,3,1,2,3...</code>.</td>
</tr>
<tr>
<td><code>KafkaSource</code></td>
<td>Read from Kafka.</td>
</tr>
</tbody>
</table>
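<p>The cycling behaviour described for <code>CollectionDataSource</code> can be illustrated with plain Scala collections. This is only a sketch of the idea, not the Gearpump implementation; the object and method names are hypothetical.</p>
<pre class="codehilite"><code class="language-scala">object CyclicSourceSketch {
  // Endlessly cycles through the elements of a non-empty collection.
  // Hypothetical helper for illustration; not part of Gearpump.
  def cyclic[T](items: Seq[T]): Iterator[T] = {
    require(items.nonEmpty, &quot;collection must not be empty&quot;)
    Iterator.continually(items).flatMap(_.iterator)
  }

  def main(args: Array[String]): Unit = {
    // Take a finite prefix of the infinite stream.
    val firstSeven = cyclic(Seq(1, 2, 3)).take(7).toList
    println(firstSeven.mkString(&quot;,&quot;)) // prints 1,2,3,1,2,3,1
  }
}</code></pre>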
<h3 id="datasink-implemented"><code>DataSink</code> implemented</h3>
<p>Currently, the following <code>DataSink</code> implementations are supported.</p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>HBaseSink</code></td>
<td>Writes messages to HBase. Each message must be an HBase <code>Put</code> or a tuple of <code>(rowKey, family, column, value)</code>.</td>
</tr>
<tr>
<td><code>KafkaSink</code></td>
<td>Write to Kafka.</td>
</tr>
</tbody>
</table>
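<p>As a rough illustration of the tuple form accepted by <code>HBaseSink</code>, the sketch below maps a domain record to a <code>(rowKey, family, column, value)</code> tuple. The <code>PageView</code> record, the <code>stats</code> column family and the use of string-valued fields are assumptions made for this example.</p>
<pre class="codehilite"><code class="language-scala">object HBaseTupleSketch {
  // Hypothetical domain record; not part of Gearpump or HBase.
  final case class PageView(userId: String, page: String, count: Long)

  // Maps a record to the (rowKey, family, column, value) tuple shape.
  // The column family name &quot;stats&quot; is an assumption for this example.
  def toHBaseTuple(view: PageView): (String, String, String, String) =
    (view.userId, &quot;stats&quot;, view.page, view.count.toString)

  def main(args: Array[String]): Unit = {
    println(toHBaseTuple(PageView(&quot;user-42&quot;, &quot;home&quot;, 3L)))
  }
}</code></pre>
<p>A stream of such tuples could then be handed to the sink, provided the table and column family already exist.</p>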
<h2 id="use-of-connectors">Use of Connectors</h2>
<h3 id="use-of-kafka-connectors">Use of Kafka connectors</h3>
<p>To use Kafka connectors in your application, you first need to add the <code>gearpump-external-kafka</code> library dependency in your application:</p>
<h4 id="sbt">SBT</h4>
<pre class="codehilite"><code class="language-sbt">&quot;org.apache.gearpump&quot; %% &quot;gearpump-external-kafka&quot; % &quot;0.8.3&quot;</code></pre>
<h4 id="xml">XML</h4>
<pre class="codehilite"><code class="language-xml">&lt;dependency&gt;
&lt;groupId&gt;org.apache.gearpump&lt;/groupId&gt;
&lt;artifactId&gt;gearpump-external-kafka&lt;/artifactId&gt;
&lt;version&gt;0.8.3&lt;/version&gt;
&lt;/dependency&gt;</code></pre>
<p>The following simple example reads messages from Kafka and writes them back using <code>KafkaSource</code> and <code>KafkaSink</code>. Users can optionally set a <code>CheckpointStoreFactory</code> so that Kafka offsets are checkpointed and at-least-once message delivery is guaranteed.</p>
<h4 id="low-level-api">Low level API</h4>
<pre class="codehilite"><code class="language-scala">val appConfig = UserConfig.empty
val props = new Properties
props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
val source = new KafkaSource(sourceTopic, props)
val checkpointStoreFactory = new KafkaStoreFactory(props)
source.setCheckpointStore(checkpointStoreFactory)
val sourceProcessor = DataSourceProcessor(source, sourceNum)
val sink = new KafkaSink(sinkTopic, props)
val sinkProcessor = DataSinkProcessor(sink, sinkNum)
val partitioner = new ShufflePartitioner
val computation = sourceProcessor ~ partitioner ~&gt; sinkProcessor
val app = StreamApplication(appName, Graph(computation), appConfig)</code></pre>
<h4 id="high-level-api">High level API</h4>
<pre class="codehilite"><code class="language-scala">val props = new Properties
val appName = &quot;KafkaDSL&quot;
props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
val app = StreamApp(appName, context)
if (atLeastOnce) {
val checkpointStoreFactory = new KafkaStoreFactory(props)
KafkaDSL.createAtLeastOnceStream(app, sourceTopic, checkpointStoreFactory, props, sourceNum)
.writeToKafka(sinkTopic, props, sinkNum)
} else {
KafkaDSL.createAtMostOnceStream(app, sourceTopic, props, sourceNum)
.writeToKafka(sinkTopic, props, sinkNum)
}</code></pre>
<p>In the above example, configurations are set through Java properties and shared by <code>KafkaSource</code>, <code>KafkaSink</code> and the checkpoint store factory (<code>KafkaStoreFactory</code> in the code above).
Each of them can also be configured separately, using the options listed below.</p>
<h4 id="kafkasource-configurations"><code>KafkaSource</code> configurations</h4>
<table>
<thead>
<tr>
<th>Name</th>
<th>Description</th>
<th>Type</th>
<th>Default</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>KafkaConfig.ZOOKEEPER_CONNECT_CONFIG</code></td>
<td>Zookeeper connect string for Kafka topics management</td>
<td>String</td>
<td></td>
</tr>
<tr>
<td><code>KafkaConfig.CLIENT_ID_CONFIG</code></td>
<td>An id string to pass to the server when making requests</td>
<td>String</td>
<td>""</td>
</tr>
<tr>
<td><code>KafkaConfig.GROUP_ID_CONFIG</code></td>
<td>A string that uniquely identifies a set of consumers within the same consumer group</td>
<td>String</td>
<td>""</td>
</tr>
<tr>
<td><code>KafkaConfig.FETCH_SLEEP_MS_CONFIG</code></td>
<td>The amount of time (ms) to sleep when the fetch threshold is hit</td>
<td>Int</td>
<td>100</td>
</tr>
<tr>
<td><code>KafkaConfig.FETCH_THRESHOLD_CONFIG</code></td>
<td>Size of internal queue to keep Kafka messages. Stop fetching and go to sleep when hitting the threshold</td>
<td>Int</td>
<td>10000</td>
</tr>
<tr>
<td><code>KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG</code></td>
<td>Partition grouper class to group partitions among source tasks</td>
<td>Class</td>
<td>DefaultPartitionGrouper</td>
</tr>
<tr>
<td><code>KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG</code></td>
<td>Message decoder class to decode raw bytes from Kafka</td>
<td>Class</td>
<td>DefaultMessageDecoder</td>
</tr>
<tr>
<td><code>KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG</code></td>
<td>Timestamp filter class to filter out late messages</td>
<td>Class</td>
<td>DefaultTimeStampFilter</td>
</tr>
</tbody>
</table>
<h4 id="kafkasink-configurations"><code>KafkaSink</code> configurations</h4>
<table>
<thead>
<tr>
<th>Name</th>
<th>Description</th>
<th>Type</th>
<th>Default</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>KafkaConfig.BOOTSTRAP_SERVERS_CONFIG</code></td>
<td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster</td>
<td>String</td>
<td></td>
</tr>
<tr>
<td><code>KafkaConfig.CLIENT_ID_CONFIG</code></td>
<td>An id string to pass to the server when making requests</td>
<td>String</td>
<td>""</td>
</tr>
</tbody>
</table>
<h4 id="kafkacheckpointstorefactory-configurations"><code>KafkaCheckpointStoreFactory</code> configurations</h4>
<table>
<thead>
<tr>
<th>Name</th>
<th>Description</th>
<th>Type</th>
<th>Default</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>KafkaConfig.ZOOKEEPER_CONNECT_CONFIG</code></td>
<td>Zookeeper connect string for Kafka topics management</td>
<td>String</td>
<td></td>
</tr>
<tr>
<td><code>KafkaConfig.BOOTSTRAP_SERVERS_CONFIG</code></td>
<td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster</td>
<td>String</td>
<td></td>
</tr>
<tr>
<td><code>KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG</code></td>
<td>Name prefix for checkpoint store</td>
<td>String</td>
<td>""</td>
</tr>
<tr>
<td><code>KafkaConfig.REPLICATION_FACTOR</code></td>
<td>Replication factor for checkpoint store topic</td>
<td>Int</td>
<td>1</td>
</tr>
</tbody>
</table>
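<p>To configure the source and the sink separately, each can be given its own <code>Properties</code> object. The sketch below uses only <code>java.util.Properties</code>; the string keys are assumptions that mirror the usual Kafka property names, and in a real application the <code>KafkaConfig</code> constants from the tables above should be used instead.</p>
<pre class="codehilite"><code class="language-scala">import java.util.Properties

object SeparateKafkaPropsSketch {
  // Assumed string keys for illustration only; prefer the KafkaConfig constants.
  val ZookeeperConnect = &quot;zookeeper.connect&quot;
  val BootstrapServers = &quot;bootstrap.servers&quot;
  val ClientId = &quot;client.id&quot;
  val GroupId = &quot;group.id&quot;

  // Properties used only by the source.
  def sourceProps(zookeeper: String): Properties = {
    val props = new Properties
    props.setProperty(ZookeeperConnect, zookeeper)
    props.setProperty(ClientId, &quot;example-source&quot;)
    props.setProperty(GroupId, &quot;example-group&quot;)
    props
  }

  // Properties used only by the sink.
  def sinkProps(brokers: String): Properties = {
    val props = new Properties
    props.setProperty(BootstrapServers, brokers)
    props.setProperty(ClientId, &quot;example-sink&quot;)
    props
  }

  def main(args: Array[String]): Unit = {
    val source = sourceProps(&quot;localhost:2181&quot;)
    val sink = sinkProps(&quot;localhost:9092&quot;)
    println(source.getProperty(ClientId)) // prints example-source
    println(sink.getProperty(ClientId))   // prints example-sink
  }
}</code></pre>
<p>The two objects would then be passed to <code>new KafkaSource(sourceTopic, source)</code> and <code>new KafkaSink(sinkTopic, sink)</code> respectively, so that the client ids no longer have to be shared.</p>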
<h3 id="use-of-hbasesink">Use of <code>HBaseSink</code></h3>
<p>To use <code>HBaseSink</code> in your application, you first need to add the <code>gearpump-external-hbase</code> library dependency in your application:</p>
<h4 id="sbt_1">SBT</h4>
<pre class="codehilite"><code class="language-sbt">&quot;org.apache.gearpump&quot; %% &quot;gearpump-external-hbase&quot; % &quot;0.8.3&quot;</code></pre>
<h4 id="xml_1">XML</h4>
<pre class="codehilite"><code class="language-xml">&lt;dependency&gt;
&lt;groupId&gt;org.apache.gearpump&lt;/groupId&gt;
&lt;artifactId&gt;gearpump-external-hbase&lt;/artifactId&gt;
&lt;version&gt;0.8.3&lt;/version&gt;
&lt;/dependency&gt;</code></pre>
<p>To connect to HBase, you need to provide the following information:</p>
<ul>
<li>the HBase configuration, which identifies the HBase service to connect to</li>
<li>the table name (you must create the table yourself, see the <a href="https://hbase.apache.org/book.html">HBase documentation</a>)</li>
</ul>
<p>Then, you can use <code>HBaseSink</code> in your application:</p>
<pre class="codehilite"><code class="language-scala">//create the HBase data sink
val sink = HBaseSink(UserConfig.empty, tableName, HBaseConfiguration.create())
//create Gearpump Processor
val sinkProcessor = DataSinkProcessor(sink, parallelism)
// alternatively, with the DSL (assume stream is a normal `Stream` in the DSL)
stream.writeToHbase(UserConfig.empty, tableName, parallelism, &quot;write to HBase&quot;)</code></pre>
<p>You can tune the connection to HBase via the HBase configuration passed in. If none is passed, Gearpump looks on the local classpath for a valid HBase configuration (<code>hbase-site.xml</code>).</p>
<p>Note: because of the issue discussed <a href="http://stackoverflow.com/questions/24456484/hbase-managed-zookeeper-suddenly-trying-to-connect-to-localhost-instead-of-zooke">here</a>, you may need to create an additional configuration for your HBase sink:</p>
<pre class="codehilite"><code class="language-scala">def hadoopConfig = {
val conf = new Configuration()
conf.set(&quot;hbase.zookeeper.quorum&quot;, &quot;zookeeperHost&quot;)
conf.set(&quot;hbase.zookeeper.property.clientPort&quot;, &quot;2181&quot;)
conf
}
val sink = HBaseSink(UserConfig.empty, tableName, hadoopConfig)</code></pre>
<h2 id="how-to-implement-your-own-datasource">How to implement your own <code>DataSource</code></h2>
<p>To implement your own <code>DataSource</code>, you need to implement two things:</p>
<ol>
<li>The data source itself</li>
<li>A helper class to ease its use in the DSL (optional)</li>
</ol>
<h3 id="implement-your-own-datasource">Implement your own <code>DataSource</code></h3>
<p>You need to implement a class derived from <code>org.apache.gearpump.streaming.transaction.api.TimeReplayableSource</code>.</p>
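<p>The idea behind a time-replayable source is that it can be reopened at an earlier timestamp and re-emit everything from that point on, which is what makes recovery with at-least-once delivery possible. The sketch below shows this with a simplified, hypothetical trait (<code>SimpleReplayableSource</code>); the real <code>TimeReplayableSource</code> has a different and richer signature, so treat this as an outline of the concept only.</p>
<pre class="codehilite"><code class="language-scala">object ReplayableSourceSketch {
  // Hypothetical, simplified stand-in for a time-replayable source.
  // The real Gearpump trait differs; this only illustrates the concept.
  trait SimpleReplayableSource[T] {
    // Position the source at the first record with timestamp &gt;= startTimeMs.
    def open(startTimeMs: Long): Unit
    // Next (timestamp, value) pair, or None when nothing is left.
    def read(): Option[(Long, T)]
    def close(): Unit
  }

  // In-memory source over records sorted by timestamp.
  final class InMemorySource[T](records: Vector[(Long, T)]) extends SimpleReplayableSource[T] {
    private var position = 0

    override def open(startTimeMs: Long): Unit = {
      val idx = records.indexWhere { case (ts, _) =&gt; ts &gt;= startTimeMs }
      position = if (idx &lt; 0) records.length else idx
    }

    override def read(): Option[(Long, T)] =
      if (position &lt; records.length) {
        val record = records(position)
        position += 1
        Some(record)
      } else None

    override def close(): Unit = ()
  }

  // Reads every remaining value from the source.
  def drain[T](source: SimpleReplayableSource[T]): List[T] =
    Iterator.continually(source.read())
      .takeWhile(_.isDefined)
      .collect { case Some((_, value)) =&gt; value }
      .toList

  def main(args: Array[String]): Unit = {
    val source = new InMemorySource(Vector(100L -&gt; &quot;a&quot;, 200L -&gt; &quot;b&quot;, 300L -&gt; &quot;c&quot;))
    source.open(0L)
    println(drain(source)) // prints List(a, b, c)
    source.open(200L)      // simulate recovery: replay from timestamp 200
    println(drain(source)) // prints List(b, c)
    source.close()
  }
}</code></pre>
<p>Replaying from the last checkpointed timestamp may emit some records twice, which is why this style of recovery gives at-least-once rather than exactly-once delivery.</p>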
<h3 id="implement-dsl-helper-optional">Implement DSL helper (Optional)</h3>
<p>If you want to use your data source from the DSL, you can start from a customized stream, but it is better to implement your own DSL helper.
See <code>KafkaDSLUtil</code> in the Gearpump source for an example.</p>
<p>Below is a code snippet from <code>KafkaDSLUtil</code>:</p>
<pre class="codehilite"><code class="language-scala">object KafkaDSLUtil {
def createStream[T](
app: StreamApp,
topics: String,
parallelism: Int,
description: String,
properties: Properties): dsl.Stream[T] = {
app.source[T](new KafkaSource(topics, properties), parallelism, description)
}
}</code></pre>
<h2 id="how-to-implement-your-own-datasink">How to implement your own <code>DataSink</code></h2>
<p>To implement your own <code>DataSink</code>, you need to implement two things:</p>
<ol>
<li>The data sink itself</li>
<li>A helper class to ease its use in the DSL (optional)</li>
</ol>
<h3 id="implement-your-own-datasink">Implement your own <code>DataSink</code></h3>
<p>You need to implement a class derived from <code>org.apache.gearpump.streaming.sink.DataSink</code>.</p>
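<p>A sink typically follows an open, write, close lifecycle. The sketch below models that lifecycle with a simplified, hypothetical trait (<code>SimpleSink</code>) and a batching implementation; it does not use the real <code>DataSink</code> signature, which works with Gearpump's own context and message types.</p>
<pre class="codehilite"><code class="language-scala">import scala.collection.mutable.ArrayBuffer

object SinkSketch {
  // Hypothetical, simplified stand-in for the sink lifecycle.
  trait SimpleSink[T] {
    def open(): Unit
    def write(message: T): Unit
    def close(): Unit
  }

  // Buffers messages and hands them to `flush` in batches of `batchSize`.
  final class BatchingSink[T](batchSize: Int, flush: Seq[T] =&gt; Unit) extends SimpleSink[T] {
    require(batchSize &gt; 0, &quot;batchSize must be positive&quot;)
    private val buffer = ArrayBuffer.empty[T]

    override def open(): Unit = buffer.clear()

    override def write(message: T): Unit = {
      buffer += message
      if (buffer.size &gt;= batchSize) flushBuffer()
    }

    // Flush whatever is left so that no message is lost on shutdown.
    override def close(): Unit = if (buffer.nonEmpty) flushBuffer()

    private def flushBuffer(): Unit = {
      flush(buffer.toList)
      buffer.clear()
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new BatchingSink[Int](2, batch =&gt; println(batch.mkString(&quot;[&quot;, &quot;,&quot;, &quot;]&quot;)))
    sink.open()
    (1 to 5).foreach(sink.write)
    sink.close() // flushes the final, partial batch
  }
}</code></pre>
<p>Flushing in <code>close()</code> is the important design point: a sink that buffers must release its remaining messages when the task shuts down.</p>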
<h3 id="implement-dsl-helper-optional_1">Implement DSL helper (Optional)</h3>
<p>If you want to use your data sink from the DSL, you can start from a customized stream, but it is better to implement your own DSL helper.
See <code>HBaseDSLSink</code> in the Gearpump source for an example.</p>
<p>Below is a code snippet from <code>HBaseDSLSink</code>:</p>
<pre class="codehilite"><code class="language-scala">class HBaseDSLSink[T](stream: Stream[T]) {
def writeToHbase(userConfig: UserConfig, table: String, parallelism: Int, description: String): Stream[T] = {
stream.sink(HBaseSink[T](userConfig, table), parallelism, userConfig, description)
}
def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, parallelism: Int, description: String): Stream[T] = {
stream.sink(HBaseSink[T](userConfig, table, configuration), parallelism, userConfig, description)
}
}
object HBaseDSLSink {
implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = {
new HBaseDSLSink[T](stream)
}
}</code></pre>
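<p>The helper above relies on an implicit conversion: once the companion object's members are imported, the extra method appears to be defined on the stream itself. The self-contained sketch below shows the same pattern on a hypothetical <code>MiniStream</code> type with a hypothetical <code>writeToConsole</code> method; neither is part of Gearpump.</p>
<pre class="codehilite"><code class="language-scala">import scala.language.implicitConversions

object DslHelperSketch {
  // Hypothetical minimal stream type; stands in for the Gearpump DSL Stream.
  final case class MiniStream[T](items: List[T], sinks: List[String] = Nil)

  // Wrapper that adds a sink method to MiniStream.
  class ConsoleDSLSink[T](stream: MiniStream[T]) {
    def writeToConsole(description: String): MiniStream[T] =
      stream.copy(sinks = stream.sinks :+ description)
  }

  object ConsoleDSLSink {
    // Implicit conversion that makes writeToConsole available on MiniStream.
    implicit def streamToConsoleDSLSink[T](stream: MiniStream[T]): ConsoleDSLSink[T] =
      new ConsoleDSLSink[T](stream)
  }

  def main(args: Array[String]): Unit = {
    import ConsoleDSLSink._ // brings the implicit conversion into scope
    val result = MiniStream(List(1, 2, 3)).writeToConsole(&quot;write to console&quot;)
    println(result.sinks) // prints List(write to console)
  }
}</code></pre>
<p>Without the import, the call to <code>writeToConsole</code> would not compile, which is why users of a DSL helper need to import the companion object's members first.</p>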
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../dev-storm/index.html" class="btn btn-neutral float-right" title="Storm Compatibility">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../dev-custom-serializer/index.html" class="btn btn-neutral" title="Customized Message Passing"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<a class="icon icon-github" style="float: left; color: #fcfcfc"> GitHub</a>
<span><a href="../dev-custom-serializer/index.html" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../dev-storm/index.html" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
</body>
</html>