| --- |
| layout: post |
| status: PUBLISHED |
| published: true |
| title: Streaming data into Apache HBase using Apache Flume |
| id: 1eed3e39-b7f4-4045-8061-813a06dfb2b3 |
| date: '2012-11-27 21:39:20 -0500' |
| categories: flume |
| tags: |
| - flume |
| - hbase |
| permalink: flume/entry/streaming_data_into_apache_hbase |
| --- |
| <p><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><font face="georgia, times new roman, times, serif"><font style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink which can be used to write events into HDFS, and two different implementations of HBase sinks to write events into HBase. You can read about the basic architecture of Apache Flume 1.x </span><a href="https://blogs.apache.org/flume/entry/flume_ng_architecture"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">in this blog post</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">. </span></font></font><strong id="internal-source-marker_0.83681909320876" style="font-size: medium; font-weight: normal;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> You can also read about how Flume’s File Channel persists events and still provides extremely high performance in an </span><a href="https://blogs.apache.org/flume/entry/apache_flume_filechannel"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">earlier blog post</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">. 
</span></font></strong><font face="georgia, times new roman, times, serif"><font style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In this article, we will explore how to configure Flume to write events into HBase, and how to write custom serializers that store events in HBase in a format of the user’s choice. </span><br /><span style="font-size: 19px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found </span><a href="http://hbase.apache.org/book.html#datamodel"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">in the HBase documentation</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Flume has two HBase sinks: the HBaseSink (org.apache.flume.sink.hbase.HBaseSink) and the AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink). 
These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other:</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /></font> </font> </p> |
| <ul style="font-size: medium; margin-top: 0pt; margin-bottom: 0pt;"> |
| <li dir="ltr" style="list-style-type: disc; font-size: 15px; background-color: transparent; vertical-align: baseline;"><span style="background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"><font face="georgia, times new roman, times, serif">The AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase.</font></span></li> |
| <li dir="ltr" style="list-style-type: disc; font-size: 15px; background-color: transparent; vertical-align: baseline;"><font face="georgia, times new roman, times, serif"><span style="background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The HBaseSink will soon support secure HBase clusters (<a href="https://issues.apache.org/jira/browse/FLUME-1626">FLUME-1626</a>) and the new HBase IPC which was introduced in HBase 0.96.</span></font></li> |
| </ul> |
| <p style="font-size: medium;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The configuration for both sinks is very similar. A sample configuration is shown below:</span></font> </p> |
| <pre><font size="3"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#Use the AsyncHBaseSink
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#Use the HBaseSink
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.channel = ch1
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.table = transactions
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.columnFamily = clients
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.column = charges
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.batchSize = 5000
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#Use the SimpleHbaseEventSerializer that comes with Flume
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.serializer.incrementColumn = icol
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.channels.ch1.type=memory</span></font></pre> |
| <p><font face="georgia, times new roman, times, serif"><font><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In the above config, the “table” parameter specifies the HBase table that the sink writes to (in this case, “transactions”); the “columnFamily” parameter specifies the column family in that table to insert the data into (in this case, “clients”); and the “column” parameter specifies the column in that column family to write to (in this case, “charges”). Apart from these, the sink requires a channel to be configured, like every other Flume sink. The other interesting configuration parameters are “serializer” and “serializer.*”. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume Event into an HBase-friendly format. This code that “translates” the events is usually specific to the schema used by the user’s HBase cluster, so it is usually implemented by the user. All configuration parameters of the form “serializer.*” are passed to the serializer. 
This configuration can be used to set up any internal state the serializer needs.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In case of the HBaseSink, the serializer converts a Flume </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/Event.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">Event</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> into one or more HBase </span><a href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Put.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">Put</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">s and/or </span><a href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Increment.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">Increment</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">s. 
The serializer must implement the </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">HbaseEventSerializer</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html#initialize(org.apache.flume.Event,%20byte[])" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">initialize</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> method in the serializer. 
The serializer must “translate” the Flume Event into HBase puts and increments which should be returned by </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html#getActions()" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">getActions</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> and </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html#getIncrements()" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">getIncrements</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> methods. These puts and increments are then sent over the wire to the HBase cluster. 
When the sink stops, this instance of the serializer is </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html#close()" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">closed </span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">by the HBaseSink.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The AsyncHBaseSink’s serializer must implement </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">AsyncHbaseEventSerializer</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">. </span></font> </p> |
| <p style="font-size: medium; display: inline !important;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In this case, the </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#initialize(byte[],%20byte[])"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">initialize</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> method is called once by the sink, when it starts up. For every event, the sink calls the <a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#setEvent(org.apache.flume.Event)">setEvent</a> method and then calls the </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#getActions()"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">getActions</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> and </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#getIncrements()"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">getIncrements</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> methods - similar to the HBaseSink. 
When the sink is stopped, the serializer’s </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#cleanUp()"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">cleanUp</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> method is called. Notice that the methods do not return the standard HBase Puts and Increments, but </span><a href="http://tsunanet.net/~tsuna/asynchbase/api/org/hbase/async/PutRequest.html"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">PutRequest</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> and </span><a href="http://tsunanet.net/~tsuna/asynchbase/api/org/hbase/async/AtomicIncrementRequest.html"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">AtomicIncrementRequest</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> from the </span><a href="http://tsunanet.net/~tsuna/asynchbase/api/"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">asynchbase</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> API. These are roughly equivalent to the HBase Puts and Increments respectively, with some differences. </span></strong></strong></p> |
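<p>The call order described above for the AsyncHBaseSink's serializer can be sketched in plain Java. This is only an illustration under stated assumptions: <code>SketchEvent</code>, <code>RecordingSerializer</code> and <code>drive</code> are hypothetical stand-ins written for this post, not Flume or asynchbase classes, and strings stand in for the PutRequest objects a real serializer would return.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only: none of these types come from Flume or asynchbase.
class SerializerLifecycleSketch {

  /** Stand-in for a Flume Event: string headers plus a byte[] body. */
  static final class SketchEvent {
    final Map<String, String> headers;
    final byte[] body;

    SketchEvent(Map<String, String> headers, String body) {
      this.headers = headers;
      this.body = body.getBytes(StandardCharsets.UTF_8);
    }
  }

  /** Stand-in serializer that records each call so the order can be inspected. */
  static final class RecordingSerializer {
    final List<String> calls = new ArrayList<String>();
    private SketchEvent current;

    void initialize(byte[] table, byte[] cf) {
      calls.add("initialize");
    }

    void setEvent(SketchEvent event) {
      current = event;
      calls.add("setEvent");
    }

    List<String> getActions() {
      calls.add("getActions");
      // A real serializer would build PutRequests here; a string stands in for one.
      return Collections.singletonList(
          "put " + new String(current.body, StandardCharsets.UTF_8));
    }

    List<String> getIncrements() {
      calls.add("getIncrements");
      return Collections.emptyList();
    }

    void cleanUp() {
      current = null;
      calls.add("cleanUp");
    }
  }

  /** Plays the role of the sink: initialize once, three calls per event, cleanUp once. */
  static List<String> drive(RecordingSerializer serializer, List<SketchEvent> events) {
    serializer.initialize(
        "transactions".getBytes(StandardCharsets.UTF_8),
        "clients".getBytes(StandardCharsets.UTF_8));
    for (SketchEvent event : events) {
      serializer.setEvent(event);
      serializer.getActions();
      serializer.getIncrements();
    }
    serializer.cleanUp();
    return serializer.calls;
  }

  public static void main(String[] args) {
    List<SketchEvent> events = Arrays.asList(
        new SketchEvent(Collections.singletonMap("rowKey", "r1"), "a,b"),
        new SketchEvent(Collections.singletonMap("rowKey", "r2"), "c,d"));
    for (String call : drive(new RecordingSerializer(), events)) {
      System.out.println(call);
    }
  }
}
```

<p>Because <code>initialize</code> runs once while <code>setEvent</code> runs for every event, per-event state such as the current row key belongs in <code>setEvent</code>, and anything fixed for the life of the sink (table, column family) belongs in <code>initialize</code>.</p>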
| <p><strong id="internal-source-marker_0.83681909320876" style="font-size: medium; font-weight: normal;"> </p> |
| <p style="display: inline !important;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">An example of such a serializer is below. </span></p> |
| <p></strong><br /> </font> </p> |
| <pre><font size="3"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">/**
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> * A serializer for the AsyncHBaseSink, which splits the event body into
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> * multiple columns and inserts them into a row whose key is available in
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> * the headers
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> */
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">public class SplittingSerializer implements AsyncHbaseEventSerializer {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> private byte[] table;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> private byte[] colFam;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> private Event currentEvent;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> private byte[][] columnNames;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> private final List&lt;PutRequest&gt; puts = new ArrayList&lt;PutRequest&gt;();
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> private final List&lt;AtomicIncrementRequest&gt; incs = new ArrayList&lt;AtomicIncrementRequest&gt;();
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> private byte[] currentRowKey;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> private final byte[] eventCountCol = "eventCount".getBytes();</span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> @Override
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> public void initialize(byte[] table, byte[] cf) {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> this.table = table;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> this.colFam = cf;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }</span>
|
|
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> @Override
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> public void setEvent(Event event) {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> // Set the event and verify that the rowKey is present
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> this.currentEvent = event;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> String rowKeyStr = currentEvent.getHeaders().get("rowKey");
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> if (rowKeyStr == null) {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> throw new FlumeException("No row key found in headers!");
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> currentRowKey = rowKeyStr.getBytes();
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }</span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> @Override
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> public List&lt;PutRequest&gt; getActions() {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> // Split the event body and get the values for the columns
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> String eventStr = new String(currentEvent.getBody());
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> String[] cols = eventStr.split(",");
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> puts.clear();
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> for (int i = 0; i < cols.length; i++) {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> //Generate a PutRequest for each column.
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> PutRequest req = new PutRequest(table, currentRowKey, colFam,
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> columnNames[i], cols[i].getBytes());
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> puts.add(req);
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> return puts;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }</span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> @Override
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> public List&lt;AtomicIncrementRequest&gt; getIncrements() {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> incs.clear();
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> //Increment the number of events received
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> return incs;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }</span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> @Override
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> public void cleanUp() {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> table = null;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> colFam = null;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> currentEvent = null;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> columnNames = null;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> currentRowKey = null;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }</span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> @Override
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> public void configure(Context context) {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> //Get the column names from the configuration
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> String cols = context.getString("columns");
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> String[] names = cols.split(",");
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> columnNames = new byte[names.length][];
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> int i = 0;
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> for(String name : names) {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> columnNames[i++] = name.getBytes();
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }</span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> @Override
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> public void configure(ComponentConfiguration conf) {
|
| </span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> }</span>
|
| <span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">}</span></font></pre> |
| <p style="font-size: medium;"><span style="font-size: 15px; font-family: Arial; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"> </strong></p> |
| <p style="font-size: medium; display: inline !important;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"></strong></strong></strong></p> |
| <p><strong id="internal-source-marker_0.83681909320876" style="font-size: medium; font-weight: normal;"> </p> |
| <p style="display: inline !important;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">This serializer splits the event body on a delimiter and writes each piece to a different column. The row key is taken from the event header. For each event received, the serializer also increments a counter that tracks the number of events received.</span></font></strong></strong></p> |
| <p><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The following configuration sets up this serializer:</span></font> </p> |
| <p></strong> </p> |
| <p style="font-size: medium;"> |
| <p></strong> </p> |
| <pre><font size="3"><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
|
| </span></b></strong><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.channel = ch1
|
| </span></b></strong><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.table = transactions
|
| </span></b></strong><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.columnFamily = clients
|
| </span></b></strong><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.batchSize = 5000</span></b></strong></font>
|
| <font size="3"><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#The serializer to use
|
| </span></b></strong><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer</span></b></strong></font>
|
| <font size="3"><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#List of columns each event writes to.
|
| </span></b></strong><strong style="font-weight: normal;"><b id="internal-source-marker_0.83681909320876" style="font-family: Times; font-weight: normal;"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.serializer.columns = charges,date,priority</span></b></strong></font></pre> |
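<p>To illustrate how the configured column list lines up with the pieces of an event body, here is a minimal, self-contained sketch in plain Java. It does not use the Flume or asynchbase APIs; the class name <code>SplitSketch</code>, the helper <code>splitIntoColumns</code>, the <code>|</code> delimiter and the sample values are all assumptions made for illustration only.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Illustrative only: not part of Flume. Shows how a delimited event body
// could be paired with the column names from serializer.columns.
final class SplitSketch {

    // Hypothetical helper: splits the body on the delimiter and pairs each
    // piece with the column name at the same position.
    static Map<String, String> splitIntoColumns(byte[] body, String delimiter, String[] columns) {
        // Pattern.quote treats the delimiter literally; -1 keeps trailing empty pieces.
        String[] parts = new String(body, StandardCharsets.UTF_8)
                .split(Pattern.quote(delimiter), -1);
        if (parts.length != columns.length) {
            throw new IllegalArgumentException(
                    "Expected " + columns.length + " fields but got " + parts.length);
        }
        // LinkedHashMap keeps the columns in their configured order.
        Map<String, String> cells = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            cells.put(columns[i], parts[i]);
        }
        return cells;
    }

    public static void main(String[] args) {
        // Same column list as in the configuration above.
        String[] columns = "charges,date,priority".split(",");
        // Assumed sample event body with an assumed "|" delimiter.
        byte[] body = "42.50|2012-11-27|high".getBytes(StandardCharsets.UTF_8);
        System.out.println(splitIntoColumns(body, "|", columns));
        // prints {charges=42.50, date=2012-11-27, priority=high}
    }
}
```

<p>In a real serializer, each of these column/value pairs would become one Put against the configured table and column family. The sketch rejects events whose field count does not match the column list; whether to reject or skip such events is a design choice left to the serializer author.</p>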
| <p><strong id="internal-source-marker_0.83681909320876" style="font-size: medium; font-weight: normal;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; font-weight: bold; vertical-align: baseline; white-space: pre-wrap;">Internals of the HBaseSink and AsyncHBaseSink</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. Currently, the HBaseSink is single-threaded and calls the serializer to get the Puts and Increments once per event it processes. HBase Puts and Increments are sent to HBase via blocking calls, which means the next event is read and passed to the serializer only once the current event is successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like most other Flume sinks, if any one of these events fails to be written, the sink retries the entire transaction.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">On the other hand, the AsyncHBaseSink uses the asynchbase API and sends events to HBase asynchronously. Like the HBaseSink, the AsyncHBaseSink generates Puts and Increments for each event. Once the Puts and Increments are generated, the sink sends them out immediately to HBase and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. 
The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. If an error callback is received, the entire transaction is retried, in true Flume style. </span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; font-weight: bold; vertical-align: baseline; white-space: pre-wrap;">A word of caution</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">As you can see, if HBase reports failure to write even one Put or Increment, the entire transaction is retried. This is how Flume’s at-least-once semantics work, and most Flume sinks operate in the same way. In the case of HBase Increments, this means the same event can cause a counter to be incremented more than once. Keep this in mind when using Flume to perform Increments. Also, if the serializer is not idempotent, the same event can cause multiple different Puts to be written to HBase. Consider events that represent credit card transactions: if the same event can generate different Puts each time, HBase could end up with multiple records of the same transaction, which is probably not desired.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The AsyncHBaseSink is known to give better performance than the HBaseSink, primarily because of the non-blocking nature of the underlying API it uses. 
The HBase community is working on improving the performance of the HBase client API, which would vastly improve the HBaseSink performance.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; font-weight: bold; vertical-align: baseline; white-space: pre-wrap;">Conclusion</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema and allow the user to “map” the Flume event to HBase data.</span></font><br /></strong> </p> |
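<p>As a footnote to the word of caution about retries, the following sketch shows why an idempotent serializer matters when a transaction is delivered more than once. It is plain Java with an in-memory map standing in for an HBase table, and it ignores cell versions; the class <code>RetrySketch</code>, its methods and the sample transaction id are hypothetical and are not part of Flume or HBase.</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative only: an in-memory stand-in for a table, used to compare
// idempotent and non-idempotent row key choices under at-least-once delivery.
final class RetrySketch {

    // Row key -> cell value. Writing to an existing key overwrites the value,
    // much as a Put to the same row and column would (ignoring cell versions).
    private final Map<String, String> rows = new HashMap<>();

    // Non-idempotent: every call invents a fresh row key, so a retried event
    // lands in a new row each time.
    void writeWithRandomKey(String transactionId, String payload) {
        rows.put(UUID.randomUUID().toString(), payload);
    }

    // Idempotent: the row key is derived from the event itself, so a retried
    // event overwrites the row written by the earlier attempt.
    void writeWithDerivedKey(String transactionId, String payload) {
        rows.put(transactionId, payload);
    }

    int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        RetrySketch randomKeys = new RetrySketch();
        RetrySketch derivedKeys = new RetrySketch();

        // Deliver the same event twice, as a retried transaction would.
        for (int attempt = 0; attempt < 2; attempt++) {
            randomKeys.writeWithRandomKey("txn-1001", "charges=42.50");
            derivedKeys.writeWithDerivedKey("txn-1001", "charges=42.50");
        }

        System.out.println("rows with random keys:  " + randomKeys.rowCount());
        System.out.println("rows with derived keys: " + derivedKeys.rowCount());
    }
}
```

<p>With random keys the retried event leaves two rows behind, while the derived key leaves one. This only addresses Puts: an Increment applied twice still counts twice, so counters driven by Flume events should be treated as approximate unless duplicates are handled elsewhere.</p>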