<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1"/>
<title>Gearpump Connectors - GearPump 0.6.2 Documentation</title>
<link rel="stylesheet" href="css/bootstrap-3.3.5.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<link rel="stylesheet" href="css/main.css">
<link rel="stylesheet" href="css/pygments-default.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<div class="navbar navbar-inverse navbar-fixed-top" id="topbar">
<div class="container">
<div class="navbar-header">
<a class="navbar-brand" href="/">GearPump
<span class="label label-primary" style="font-size: .6em">0.6.2</span>
</a>
</div>
<div class="collapse navbar-collapse">
<ul class="nav navbar-nav">
<li><a href="index.html">Overview</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Introduction<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="submit-your-1st-application.html">Submit Your 1st Application</a></li>
<li><a href="commandline.html">Client Command Line</a></li>
<li class="divider"></li>
<li><a href="basic-concepts.html">Basic Concepts</a></li>
<li><a href="features.html">Technical Highlights</a></li>
<li><a href="message-delivery.html">Reliable Message Delivery</a></li>
<li><a href="performance-report.html">Performance</a></li>
<li><a href="gearpump-internals.html">Gearpump Internals</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
<ul class="dropdown-menu">
<li class="dropdown-header">Deployment</li>
<li><a href="deployment-docker.html">Docker</a></li>
<li><a href="deployment-local.html">Local</a></li>
<li><a href="deployment-standalone.html">Standalone</a></li>
<li><a href="deployment-yarn.html">YARN</a></li>
<li class="divider"></li>
<li><a href="deployment-ha.html">High Availability</a></li>
<li><a href="deployment-msg-delivery.html">Reliable Message Delivery</a></li>
<li><a href="deployment-configuration.html">Configuration</a></li>
<li class="divider"></li>
<li><a href="deployment-security.html">Security</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guide<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="dev-write-1st-app.html">Write Your 1st App</a></li>
<li><a href="dev-custom-serializer.html">Customized Message Passing</a></li>
<li class="divider"></li>
<li><a href="api/scala/index.html">Scala API</a></li>
<li><a href="api/java/index.html">Java API</a></li>
<li><a href="dev-rest-api.html">RESTful API</a></li>
<li class="divider"></li>
<li><a href="dev-connectors.html">Gearpump Connectors</a></li>
<li class="divider"></li>
<li><a href="dev-storm.html">Storm Compatibility</a></li>
<!--
<li><a href="dev-samoa.html">Samoa Compatibility</a></li>
<li class="divider"></li>
<li><a href="dev-iot.html">Gearpump with IoT</a></li>
-->
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="how-to-contribute.html">How to Contribute</a></li>
<li><a href="coding-style.html">Coding Style</a></li>
<li class="divider"></li>
<li><a href="faq.html">FAQ</a></li>
<li><a href="about.html">About</a></li>
</ul>
</li>
</ul>
</div>
</div>
</div>
<div class="container" id="content">
<h1 class="title">Gearpump Connectors</h1>
<h2 id="basic-concepts">Basic Concepts</h2>
<p><code>DataSource</code> and <code>DataSink</code> are the two main concepts Gearpump uses to connect with the outside world.</p>
<h3 id="datasource">DataSource</h3>
<p>A <code>DataSource</code> takes no input from upstream and only emits messages, so it is the starting point of a stream processing flow.</p>
<p>Gearpump relies on a <code>DataSource</code> being replayable to provide at-least-once and exactly-once message delivery. Some data sources therefore need an <code>io.gearpump.streaming.transaction.api.OffsetStorageFactory</code> to store the offset (progress) of the current <code>DataSource</code>, so that when a replay is needed, Gearpump can direct the <code>DataSource</code> to replay from a given offset.</p>
<p>Currently, a Gearpump <code>DataSource</code> supports only infinite streams. Finite stream support will be added in a near-future release.</p>
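<p>To illustrate the idea of storing offsets for replay, here is a minimal, self-contained sketch. <code>InMemoryOffsetStore</code>, <code>append</code> and <code>lookUp</code> are hypothetical names chosen for this example and are not Gearpump's offset storage API. The sketch only shows how a mapping from message timestamp to source offset lets a source find where to resume.</p>
<div class="highlight"><pre><code class="language-scala">import scala.collection.immutable.TreeMap

// Hypothetical stand-in for an offset store; NOT Gearpump's actual API.
final class InMemoryOffsetStore {
  // Maps a message timestamp to the source offset at which it was read.
  private var offsets = TreeMap.empty[Long, Long]

  // Record that the message with this timestamp was read at this offset.
  def append(timestamp: Long, offset: Long): Unit =
    offsets = offsets.updated(timestamp, offset)

  // Offset recorded for the latest timestamp that is &lt;= the given time, if any.
  // Resuming from this offset may re-read some messages, but skips none.
  def lookUp(timestamp: Long): Option[Long] =
    offsets.takeWhile { case (ts, _) =&gt; ts &lt;= timestamp }.lastOption.map(_._2)
}

object OffsetStoreExample {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryOffsetStore
    store.append(100L, 0L)
    store.append(200L, 5L)
    store.append(300L, 9L)
    // A replay from time 250 resumes at the offset stored for time 200.
    println(store.lookUp(250L))
  }
}</code></pre></div>
<p>A durable implementation would persist the mapping outside the task so that it survives a failure; the in-memory map here is only for illustration.</p>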
<h3 id="datasink">DataSink</h3>
<p>A <code>DataSink</code> produces no output and only consumes messages, so it is the end point of a stream processing flow.</p>
<h2 id="implemented-connectors">Implemented Connectors</h2>
<h3 id="datasource-implemented"><code>DataSource</code> implemented</h3>
<p>Currently, the following <code>DataSource</code> implementations are supported.</p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>CollectionDataSource</code></td>
<td>Converts a collection into a repeating (cyclic) data source. For example, <code>Seq(1, 2, 3)</code> outputs <code>1,2,3,1,2,3...</code>.</td>
</tr>
<tr>
<td><code>KafkaSource</code></td>
<td>Reads messages from Kafka.</td>
</tr>
</tbody>
</table>
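<p>The cycling behavior described for <code>CollectionDataSource</code> can be pictured with plain Scala collections. The helper <code>cycle</code> below is a hypothetical function written for this illustration; it is not part of Gearpump and does not show how <code>CollectionDataSource</code> is implemented.</p>
<div class="highlight"><pre><code class="language-scala">object CycleExample {
  // Hypothetical helper, not part of Gearpump:
  // lazily repeats the given collection forever.
  def cycle[T](items: Seq[T]): Iterator[T] = {
    require(items.nonEmpty, "cannot cycle over an empty collection")
    Iterator.continually(items).flatMap(_.iterator)
  }

  def main(args: Array[String]): Unit = {
    // Take a finite prefix of the infinite sequence 1,2,3,1,2,3,...
    val firstSeven = cycle(Seq(1, 2, 3)).take(7).toList
    println(firstSeven.mkString(","))  // prints 1,2,3,1,2,3,1
  }
}</code></pre></div>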
<h3 id="datasink-implemented"><code>DataSink</code> implemented</h3>
<p>Currently, the following <code>DataSink</code> implementations are supported.</p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>HBaseSink</code></td>
<td>Writes messages to HBase. Each message must be an HBase <code>Put</code> or a tuple of <code>(rowKey, family, column, value)</code>.</td>
</tr>
<tr>
<td><code>KafkaSink</code></td>
<td>Writes messages to Kafka.</td>
</tr>
</tbody>
</table>
<h2 id="use-of-connectors">Use of Connectors</h2>
<h3 id="use-of-kafkasource">Use of <code>KafkaSource</code></h3>
<p>To use <code>KafkaSource</code> in your application, you first need to add the <code>gearpump-external-kafka</code> library dependency to your application:</p>
<div class="codetabs">
<div data-lang="SBT">
<div class="highlight"><pre><code>"com.github.intel-hadoop" %% "gearpump-external-kafka" % "0.6.2"
</code></pre></div>
</div>
<div data-lang="Maven">
<div class="highlight"><pre><code>&lt;dependency&gt;
&lt;groupId&gt;com.github.intel-hadoop&lt;/groupId&gt;
&lt;artifactId&gt;gearpump-external-kafka&lt;/artifactId&gt;
&lt;version&gt;0.6.2&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div>
</div>
</div>
<p>To connect to Kafka, you need to provide the following information:</p>
<ul>
<li>the Zookeeper address</li>
<li>the Kafka topic</li>
</ul>
<p>Then, you can use <code>KafkaSource</code> in your application:</p>
<div class="codetabs">
<div data-lang="Low Level API">
<div class="highlight"><pre><code class="language-scala"> <span class="c1">//Specify the offset storage.</span>
<span class="c1">//Here we use the same zookeeper as the offset storage.</span>
<span class="c1">//A set of corresponding topics will be created to store the offsets.</span>
<span class="c1">//You are free to specify your own offset storage</span>
<span class="k">val</span> <span class="n">offsetStorageFactory</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">KafkaStorageFactory</span><span class="o">(</span><span class="n">zookeepers</span><span class="o">,</span> <span class="n">brokers</span><span class="o">)</span>
<span class="c1">//create the kafka data source</span>
<span class="k">val</span> <span class="n">source</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">KafkaSource</span><span class="o">(</span><span class="n">topic</span><span class="o">,</span> <span class="n">zookeepers</span><span class="o">,</span> <span class="n">offsetStorageFactory</span><span class="o">)</span>
<span class="c1">//create Gearpump Processor</span>
<span class="k">val</span> <span class="n">reader</span> <span class="k">=</span> <span class="nc">DataSourceProcessor</span><span class="o">(</span><span class="n">source</span><span class="o">,</span> <span class="n">parallelism</span><span class="o">)</span></code></pre></div>
</div>
<div data-lang="High Level DSL">
<div class="highlight"><pre><code class="language-scala"> <span class="c1">//specify the offset storage</span>
<span class="c1">//here we use the same zookeeper as the offset storage (a set of corresponding topics will be created to store the offsets)</span>
<span class="c1">//you are free to specify your own offset storage</span>
<span class="k">val</span> <span class="n">offsetStorageFactory</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">KafkaStorageFactory</span><span class="o">(</span><span class="n">zookeepers</span><span class="o">,</span> <span class="n">brokers</span><span class="o">)</span>
<span class="k">val</span> <span class="n">source</span> <span class="k">=</span> <span class="nc">KafkaDSLUtil</span><span class="o">.</span><span class="n">createStream</span><span class="o">(</span><span class="n">app</span><span class="o">,</span> <span class="n">parallelism</span><span class="o">,</span> <span class="s">&quot;Kafka Source&quot;</span><span class="o">,</span> <span class="n">topics</span><span class="o">,</span> <span class="n">zookeepers</span><span class="o">,</span> <span class="n">offsetStorageFactory</span><span class="o">)</span>
<span class="o">...</span></code></pre></div>
</div>
</div>
<h3 id="use-of-hbasesink">Use of <code>HBaseSink</code></h3>
<p>To use <code>HBaseSink</code> in your application, you first need to add the <code>gearpump-external-hbase</code> library dependency to your application:</p>
<div class="codetabs">
<div data-lang="SBT">
<div class="highlight"><pre><code>"com.github.intel-hadoop" %% "gearpump-external-hbase" % "0.6.2"
</code></pre></div>
</div>
<div data-lang="Maven">
<div class="highlight"><pre><code>&lt;dependency&gt;
&lt;groupId&gt;com.github.intel-hadoop&lt;/groupId&gt;
&lt;artifactId&gt;gearpump-external-hbase&lt;/artifactId&gt;
&lt;version&gt;0.6.2&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div>
</div>
</div>
<p>To connect to HBase, you need to provide the following information:</p>
<ul>
<li>the HBase configuration, which identifies the HBase service to connect to</li>
<li>the table name</li>
</ul>
<p>Then, you can use <code>HBaseSink</code> in your application:</p>
<div class="codetabs">
<div data-lang="Low Level API">
<div class="highlight"><pre><code class="language-scala"> <span class="c1">//create the HBase data sink</span>
<span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="nc">HBaseSink</span><span class="o">(</span><span class="n">tableName</span><span class="o">,</span> <span class="n">configuration</span><span class="o">)</span>
<span class="c1">//create Gearpump Processor</span>
<span class="k">val</span> <span class="n">sinkProcessor</span> <span class="k">=</span> <span class="nc">DataSinkProcessor</span><span class="o">(</span><span class="n">sink</span><span class="o">,</span> <span class="n">parallelism</span><span class="o">)</span></code></pre></div>
</div>
<div data-lang="High Level DSL">
<div class="highlight"><pre><code class="language-scala"> <span class="c1">//assume stream is a normal `Stream` in DSL</span>
<span class="n">stream</span><span class="o">.</span><span class="n">writeToHbase</span><span class="o">(</span><span class="n">tableName</span><span class="o">,</span> <span class="n">parallelism</span><span class="o">,</span> <span class="s">&quot;write to HBase&quot;</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p>You can tune the connection to HBase via the HBase configuration you pass in. If no configuration is passed, Gearpump searches the local classpath for a valid HBase configuration (<code>hbase-site.xml</code>).</p>
<h2 id="how-to-implement-your-own-datasource">How to implement your own <code>DataSource</code></h2>
<p>To implement your own <code>DataSource</code>, you need to implement two things:</p>
<ol>
<li>The data source itself</li>
<li>A helper class that makes it easy to use in the DSL</li>
</ol>
<h3 id="implement-your-own-datasource">Implement your own <code>DataSource</code></h3>
<p>You need to implement a class derived from <code>io.gearpump.streaming.transaction.api.TimeReplayableSource</code>.</p>
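<p>As a rough picture of what a time-replayable source has to do, the sketch below defines its own simplified trait. <code>SketchReplayableSource</code>, its method signatures and <code>InMemorySource</code> are assumptions made for this example; the actual <code>TimeReplayableSource</code> trait may declare different methods and types, so check the Gearpump source before implementing against it. The sketch also assumes the records are already sorted by timestamp.</p>
<div class="highlight"><pre><code class="language-scala">// Hypothetical, simplified stand-in for a time-replayable source interface.
// The real Gearpump trait may declare different methods and types.
trait SketchReplayableSource[T] {
  // Prepare to read; if a start time is given, replay from that time.
  def open(startTime: Option[Long]): Unit
  // Read up to batchSize records as (payload, timestamp) pairs.
  def read(batchSize: Int): List[(T, Long)]
  def close(): Unit
}

// Replays timestamped records held in memory (assumed sorted by timestamp).
final class InMemorySource[T](records: Vector[(T, Long)])
    extends SketchReplayableSource[T] {

  private var position = 0

  override def open(startTime: Option[Long]): Unit = {
    val start = startTime.getOrElse(Long.MinValue)
    // Skip every record older than the requested start time.
    val idx = records.indexWhere { case (_, ts) =&gt; ts &gt;= start }
    position = if (idx &lt; 0) records.length else idx
  }

  override def read(batchSize: Int): List[(T, Long)] = {
    val batch = records.slice(position, position + batchSize).toList
    position += batch.length
    batch
  }

  override def close(): Unit = ()
}</code></pre></div>
<p>Calling <code>open(Some(t))</code> again after a failure rewinds the source to the first record with a timestamp of at least <code>t</code>, which is the property replay depends on.</p>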
<h3 id="implement-dsl-helper-optional">Implement DSL helper (Optional)</h3>
<p>To make your customized source easy to use from the DSL, it is a good idea to implement your own DSL helper.
You can refer to <code>KafkaDSLUtil</code> in the Gearpump source as an example.</p>
<p>Below is a code snippet from <code>KafkaDSLUtil</code>:</p>
<div class="highlight"><pre><code class="language-scala"><span class="k">object</span> <span class="nc">KafkaDSLUtil</span> <span class="o">{</span>
<span class="c1">//T is the message type</span>
<span class="k">def</span> <span class="n">createStream</span><span class="o">[</span><span class="kt">T:</span> <span class="kt">ClassTag</span><span class="o">](</span>
<span class="n">app</span><span class="k">:</span> <span class="kt">StreamApp</span><span class="o">,</span>
<span class="n">parallelism</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span>
<span class="n">description</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span>
<span class="n">topics</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span>
<span class="n">zkConnect</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span>
<span class="n">offsetStorageFactory</span><span class="k">:</span> <span class="kt">OffsetStorageFactory</span><span class="o">)</span><span class="k">:</span> <span class="kt">dsl.Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span>
<span class="n">app</span><span class="o">.</span><span class="n">source</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span><span class="k">new</span> <span class="nc">KafkaSource</span><span class="o">(</span><span class="n">topics</span><span class="o">,</span> <span class="n">zkConnect</span><span class="o">,</span> <span class="n">offsetStorageFactory</span><span class="o">)</span>
<span class="k">with</span> <span class="nc">TypedDataSource</span><span class="o">[</span><span class="kt">T</span><span class="o">],</span> <span class="n">parallelism</span><span class="o">,</span> <span class="n">description</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<h2 id="how-to-implement-your-own-datasink">How to implement your own <code>DataSink</code></h2>
<p>To implement your own <code>DataSink</code>, you need to implement two things:</p>
<ol>
<li>The data sink itself</li>
<li>A helper class that makes it easy to use in the DSL</li>
</ol>
<h3 id="implement-your-own-datasink">Implement your own <code>DataSink</code></h3>
<p>You need to implement a class derived from <code>io.gearpump.streaming.sink.DataSink</code>.</p>
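<p>The general shape of a sink can be sketched with a simplified, self-defined trait. <code>SketchSink</code>, its method signatures and <code>BufferingSink</code> are assumptions made for this example and are not copied from <code>io.gearpump.streaming.sink.DataSink</code>, whose actual methods and parameter types may differ.</p>
<div class="highlight"><pre><code class="language-scala">import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for a sink interface.
// The real Gearpump trait may declare different methods and types.
trait SketchSink[T] {
  def open(): Unit             // acquire connections or other resources
  def write(message: T): Unit  // consume one message
  def close(): Unit            // release resources
}

// Collects messages in memory and rejects writes while the sink is not open.
final class BufferingSink[T] extends SketchSink[T] {
  private val buffer = ArrayBuffer.empty[T]
  private var opened = false

  override def open(): Unit = { opened = true }

  override def write(message: T): Unit = {
    require(opened, "sink must be opened before writing")
    buffer += message
  }

  override def close(): Unit = { opened = false }

  // Snapshot of everything written so far, in arrival order.
  def received: List[T] = buffer.toList
}</code></pre></div>
<p>A sink for an external system would open a client connection in <code>open</code>, send each message in <code>write</code>, and release the connection in <code>close</code>.</p>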
<h3 id="implement-dsl-helper-optional-1">Implement DSL helper (Optional)</h3>
<p>To make your customized sink easy to use from the DSL, it is a good idea to implement your own DSL helper.
You can refer to <code>HBaseDSLSink</code> in the Gearpump source as an example.</p>
<p>Below is a code snippet from <code>HBaseDSLSink</code>:</p>
<div class="highlight"><pre><code class="language-scala"><span class="k">class</span> <span class="nc">HBaseDSLSink</span><span class="o">[</span><span class="kt">T:</span> <span class="kt">ClassTag</span><span class="o">](</span><span class="n">stream</span><span class="k">:</span> <span class="kt">Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">writeToHbase</span><span class="o">(</span><span class="n">table</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">parallelism</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">description</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span>
<span class="n">stream</span><span class="o">.</span><span class="n">sink</span><span class="o">(</span><span class="nc">HBaseSink</span><span class="o">(</span><span class="n">table</span><span class="o">),</span> <span class="n">parallelism</span><span class="o">,</span> <span class="n">description</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="k">object</span> <span class="nc">HBaseDSLSink</span> <span class="o">{</span>
<span class="k">implicit</span> <span class="k">def</span> <span class="n">streamToHBaseDSLSink</span><span class="o">[</span><span class="kt">T:</span> <span class="kt">ClassTag</span><span class="o">](</span><span class="n">stream</span><span class="k">:</span> <span class="kt">Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span><span class="k">:</span> <span class="kt">HBaseDSLSink</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">new</span> <span class="nc">HBaseDSLSink</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span><span class="n">stream</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
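<p>The helper above relies on a Scala implicit conversion to add a method to an existing stream type. The self-contained sketch below shows the same pattern without any Gearpump dependency. <code>MiniStream</code>, <code>ConsoleDSLSink</code> and <code>writeToConsole</code> are hypothetical names invented for this illustration.</p>
<div class="highlight"><pre><code class="language-scala">import scala.language.implicitConversions

// Hypothetical stand-in for a DSL stream: its elements plus the
// descriptions of the sinks attached so far.
final case class MiniStream[T](elements: List[T], sinks: List[String] = Nil)

// Wrapper that adds a sink method to MiniStream.
class ConsoleDSLSink[T](stream: MiniStream[T]) {
  def writeToConsole(description: String): MiniStream[T] =
    stream.copy(sinks = stream.sinks :+ description)
}

object ConsoleDSLSink {
  // Once imported, lets writeToConsole be called directly on a MiniStream.
  implicit def streamToConsoleDSLSink[T](stream: MiniStream[T]): ConsoleDSLSink[T] =
    new ConsoleDSLSink[T](stream)
}

object ImplicitSinkExample {
  // The conversion lives in the wrapper's companion object, not in
  // MiniStream's, so it has to be imported explicitly.
  import ConsoleDSLSink._

  def main(args: Array[String]): Unit = {
    val result = MiniStream(List(1, 2, 3)).writeToConsole("print to console")
    println(result.sinks)  // prints List(print to console)
  }
}</code></pre></div>
<p>Keeping the conversion in a separate object means the stream type stays unaware of the sink, and users opt in with a single import.</p>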
</div> <!-- /container -->
<script src="js/vendor/jquery-2.1.4.min.js"></script>
<script src="js/vendor/bootstrap-3.3.5.min.js"></script>
<script src="js/vendor/anchor-1.1.1.min.js"></script>
<script src="js/main.js"></script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>