| <!DOCTYPE html> |
| <!--[if lt IE 7]> |
| <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--> |
| <!--[if IE 7]> |
| <html class="no-js lt-ie9 lt-ie8"> <![endif]--> |
| <!--[if IE 8]> |
| <html class="no-js lt-ie9"> <![endif]--> |
| <!--[if gt IE 8]><!--> |
| <html class="no-js"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1"/> |
| <title>Gearpump Connectors - Gearpump 0.8.1 Documentation</title> |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap-3.3.5.min.css"> |
| <style> |
| body { |
| padding-top: 60px; |
| padding-bottom: 40px; |
| } |
| </style> |
| <link rel="stylesheet" href="css/main.css"> |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| </head> |
| <body> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade |
| your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install |
| Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <div class="navbar navbar-inverse navbar-fixed-top" id="topbar"> |
| <div class="container"> |
| <div class="navbar-header"> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" |
| data-target="#navbar" aria-expanded="false" aria-controls="navbar"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <a class="navbar-brand" href="http://gearpump.apache.org">Gearpump |
| <span class="label label-primary" style="font-size: .6em">0.8.1</span> |
| </a> |
| </div> |
| <div id="navbar" class="collapse navbar-collapse"> |
| <ul class="nav navbar-nav"> |
| <li><a href="index.html">Overview</a></li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Introduction<b |
| class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="submit-your-1st-application.html">Submit Your 1st Application</a></li> |
| <li><a href="commandline.html">Client Command Line</a></li> |
| <li class="divider"></li> |
| <li><a href="basic-concepts.html">Basic Concepts</a></li> |
| <li><a href="features.html">Technical Highlights</a></li> |
| <li><a href="message-delivery.html">Reliable Message Delivery</a></li> |
| <li><a href="performance-report.html">Performance</a></li> |
| <li><a href="gearpump-internals.html">Gearpump Internals</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li class="dropdown-header">Deployment</li> |
| <li><a href="deployment-local.html">Local Mode</a></li> |
| <li><a href="deployment-standalone.html">Standalone Mode</a></li> |
| <li><a href="deployment-yarn.html">YARN Mode</a></li> |
| <li><a href="deployment-docker.html">Docker Mode</a></li> |
| <li class="divider"></li> |
| <li><a href="deployment-ui-authentication.html">UI Authentication</a></li> |
| <li><a href="deployment-ha.html">High Availability</a></li> |
| <li><a href="deployment-msg-delivery.html">Reliable Message Delivery</a></li> |
| <li><a href="deployment-configuration.html">Configuration</a></li> |
| <li><a href="deployment-resource-isolation.html">Resource Isolation</a></li> |
| <li class="divider"></li> |
| <li><a href="deployment-security.html">YARN Security Guide</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guide<b |
| class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="dev-write-1st-app.html">Write Your 1st App</a></li> |
| <li><a href="dev-custom-serializer.html">Customized Message Passing</a></li> |
| <li class="divider"></li> |
| <li><a href="api/scala/index.html">Scala API</a></li> |
| <li><a href="api/java/index.html">Java API</a></li> |
| <li><a href="dev-rest-api.html">RESTful API</a></li> |
| <li class="divider"></li> |
| <li><a href="dev-connectors.html">Gearpump Connectors</a></li> |
| <li class="divider"></li> |
| <li><a href="dev-storm.html">Storm Compatibility</a></li> |
| <!-- |
| <li><a href="dev-samoa.html">Samoa Compatibility</a></li> |
| <li class="divider"></li> |
| <li><a href="dev-iot.html">Gearpump with IoT</a></li> |
| --> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="faq.html">FAQ</a></li> |
| </ul> |
| </li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| |
| <div class="container" id="content"> |
| |
| <h1 class="title">Gearpump Connectors</h1> |
| |
| |
| <h2 id="basic-concepts">Basic Concepts</h2> |
| <p><code>DataSource</code> and <code>DataSink</code> are the two main abstractions Gearpump uses to connect with the outside world.</p> |
| |
| <h3 id="datasource">DataSource</h3> |
| <p><code>DataSource</code> is the starting point of a stream processing flow.</p> |
| |
| <h3 id="datasink">DataSink</h3> |
| <p><code>DataSink</code> is the end point of a stream processing flow.</p> |
| |
| <h2 id="implemented-connectors">Implemented Connectors</h2> |
| |
| <h3 id="datasource-implemented"><code>DataSource</code> implemented</h3> |
| <p>The following <code>DataSource</code> implementations are currently supported.</p> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Name</th> |
| <th>Description</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><code>CollectionDataSource</code></td> |
| <td>Converts a collection to a data source that cycles through its elements repeatedly. E.g. <code>seq(1, 2, 3)</code> will output <code>1,2,3,1,2,3...</code>.</td> |
| </tr> |
| <tr> |
| <td><code>KafkaSource</code></td> |
| <td>Read from Kafka.</td> |
| </tr> |
| </tbody> |
| </table> |
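| |
| <p>The cycling behaviour described for <code>CollectionDataSource</code> can be illustrated with plain Scala. This is only a sketch of the idea, not Gearpump's implementation; the <code>CyclingCollection</code> object and its <code>cycle</code> method are hypothetical names introduced for this example.</p> |
| |
| <div class="highlight"><pre><code class="language-scala">// Stand-in for illustration only; this is not Gearpump's CollectionDataSource. |
| object CyclingCollection { |
|   // Returns an endless iterator that restarts from the first element |
|   // each time the collection is exhausted. |
|   def cycle[T](items: Seq[T]): Iterator[T] = { |
|     require(items.nonEmpty, "cannot cycle an empty collection") |
|     Iterator.continually(items).flatMap(_.iterator) |
|   } |
| } |
| |
| // Take the first seven elements of the endless sequence 1,2,3,1,2,3,... |
| val firstSeven = CyclingCollection.cycle(Seq(1, 2, 3)).take(7).toList</code></pre></div> |
| |
| <p>Because the iterator never ends, callers have to bound it themselves, here with <code>take(7)</code>.</p> |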
| |
| <h3 id="datasink-implemented"><code>DataSink</code> implemented</h3> |
| <p>The following <code>DataSink</code> implementations are currently supported.</p> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Name</th> |
| <th>Description</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><code>HBaseSink</code></td> |
| <td>Write the message to HBase. The message to write must be HBase <code>Put</code> or a tuple of <code>(rowKey, family, column, value)</code>.</td> |
| </tr> |
| <tr> |
| <td><code>KafkaSink</code></td> |
| <td>Write to Kafka.</td> |
| </tr> |
| </tbody> |
| </table> |
| |
| <h2 id="use-of-connectors">Use of Connectors</h2> |
| |
| <h3 id="use-of-kafka-connectors">Use of Kafka connectors</h3> |
| |
| <p>To use the Kafka connectors in your application, first add the <code>gearpump-external-kafka</code> library dependency to your build:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="sbt" data-label="sbt"> |
| <div class="highlight"><pre><code>"org.apache.gearpump" %% "gearpump-external-kafka" % "0.8.1" |
| </code></pre></div> |
| </div> |
| |
| <div data-lang="xml" data-label="xml"> |
| <div class="highlight"><pre><code class="language-xml"><span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.gearpump<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>gearpump-external-kafka<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>0.8.1<span class="nt"></version></span> |
| <span class="nt"></dependency></span></code></pre></div> |
| </div> |
| </div> |
| |
| <p>The following simple example reads from one Kafka topic and writes to another using <code>KafkaSource</code> and <code>KafkaSink</code>. Users can optionally set a <code>CheckpointStoreFactory</code> so that Kafka offsets are checkpointed and at-least-once message delivery is guaranteed.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala" data-label="Processor API"> |
| <div class="highlight"><pre><code class="language-scala"> <span class="k">val</span> <span class="n">appConfig</span> <span class="k">=</span> <span class="nc">UserConfig</span><span class="o">.</span><span class="n">empty</span> |
| <span class="k">val</span> <span class="n">props</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Properties</span> |
| <span class="n">props</span><span class="o">.</span><span class="n">put</span><span class="o">(</span><span class="nc">KafkaConfig</span><span class="o">.</span><span class="nc">ZOOKEEPER_CONNECT_CONFIG</span><span class="o">,</span> <span class="n">zookeeperConnect</span><span class="o">)</span> |
| <span class="n">props</span><span class="o">.</span><span class="n">put</span><span class="o">(</span><span class="nc">KafkaConfig</span><span class="o">.</span><span class="nc">BOOTSTRAP_SERVERS_CONFIG</span><span class="o">,</span> <span class="n">brokerList</span><span class="o">)</span> |
| <span class="n">props</span><span class="o">.</span><span class="n">put</span><span class="o">(</span><span class="nc">KafkaConfig</span><span class="o">.</span><span class="nc">CHECKPOINT_STORE_NAME_PREFIX_CONFIG</span><span class="o">,</span> <span class="n">appName</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">source</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">KafkaSource</span><span class="o">(</span><span class="n">sourceTopic</span><span class="o">,</span> <span class="n">props</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">checkpointStoreFactory</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">KafkaStoreFactory</span><span class="o">(</span><span class="n">props</span><span class="o">)</span> |
| <span class="n">source</span><span class="o">.</span><span class="n">setCheckpointStore</span><span class="o">(</span><span class="n">checkpointStoreFactory</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">sourceProcessor</span> <span class="k">=</span> <span class="nc">DataSourceProcessor</span><span class="o">(</span><span class="n">source</span><span class="o">,</span> <span class="n">sourceNum</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">KafkaSink</span><span class="o">(</span><span class="n">sinkTopic</span><span class="o">,</span> <span class="n">props</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">sinkProcessor</span> <span class="k">=</span> <span class="nc">DataSinkProcessor</span><span class="o">(</span><span class="n">sink</span><span class="o">,</span> <span class="n">sinkNum</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">partitioner</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">ShufflePartitioner</span> |
| <span class="k">val</span> <span class="n">computation</span> <span class="k">=</span> <span class="n">sourceProcessor</span> <span class="o">~</span> <span class="n">partitioner</span> <span class="o">~></span> <span class="n">sinkProcessor</span> |
| <span class="k">val</span> <span class="n">app</span> <span class="k">=</span> <span class="nc">StreamApplication</span><span class="o">(</span><span class="n">appName</span><span class="o">,</span> <span class="nc">Graph</span><span class="o">(</span><span class="n">computation</span><span class="o">),</span> <span class="n">appConfig</span><span class="o">)</span></code></pre></div> |
| </div> |
| |
| <div data-lang="scala" data-label="Stream DSL"> |
| <div class="highlight"><pre><code class="language-scala"> <span class="k">val</span> <span class="n">props</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Properties</span> |
| <span class="k">val</span> <span class="n">appName</span> <span class="k">=</span> <span class="s">"KafkaDSL"</span> |
| <span class="n">props</span><span class="o">.</span><span class="n">put</span><span class="o">(</span><span class="nc">KafkaConfig</span><span class="o">.</span><span class="nc">ZOOKEEPER_CONNECT_CONFIG</span><span class="o">,</span> <span class="n">zookeeperConnect</span><span class="o">)</span> |
| <span class="n">props</span><span class="o">.</span><span class="n">put</span><span class="o">(</span><span class="nc">KafkaConfig</span><span class="o">.</span><span class="nc">BOOTSTRAP_SERVERS_CONFIG</span><span class="o">,</span> <span class="n">brokerList</span><span class="o">)</span> |
| <span class="n">props</span><span class="o">.</span><span class="n">put</span><span class="o">(</span><span class="nc">KafkaConfig</span><span class="o">.</span><span class="nc">CHECKPOINT_STORE_NAME_PREFIX_CONFIG</span><span class="o">,</span> <span class="n">appName</span><span class="o">)</span> |
| |
| <span class="k">val</span> <span class="n">app</span> <span class="k">=</span> <span class="nc">StreamApp</span><span class="o">(</span><span class="n">appName</span><span class="o">,</span> <span class="n">context</span><span class="o">)</span> |
| |
| <span class="k">if</span> <span class="o">(</span><span class="n">atLeastOnce</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="n">checkpointStoreFactory</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">KafkaStoreFactory</span><span class="o">(</span><span class="n">props</span><span class="o">)</span> |
| <span class="nc">KafkaDSL</span><span class="o">.</span><span class="n">createAtLeastOnceStream</span><span class="o">(</span><span class="n">app</span><span class="o">,</span> <span class="n">sourceTopic</span><span class="o">,</span> <span class="n">checkpointStoreFactory</span><span class="o">,</span> <span class="n">props</span><span class="o">,</span> <span class="n">sourceNum</span><span class="o">)</span> |
| <span class="o">.</span><span class="n">writeToKafka</span><span class="o">(</span><span class="n">sinkTopic</span><span class="o">,</span> <span class="n">props</span><span class="o">,</span> <span class="n">sinkNum</span><span class="o">)</span> |
| <span class="o">}</span> <span class="k">else</span> <span class="o">{</span> |
| <span class="nc">KafkaDSL</span><span class="o">.</span><span class="n">createAtMostOnceStream</span><span class="o">(</span><span class="n">app</span><span class="o">,</span> <span class="n">sourceTopic</span><span class="o">,</span> <span class="n">props</span><span class="o">,</span> <span class="n">sourceNum</span><span class="o">)</span> |
| <span class="o">.</span><span class="n">writeToKafka</span><span class="o">(</span><span class="n">sinkTopic</span><span class="o">,</span> <span class="n">props</span><span class="o">,</span> <span class="n">sinkNum</span><span class="o">)</span> |
| <span class="o">}</span></code></pre></div> |
| </div> |
| </div> |
| |
| <p>In the above example, configurations are set through Java properties and shared by <code>KafkaSource</code>, <code>KafkaSink</code> and <code>KafkaCheckpointStoreFactory</code>. |
| Each component can also be configured separately, using the options listed below.</p> |
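| |
| <p>As a rough sketch of configuring the source and the sink separately, one <code>Properties</code> object can be built per component. The key strings below are assumptions (the usual Kafka names); in a real application, use the <code>KafkaConfig</code> constants from the tables instead. The <code>SeparateKafkaProps</code> object and the client ids are hypothetical names chosen for this example.</p> |
| |
| <div class="highlight"><pre><code class="language-scala">import java.util.Properties |
| |
| object SeparateKafkaProps { |
|   // Assumed key names; prefer the KafkaConfig constants in application code. |
|   val ZookeeperKey = "zookeeper.connect" |
|   val BootstrapKey = "bootstrap.servers" |
|   val ClientIdKey = "client.id" |
| |
|   // Settings meant only for the source. |
|   def sourceProps(zookeeperConnect: String): Properties = { |
|     val props = new Properties |
|     props.setProperty(ZookeeperKey, zookeeperConnect) |
|     props.setProperty(ClientIdKey, "example-source") |
|     props |
|   } |
| |
|   // Settings meant only for the sink. |
|   def sinkProps(brokerList: String): Properties = { |
|     val props = new Properties |
|     props.setProperty(BootstrapKey, brokerList) |
|     props.setProperty(ClientIdKey, "example-sink") |
|     props |
|   } |
| }</code></pre></div> |
| |
| <p>Each object would then be passed to the matching constructor in place of the shared <code>props</code> used in the example above.</p> |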
| |
| <h4 id="kafkasource-configurations"><code>KafkaSource</code> configurations</h4> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Name</th> |
| <th>Descriptions</th> |
| <th>Type</th> |
| <th>Default</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><code>KafkaConfig.ZOOKEEPER_CONNECT_CONFIG</code></td> |
| <td>Zookeeper connect string for Kafka topics management</td> |
| <td>String</td> |
| <td> </td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.CLIENT_ID_CONFIG</code></td> |
| <td>An id string to pass to the server when making requests</td> |
| <td>String</td> |
| <td>""</td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.GROUP_ID_CONFIG</code></td> |
| <td>A string that uniquely identifies a set of consumers within the same consumer group</td> |
| <td>String</td> |
| <td>""</td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.FETCH_SLEEP_MS_CONFIG</code></td> |
| <td>The amount of time (ms) to sleep when the fetch threshold is hit</td> |
| <td>Int</td> |
| <td>100</td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.FETCH_THRESHOLD_CONFIG</code></td> |
| <td>Size of internal queue to keep Kafka messages. Stop fetching and go to sleep when hitting the threshold</td> |
| <td>Int</td> |
| <td>10000</td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG</code></td> |
| <td>Partition grouper class to group partitions among source tasks</td> |
| <td>Class</td> |
| <td>DefaultPartitionGrouper</td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG</code></td> |
| <td>Message decoder class to decode raw bytes from Kafka</td> |
| <td>Class</td> |
| <td>DefaultMessageDecoder</td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG</code></td> |
| <td>Timestamp filter class to filter out late messages</td> |
| <td>Class</td> |
| <td>DefaultTimeStampFilter</td> |
| </tr> |
| </tbody> |
| </table> |
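| |
| <p>The interplay between the fetch threshold and the fetch sleep time can be pictured with a small bounded queue. This is a hypothetical stand-in written for illustration, not Gearpump's fetch logic; the <code>ThrottledFetcher</code> class and its members are invented for this sketch.</p> |
| |
| <div class="highlight"><pre><code class="language-scala">import scala.collection.mutable |
| |
| // Hypothetical stand-in; this is not Gearpump's fetch thread. |
| class ThrottledFetcher(fetchThreshold: Int, fetchSleepMs: Long) { |
|   private val queue = mutable.Queue.empty[String] |
|   var sleeps = 0 // how many times the fetcher had to back off |
| |
|   // Tries to enqueue one message. When the queue has reached the |
|   // threshold, the message is not added; the fetcher sleeps and returns false. |
|   def offer(message: String): Boolean = { |
|     if (queue.size >= fetchThreshold) { |
|       sleeps += 1 |
|       Thread.sleep(fetchSleepMs) |
|       false |
|     } else { |
|       queue.enqueue(message) |
|       true |
|     } |
|   } |
| |
|   // Called by the consuming side; frees room in the queue. |
|   def poll(): Option[String] = |
|     if (queue.isEmpty) None else Some(queue.dequeue()) |
| |
|   def size: Int = queue.size |
| }</code></pre></div> |
| |
| <p>In this sketch a larger threshold buffers more messages in memory, while a longer sleep time slows down how quickly fetching resumes after the queue fills.</p> |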
| |
| <h4 id="kafkasink-configurations"><code>KafkaSink</code> configurations</h4> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Name</th> |
| <th>Descriptions</th> |
| <th>Type</th> |
| <th>Default</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><code>KafkaConfig.BOOTSTRAP_SERVERS_CONFIG</code></td> |
| <td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster</td> |
| <td>String</td> |
| <td> </td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.CLIENT_ID_CONFIG</code></td> |
| <td>An id string to pass to the server when making requests</td> |
| <td>String</td> |
| <td>""</td> |
| </tr> |
| </tbody> |
| </table> |
| |
| <h4 id="kafkacheckpointstorefactory-configurations"><code>KafkaCheckpointStoreFactory</code> configurations</h4> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Name</th> |
| <th>Descriptions</th> |
| <th>Type</th> |
| <th>Default</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><code>KafkaConfig.ZOOKEEPER_CONNECT_CONFIG</code></td> |
| <td>Zookeeper connect string for Kafka topics management</td> |
| <td>String</td> |
| <td> </td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.BOOTSTRAP_SERVERS_CONFIG</code></td> |
| <td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster</td> |
| <td>String</td> |
| <td> </td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG</code></td> |
| <td>Name prefix for checkpoint store</td> |
| <td>String</td> |
| <td>""</td> |
| </tr> |
| <tr> |
| <td><code>KafkaConfig.REPLICATION_FACTOR</code></td> |
| <td>Replication factor for checkpoint store topic</td> |
| <td>Int</td> |
| <td>1</td> |
| </tr> |
| </tbody> |
| </table> |
| |
| <h3 id="use-of-hbasesink">Use of <code>HBaseSink</code></h3> |
| |
| <p>To use <code>HBaseSink</code> in your application, first add the <code>gearpump-external-hbase</code> library dependency to your build:</p> |
| |
| <div class="highlight"><pre><code>"org.apache.gearpump" %% "gearpump-external-hbase" % "0.8.1" |
| </code></pre></div> |
| |
| <div class="highlight"><pre><code class="language-xml"><span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.gearpump<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>gearpump-external-hbase<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>0.8.1<span class="nt"></version></span> |
| <span class="nt"></dependency></span></code></pre></div> |
| |
| <p>To connect to HBase, you need to provide the following information:</p> |
| |
| <ul> |
| <li>the HBase configuration, which identifies the HBase service to connect to</li> |
| <li>the table name (you must create the table yourself, see the <a href="https://hbase.apache.org/book.html">HBase documentation</a>)</li> |
| </ul> |
| |
| <p>Then, you can use <code>HBaseSink</code> in your application:</p> |
| |
| <div class="highlight"><pre><code class="language-scala"> <span class="c1">//create the HBase data sink</span> |
| <span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="nc">HBaseSink</span><span class="o">(</span><span class="nc">UserConfig</span><span class="o">.</span><span class="n">empty</span><span class="o">,</span> <span class="n">tableName</span><span class="o">,</span> <span class="nc">HBaseConfiguration</span><span class="o">.</span><span class="n">create</span><span class="o">())</span> |
| |
| <span class="c1">//create Gearpump Processor</span> |
| <span class="k">val</span> <span class="n">sinkProcessor</span> <span class="k">=</span> <span class="nc">DataSinkProcessor</span><span class="o">(</span><span class="n">sink</span><span class="o">,</span> <span class="n">parallelism</span><span class="o">)</span></code></pre></div> |
| |
| <div class="highlight"><pre><code class="language-scala"> <span class="c1">//assume stream is a normal `Stream` in DSL</span> |
| <span class="n">stream</span><span class="o">.</span><span class="n">writeToHbase</span><span class="o">(</span><span class="nc">UserConfig</span><span class="o">.</span><span class="n">empty</span><span class="o">,</span> <span class="n">tableName</span><span class="o">,</span> <span class="n">parallelism</span><span class="o">,</span> <span class="s">"write to HBase"</span><span class="o">)</span></code></pre></div> |
| |
| <p>You can tune the connection to HBase via the HBase configuration passed in. If none is passed, Gearpump searches the local classpath for a valid HBase configuration (<code>hbase-site.xml</code>).</p> |
| |
| <p>Note: because of the issue discussed <a href="http://stackoverflow.com/questions/24456484/hbase-managed-zookeeper-suddenly-trying-to-connect-to-localhost-instead-of-zooke">here</a>, you may need to create an additional configuration for your HBase sink:</p> |
| |
| <div class="highlight"><pre><code class="language-scala"> <span class="k">def</span> <span class="n">hadoopConfig</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="n">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Configuration</span><span class="o">()</span> |
| <span class="n">conf</span><span class="o">.</span><span class="n">set</span><span class="o">(</span><span class="s">"hbase.zookeeper.quorum"</span><span class="o">,</span> <span class="s">"zookeeperHost"</span><span class="o">)</span> |
| <span class="n">conf</span><span class="o">.</span><span class="n">set</span><span class="o">(</span><span class="s">"hbase.zookeeper.property.clientPort"</span><span class="o">,</span> <span class="s">"2181"</span><span class="o">)</span> |
| <span class="n">conf</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <div class="highlight"><pre><code class="language-scala"> <span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="nc">HBaseSink</span><span class="o">(</span><span class="nc">UserConfig</span><span class="o">.</span><span class="n">empty</span><span class="o">,</span> <span class="n">tableName</span><span class="o">,</span> <span class="n">hadoopConfig</span><span class="o">)</span></code></pre></div> |
| |
| <h2 id="how-to-implement-your-own-datasource">How to implement your own <code>DataSource</code></h2> |
| |
| <p>To implement your own <code>DataSource</code>, you need to implement two things:</p> |
| |
| <ol> |
| <li>The data source itself</li> |
| <li>A helper class to ease its use in the DSL</li> |
| </ol> |
| |
| <h3 id="implement-your-own-datasource">Implement your own <code>DataSource</code></h3> |
| <p>You need to implement a class derived from <code>org.apache.gearpump.streaming.transaction.api.TimeReplayableSource</code>.</p> |
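| |
| <p>The general shape of such a source can be sketched without the Gearpump library. The trait below is a hypothetical stand-in, assuming a source that is opened at a start time, read message by message, and then closed; the real <code>TimeReplayableSource</code> signatures may differ, so check the Scala API documentation before implementing it. <code>ReplayableSourceSketch</code> and <code>InMemorySource</code> are names invented for this example.</p> |
| |
| <div class="highlight"><pre><code class="language-scala">// Hypothetical stand-in trait; the real TimeReplayableSource signatures may differ. |
| trait ReplayableSourceSketch[T] { |
|   def open(startTime: Long): Unit |
|   def read(): Option[(Long, T)] |
|   def close(): Unit |
| } |
| |
| // Replays timestamped records held in memory, skipping those older than startTime. |
| class InMemorySource[T](records: Seq[(Long, T)]) extends ReplayableSourceSketch[T] { |
|   private var remaining: Iterator[(Long, T)] = Iterator.empty |
| |
|   // Position the source at the first record whose timestamp is not before startTime. |
|   override def open(startTime: Long): Unit = { |
|     remaining = records.iterator.filter(_._1 >= startTime) |
|   } |
| |
|   // Return the next (timestamp, value) pair, or None when nothing is left. |
|   override def read(): Option[(Long, T)] = |
|     if (remaining.hasNext) Some(remaining.next()) else None |
| |
|   override def close(): Unit = { |
|     remaining = Iterator.empty |
|   } |
| }</code></pre></div> |
| |
| <p>Opening the source at a given timestamp is what lets a restarted application resume from the point it had reached, which is the idea behind replaying by time.</p> |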
| |
| <h3 id="implement-dsl-helper-optional">Implement DSL helper (Optional)</h3> |
| <p>If you want a DSL stream that starts from your custom source, it is best to implement your own DSL helper. |
| You can refer to <code>KafkaDSLUtil</code> in the Gearpump source as an example.</p> |
| |
| <p>Below is a code snippet from <code>KafkaDSLUtil</code>:</p> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="k">object</span> <span class="nc">KafkaDSLUtil</span> <span class="o">{</span> |
| |
| <span class="k">def</span> <span class="n">createStream</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span> |
| <span class="n">app</span><span class="k">:</span> <span class="kt">StreamApp</span><span class="o">,</span> |
| <span class="n">topics</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> |
| <span class="n">parallelism</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> |
| <span class="n">description</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> |
| <span class="n">properties</span><span class="k">:</span> <span class="kt">Properties</span><span class="o">)</span><span class="k">:</span> <span class="kt">dsl.Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="n">app</span><span class="o">.</span><span class="n">source</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span><span class="k">new</span> <span class="nc">KafkaSource</span><span class="o">(</span><span class="n">topics</span><span class="o">,</span> <span class="n">properties</span><span class="o">),</span> <span class="n">parallelism</span><span class="o">,</span> <span class="n">description</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <h2 id="how-to-implement-your-own-datasink">How to implement your own <code>DataSink</code></h2> |
| <p>To implement your own <code>DataSink</code>, you need to implement two things:</p> |
| |
| <ol> |
| <li>The data sink itself</li> |
| <li>A helper class to ease its use in the DSL</li> |
| </ol> |
| |
| <h3 id="implement-your-own-datasink">Implement your own <code>DataSink</code></h3> |
| <p>You need to implement a class derived from <code>org.apache.gearpump.streaming.sink.DataSink</code>.</p> |
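| |
| <p>A sink follows a similar open, write, close life cycle. The trait below is a hypothetical stand-in written for illustration and assumes that life cycle; the real <code>DataSink</code> signatures may differ, so consult the Scala API documentation. <code>SinkSketch</code> and <code>BufferSink</code> are names invented for this example.</p> |
| |
| <div class="highlight"><pre><code class="language-scala">import scala.collection.mutable.ArrayBuffer |
| |
| // Hypothetical stand-in trait; the real DataSink signatures may differ. |
| trait SinkSketch[T] { |
|   def open(): Unit |
|   def write(message: T): Unit |
|   def close(): Unit |
| } |
| |
| // Collects messages in memory and rejects writes made while the sink is not open. |
| class BufferSink[T] extends SinkSketch[T] { |
|   private val buffer = ArrayBuffer.empty[T] |
|   private var isOpen = false |
| |
|   override def open(): Unit = { |
|     isOpen = true |
|   } |
| |
|   override def write(message: T): Unit = { |
|     require(isOpen, "sink must be opened before writing") |
|     buffer += message |
|   } |
| |
|   override def close(): Unit = { |
|     isOpen = false |
|   } |
| |
|   // Snapshot of everything written so far, in arrival order. |
|   def contents: List[T] = buffer.toList |
| }</code></pre></div> |
| |
| <p>A real sink would acquire its connection in <code>open</code> and release it in <code>close</code>, in the places where this sketch only flips a flag.</p> |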
| |
| <h3 id="implement-dsl-helper-optional-1">Implement DSL helper (Optional)</h3> |
| <p>If you want to write a DSL stream to your custom sink, it is best to implement your own DSL helper. |
| You can refer to <code>HBaseDSLSink</code> in the Gearpump source as an example.</p> |
| |
| <p>Below is a code snippet from <code>HBaseDSLSink</code>:</p> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="k">class</span> <span class="nc">HBaseDSLSink</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span><span class="n">stream</span><span class="k">:</span> <span class="kt">Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span> <span class="o">{</span> |
| <span class="k">def</span> <span class="n">writeToHbase</span><span class="o">(</span><span class="n">userConfig</span><span class="k">:</span> <span class="kt">UserConfig</span><span class="o">,</span> <span class="n">table</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">parallism</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">description</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="n">stream</span><span class="o">.</span><span class="n">sink</span><span class="o">(</span><span class="nc">HBaseSink</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span><span class="n">userConfig</span><span class="o">,</span> <span class="n">table</span><span class="o">),</span> <span class="n">parallism</span><span class="o">,</span> <span class="n">userConfig</span><span class="o">,</span> <span class="n">description</span><span class="o">)</span> |
| <span class="o">}</span> |
| |
| <span class="k">def</span> <span class="n">writeToHbase</span><span class="o">(</span><span class="n">userConfig</span><span class="k">:</span> <span class="kt">UserConfig</span><span class="o">,</span> <span class="n">configuration</span><span class="k">:</span> <span class="kt">Configuration</span><span class="o">,</span> <span class="n">table</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">parallism</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">description</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="n">stream</span><span class="o">.</span><span class="n">sink</span><span class="o">(</span><span class="nc">HBaseSink</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span><span class="n">userConfig</span><span class="o">,</span> <span class="n">table</span><span class="o">,</span> <span class="n">configuration</span><span class="o">),</span> <span class="n">parallism</span><span class="o">,</span> <span class="n">userConfig</span><span class="o">,</span> <span class="n">description</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="k">object</span> <span class="nc">HBaseDSLSink</span> <span class="o">{</span> |
| <span class="k">implicit</span> <span class="k">def</span> <span class="n">streamToHBaseDSLSink</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span><span class="n">stream</span><span class="k">:</span> <span class="kt">Stream</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span><span class="k">:</span> <span class="kt">HBaseDSLSink</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="k">new</span> <span class="nc">HBaseDSLSink</span><span class="o">[</span><span class="kt">T</span><span class="o">](</span><span class="n">stream</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| |
| </div> |
| <!-- /container --> |
| |
| <script src="js/vendor/jquery-2.1.4.min.js"></script> |
| <script src="js/vendor/bootstrap-3.3.5.min.js"></script> |
| <script src="js/vendor/anchor-1.1.1.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function (d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function () { |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [["$", "$"], ["\\\\(", "\\\\)"]], |
| displayMath: [["$$", "$$"], ["\\[", "\\]"]], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |